import json
from urllib.parse import quote, urlparse

import httpx
from fastapi import Request
from fastapi.responses import Response

from config import Config
from cache_manager import cache
from utils import get_auth, get_channels, rewrite_m3u8, extract_playlist_url
def get_client_base_url(request: Request) -> str:
    """Return the ``scheme://netloc`` origin the client used for this request.

    Both parts are read from ``request.url``; no path, query or trailing
    slash is included.
    """
    url = request.url
    return "://".join((url.scheme, url.netloc))
async def _fetch_live_playlist_url(chid: str, auth: dict) -> str:
    """Resolve the playlist URL for channel *chid* with one upstream attempt.

    Looks the channel up via ``get_channels(auth)``, returns the cached URL
    stored under ``live_<chid>`` when there is one, and otherwise fetches the
    channel's master ``.M3U8`` from the live upstream host, extracts the
    playlist URL from it and stores the result in the cache.

    Raises:
        ValueError: no channel whose ``no`` field equals *chid*.
        httpx.HTTPStatusError: the upstream answered with a non-2xx status.
    """
    channels = await get_channels(auth)
    channel = next((ch for ch in channels if str(ch['no']) == chid), None)
    if not channel:
        raise ValueError(f"Channel {chid} not found")

    cache_key = f"live_{chid}"
    playlist_url = cache.get_stream(cache_key)
    if playlist_url:
        return playlist_url

    # Build the URL on the live upstream host directly. The previous
    # "build on auth['vms_host'], then str.replace()" round trip corrupted the
    # URL when vms_host was empty or happened to occur elsewhere in it.
    source_url = (
        f"{Config.UPSTREAM_HOSTS['live']}{channel['playpath']}.M3U8"
        f"?type=live&__cross_domain_user={quote(auth['access_token'])}"
    )
    headers = {
        'Referer': Config.REQUIRED_REFERER,
        'User-Agent': 'Mozilla/5.0'
    }
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(30.0, connect=5.0),
        follow_redirects=True,
        limits=httpx.Limits(max_keepalive_connections=50, max_connections=200)
    ) as client:
        main_response = await client.get(source_url, headers=headers)
        main_response.raise_for_status()
        main_content = main_response.text

    # Fall back to the master URL when no nested playlist URL is found.
    playlist_url = extract_playlist_url(main_content, source_url) or source_url
    cache.set_stream(cache_key, playlist_url)
    return playlist_url


async def get_live_m3u8_url(chid: str, auth: dict, retry_count: int = 0) -> str:
    """Return the playlist URL for live channel *chid*.

    On an upstream 401/403 the credentials are refreshed with
    ``get_auth(force=True)`` and the lookup is repeated, up to two retries in
    total (counting from *retry_count*).

    The retries run in a flat loop on purpose: the earlier recursive version
    retried from inside its own ``try`` block, so every outer frame retried
    again after an inner failure and multiplied upstream and auth requests.

    Args:
        chid: Channel number, compared against ``str(channel['no'])``.
        auth: Auth mapping; ``access_token`` is read from it.
        retry_count: Number of retries already spent by the caller.

    Raises:
        ValueError: the channel does not exist.
        httpx.HTTPStatusError: the upstream kept failing, or failed with a
            status other than 401/403.
    """
    attempt = retry_count
    while True:
        try:
            return await _fetch_live_playlist_url(chid, auth)
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code not in (401, 403) or attempt >= 2:
                raise
        auth = await get_auth(force=True)
        attempt += 1
async def proxy_live_stream_direct(chid: str, request: Request) -> Response:
    """Serve the rewritten live playlist for channel *chid*.

    Resolves the playlist URL with ``get_live_m3u8_url``, downloads it, and
    passes the text through ``rewrite_m3u8`` together with the playlist's
    path+query and this proxy's base URL.

    Returns:
        200 with ``application/vnd.apple.mpegurl`` content on success,
        502 with a JSON error body when the upstream request fails,
        500 with a JSON error body for any other failure.
    """
    try:
        auth = await get_auth()
        playlist_url = await get_live_m3u8_url(chid, auth)

        upstream_headers = {
            'Referer': Config.REQUIRED_REFERER,
            'User-Agent': 'Mozilla/5.0'
        }
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=5.0),
            follow_redirects=True,
            limits=httpx.Limits(max_keepalive_connections=50, max_connections=200)
        ) as client:
            playlist_response = await client.get(playlist_url, headers=upstream_headers)
            playlist_response.raise_for_status()
            playlist_content = playlist_response.text

        parsed = urlparse(playlist_url)
        playlist_path = parsed.path + ('?' + parsed.query if parsed.query else '')
        worker_base = get_client_base_url(request)
        rewritten = rewrite_m3u8(playlist_content, playlist_path, worker_base)

        return Response(
            content=rewritten,
            media_type='application/vnd.apple.mpegurl',
            headers={
                'Cache-Control': 'public, max-age=10',
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Headers': 'Range',
                'Access-Control-Expose-Headers': 'Content-Length, Content-Range'
            }
        )
    except httpx.HTTPStatusError as e:
        # str(e) contains the full upstream URL, which carries the access
        # token in its query string; report only the status code.
        message = f"Upstream error: HTTP {e.response.status_code}"
        return Response(
            content=json.dumps({"error": message}),
            status_code=502,
            media_type='application/json'
        )
    except httpx.HTTPError as e:
        # json.dumps keeps the body valid JSON whatever the message contains.
        return Response(
            content=json.dumps({"error": f"Upstream error: {e}"}),
            status_code=502,
            media_type='application/json'
        )
    except Exception as e:
        # Last-resort boundary: turn any other failure into a JSON 500.
        return Response(
            content=json.dumps({"error": str(e)}),
            status_code=500,
            media_type='application/json'
        )
async def proxy_playback_stream(path: str, request: Request, retry_count: int = 0) -> Response:
    """Serve the rewritten VOD playlist for *path*.

    *path* is prefixed with ``query/`` when missing and stripped of
    ``.m3u8``. The playlist URL is taken from the cache, or resolved by
    fetching the master playlist from the VOD upstream host. The playlist is
    then downloaded and passed through ``rewrite_m3u8``.

    A 401/403 on the master request refreshes the credentials with
    ``get_auth(force=True)`` and retries, at most twice.

    NOTE(review): a cached playlist URL is reused as-is; if it embeds an
    expired token the playlist request keeps failing until the cache entry
    goes away. Confirm the cache TTL against the token lifetime.

    Args:
        path: Requested VOD path, with or without the ``query/`` prefix.
        request: Incoming request, used to derive this proxy's base URL.
        retry_count: Number of auth retries already spent.

    Returns:
        200 with ``application/vnd.apple.mpegurl`` content on success,
        502 with a JSON error body when an upstream request fails,
        500 with a JSON error body for any other failure.
    """
    try:
        auth = await get_auth()

        if not path.startswith('query/'):
            path = f"query/{path}"
        vod_path = path.replace('.m3u8', '')
        cache_key = f"vod_{vod_path.replace('/', '_').replace('=', '')}"

        upstream_headers = {
            'Referer': Config.REQUIRED_REFERER,
            'User-Agent': 'Mozilla/5.0'
        }
        # One client serves both the master and the playlist request.
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=5.0),
            follow_redirects=True,
            limits=httpx.Limits(max_keepalive_connections=50, max_connections=200)
        ) as client:
            playlist_url = cache.get_stream(cache_key)
            if not playlist_url:
                source_url = (
                    f"{Config.UPSTREAM_HOSTS['vod']}/{vod_path}.m3u8"
                    f"?type=vod&__cross_domain_user={quote(auth['access_token'])}"
                )
                main_response = await client.get(source_url, headers=upstream_headers)
                if main_response.status_code in (401, 403) and retry_count < 2:
                    # The retry calls get_auth() itself, so the refreshed
                    # credentials do not need to be passed along.
                    await get_auth(force=True)
                    return await proxy_playback_stream(path, request, retry_count + 1)
                if not main_response.is_success:
                    # An upstream failure is a gateway error, not a 500.
                    message = f"VOD source failed: HTTP {main_response.status_code}"
                    return Response(
                        content=json.dumps({"error": message}),
                        status_code=502,
                        media_type='application/json'
                    )
                # Fall back to the master URL when no nested URL is found.
                playlist_url = extract_playlist_url(main_response.text, source_url) or source_url
                cache.set_stream(cache_key, playlist_url)

            playlist_response = await client.get(playlist_url, headers=upstream_headers)
            if not playlist_response.is_success:
                message = f"VOD playlist failed: HTTP {playlist_response.status_code}"
                return Response(
                    content=json.dumps({"error": message}),
                    status_code=502,
                    media_type='application/json'
                )
            playlist_content = playlist_response.text

        parsed = urlparse(playlist_url)
        playlist_path = parsed.path + ('?' + parsed.query if parsed.query else '')
        worker_base = get_client_base_url(request)
        rewritten = rewrite_m3u8(playlist_content, playlist_path, worker_base)

        return Response(
            content=rewritten,
            media_type='application/vnd.apple.mpegurl',
            headers={
                'Cache-Control': 'public, max-age=60',
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Headers': 'Range',
                'Access-Control-Expose-Headers': 'Content-Length, Content-Range'
            }
        )
    except httpx.HTTPStatusError as e:
        # Only helpers such as get_auth() can raise this here; the requests
        # above check their status codes explicitly.
        if e.response.status_code in (401, 403) and retry_count < 2:
            await get_auth(force=True)
            return await proxy_playback_stream(path, request, retry_count + 1)
        # str(e) would expose the upstream URL (and any token in it).
        message = f"Upstream error: HTTP {e.response.status_code}"
        return Response(
            content=json.dumps({"error": message}),
            status_code=502,
            media_type='application/json'
        )
    except httpx.HTTPError as e:
        # Network-level failure (timeout, connection error, ...).
        return Response(
            content=json.dumps({"error": f"Upstream error: {e}"}),
            status_code=502,
            media_type='application/json'
        )
    except Exception as e:
        # Last-resort boundary: turn any other failure into a JSON 500.
        return Response(
            content=json.dumps({"error": str(e)}),
            status_code=500,
            media_type='application/json'
        )
async def proxy_media(request: Request, path: str, retry_count: int = 0) -> Response:
    """Proxy one media resource (playlist or segment) from the upstream.

    Paths starting with ``/vod/`` or ``/query/`` go to the VOD host and all
    others to the live host. The client's query string and ``Range`` header
    are forwarded. Playlist responses are passed through ``rewrite_m3u8``;
    everything else is returned as-is with the upstream status code.

    A 401/403 from the upstream refreshes the credentials with
    ``get_auth(force=True)`` and retries, at most twice.

    Args:
        request: Incoming request (query string, ``Range`` header, base URL).
        path: Upstream path requested by the client.
        retry_count: Number of auth retries already spent.

    Returns:
        The proxied response on success,
        502 with a JSON error body when the upstream request fails,
        500 with a JSON error body for any other failure.
    """
    try:
        # The result is not used here; the call is kept for whatever
        # session setup get_auth() performs (assumption - confirm).
        await get_auth()

        if path.startswith(('/vod/', '/query/')):
            upstream_host = Config.UPSTREAM_HOSTS['vod']
        else:
            upstream_host = Config.UPSTREAM_HOSTS['live']

        # Join host and path with exactly one slash so a client-supplied path
        # can never extend the authority part of the URL ("@other.host/...").
        upstream_url = f"{upstream_host.rstrip('/')}/{path.lstrip('/')}"
        query = str(request.url.query)
        if query:
            upstream_url += f"?{query}"

        upstream_headers = {
            'Referer': Config.REQUIRED_REFERER,
            'User-Agent': 'Mozilla/5.0'
        }
        range_header = request.headers.get('Range')
        if range_header:
            upstream_headers['Range'] = range_header

        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=5.0),
            follow_redirects=True,
            limits=httpx.Limits(
                max_keepalive_connections=50,
                max_connections=200,
                keepalive_expiry=30.0
            )
        ) as client:
            response = await client.get(upstream_url, headers=upstream_headers)

        if response.status_code in (401, 403) and retry_count < 2:
            await get_auth(force=True)
            return await proxy_media(request, path, retry_count + 1)
        response.raise_for_status()

        content_type = response.headers.get('Content-Type', '')
        # Compare case-insensitively: servers commonly send
        # "application/x-mpegURL", which a plain 'mpegurl' test misses.
        is_playlist = (
            'mpegurl' in content_type.lower()
            or path.lower().endswith('.m3u8')
        )
        if is_playlist:
            worker_base = get_client_base_url(request)
            full_path = f"{path}?{query}" if query else path
            rewritten = rewrite_m3u8(response.text, full_path, worker_base)
            return Response(
                content=rewritten,
                media_type='application/vnd.apple.mpegurl',
                headers={
                    'Cache-Control': 'public, max-age=10',
                    'Access-Control-Allow-Origin': '*'
                }
            )

        response_headers = {
            'Content-Type': content_type or 'video/MP2T',
            'Cache-Control': 'public, max-age=86400',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Headers': 'Range',
            'Access-Control-Expose-Headers': 'Content-Length, Content-Range'
        }
        # Content-Length is deliberately not copied: httpx hands back the
        # decoded body, so the upstream value can differ from what is sent.
        # The framework derives the header from the actual body instead.
        for name in ('Content-Range', 'Accept-Ranges'):
            if name in response.headers:
                response_headers[name] = response.headers[name]

        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=response_headers
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code in (401, 403) and retry_count < 2:
            await get_auth(force=True)
            return await proxy_media(request, path, retry_count + 1)
        # str(e) would expose the upstream URL (and any token in its query).
        message = f"Upstream error: HTTP {e.response.status_code}"
        return Response(
            content=json.dumps({"error": message}),
            status_code=502,
            media_type='application/json'
        )
    except httpx.HTTPError as e:
        # Network-level failure (timeout, connection error, ...).
        return Response(
            content=json.dumps({"error": f"Upstream error: {e}"}),
            status_code=502,
            media_type='application/json'
        )
    except Exception as e:
        # Last-resort boundary: turn any other failure into a JSON 500.
        return Response(
            content=json.dumps({"error": str(e)}),
            status_code=500,
            media_type='application/json'
        )