"""HLS proxy helpers: resolve upstream playlists and relay playlists/segments.

Every handler fetches from one of the hosts in ``Config.UPSTREAM_HOSTS`` using
the referer required by the upstream, and rewrites playlists with
``rewrite_m3u8`` so that the URIs inside them point back at this service.
"""
import json
from urllib.parse import quote, urlparse

import httpx
from fastapi import Request
from fastapi.responses import Response

from cache_manager import cache
from config import Config
from utils import extract_playlist_url, get_auth, get_channels, rewrite_m3u8

# Upstream statuses that trigger a forced credential refresh and a retry.
_AUTH_FAILURE_STATUSES = (401, 403)
# Maximum number of forced-refresh retries per request.
_MAX_AUTH_RETRIES = 2
_PLAYLIST_MEDIA_TYPE = 'application/vnd.apple.mpegurl'
# CORS headers that let browser players issue Range requests and read sizes.
_RANGE_CORS_HEADERS = {
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Range',
    'Access-Control-Expose-Headers': 'Content-Length, Content-Range',
}


def _upstream_headers() -> dict:
    """Return a fresh header dict for upstream requests (safe to mutate)."""
    return {
        'Referer': Config.REQUIRED_REFERER,
        'User-Agent': 'Mozilla/5.0',
    }


def _new_client(**limit_overrides) -> httpx.AsyncClient:
    """Create the AsyncClient configuration shared by all upstream fetches.

    ``limit_overrides`` are forwarded to ``httpx.Limits`` (for example
    ``keepalive_expiry``) on top of the common connection limits.
    """
    return httpx.AsyncClient(
        timeout=httpx.Timeout(30.0, connect=5.0),
        follow_redirects=True,
        limits=httpx.Limits(
            max_keepalive_connections=50,
            max_connections=200,
            **limit_overrides,
        ),
    )


def _should_retry_auth(status_code: int, retry_count: int) -> bool:
    """Return True when an auth failure should be retried with fresh credentials."""
    return status_code in _AUTH_FAILURE_STATUSES and retry_count < _MAX_AUTH_RETRIES


def _json_error(message: str, status_code: int) -> Response:
    """Build a ``{"error": ...}`` JSON response.

    The body is serialised with ``json.dumps`` so that quotes, backslashes or
    newlines in the message cannot produce an invalid JSON document.
    """
    return Response(
        content=json.dumps({"error": message}, ensure_ascii=False),
        status_code=status_code,
        media_type='application/json',
    )


def _path_with_query(url: str) -> str:
    """Return the path of ``url`` plus ``?query`` when a query string exists."""
    parsed = urlparse(url)
    if parsed.query:
        return f"{parsed.path}?{parsed.query}"
    return parsed.path


def _rewritten_playlist_response(
    content: str,
    playlist_path: str,
    request: Request,
    headers: dict,
) -> Response:
    """Rewrite a playlist against the client-facing base URL and wrap it."""
    worker_base = get_client_base_url(request)
    rewritten = rewrite_m3u8(content, playlist_path, worker_base)
    return Response(
        content=rewritten,
        media_type=_PLAYLIST_MEDIA_TYPE,
        headers=headers,
    )


def get_client_base_url(request: Request) -> str:
    """Return ``scheme://netloc`` of the URL the client used for this request."""
    url = request.url
    return f"{url.scheme}://{url.netloc}"


async def get_live_m3u8_url(chid: str, auth: dict, retry_count: int = 0) -> str:
    """Resolve the playlist URL for live channel ``chid``.

    The resolved URL is cached under ``live_<chid>``. On a cache miss the
    channel's main playlist is fetched from the live upstream host and
    ``extract_playlist_url`` picks the playlist URL out of it; when it returns
    nothing, the main playlist URL itself is used.

    Args:
        chid: Channel number, compared against ``str(channel['no'])``.
        auth: Credentials mapping; ``access_token`` is read here.
        retry_count: Number of forced credential refreshes already attempted.

    Returns:
        The playlist URL to fetch for this channel.

    Raises:
        ValueError: If no channel with number ``chid`` exists.
        httpx.HTTPStatusError: If the upstream keeps answering with an error
            status after the allowed auth retries.
    """
    try:
        channels = await get_channels(auth)
        channel = next((ch for ch in channels if str(ch['no']) == chid), None)
        if not channel:
            raise ValueError(f"Channel {chid} not found")

        cache_key = f"live_{chid}"
        cached_url = cache.get_stream(cache_key)
        if cached_url:
            return cached_url

        # Build the URL against the live upstream host directly. Substituting
        # the host afterwards with str.replace would also rewrite any other
        # occurrence of the original host string in the URL.
        source_url = (
            f"{Config.UPSTREAM_HOSTS['live']}{channel['playpath']}.M3U8"
            f"?type=live&__cross_domain_user={quote(auth['access_token'])}"
        )

        async with _new_client() as client:
            main_response = await client.get(source_url, headers=_upstream_headers())

        if _should_retry_auth(main_response.status_code, retry_count):
            new_auth = await get_auth(force=True)
            return await get_live_m3u8_url(chid, new_auth, retry_count + 1)
        main_response.raise_for_status()

        playlist_url = extract_playlist_url(main_response.text, source_url) or source_url
        cache.set_stream(cache_key, playlist_url)
        return playlist_url
    except httpx.HTTPStatusError as e:
        # Also covers status errors raised by get_channels() with stale auth.
        if _should_retry_auth(e.response.status_code, retry_count):
            new_auth = await get_auth(force=True)
            return await get_live_m3u8_url(chid, new_auth, retry_count + 1)
        raise


async def proxy_live_stream_direct(chid: str, request: Request) -> Response:
    """Fetch the playlist for live channel ``chid`` and return it rewritten.

    Returns:
        The rewritten playlist (cacheable for 10 seconds), a 502 JSON error
        for any httpx failure, or a 500 JSON error for anything else.
    """
    try:
        auth = await get_auth()
        playlist_url = await get_live_m3u8_url(chid, auth)

        async with _new_client() as client:
            playlist_response = await client.get(
                playlist_url, headers=_upstream_headers()
            )
        playlist_response.raise_for_status()

        return _rewritten_playlist_response(
            playlist_response.text,
            _path_with_query(playlist_url),
            request,
            {'Cache-Control': 'public, max-age=10', **_RANGE_CORS_HEADERS},
        )
    except httpx.HTTPError as e:
        return _json_error(f"Upstream error: {e}", 502)
    except Exception as e:
        return _json_error(str(e), 500)


async def proxy_playback_stream(path: str, request: Request, retry_count: int = 0) -> Response:
    """Fetch a VOD playlist for ``path`` and return it rewritten.

    ``path`` is prefixed with ``query/`` when missing and every ``.m3u8``
    occurrence is stripped before the upstream URL and cache key are built.

    Args:
        path: Requested playback path, with or without the ``query/`` prefix.
        request: Incoming request; its URL provides the rewrite base.
        retry_count: Number of forced credential refreshes already attempted.

    Returns:
        The rewritten playlist (cacheable for 60 seconds), a 502 JSON error
        for httpx failures, or a 500 JSON error for anything else.
    """
    try:
        auth = await get_auth()

        if not path.startswith('query/'):
            path = f"query/{path}"
        vod_path = path.replace('.m3u8', '')
        cache_key = f"vod_{vod_path.replace('/', '_').replace('=', '')}"

        playlist_url = cache.get_stream(cache_key)
        if not playlist_url:
            source_url = (
                f"{Config.UPSTREAM_HOSTS['vod']}/{vod_path}.m3u8"
                f"?type=vod&__cross_domain_user={quote(auth['access_token'])}"
            )
            async with _new_client() as client:
                main_response = await client.get(
                    source_url, headers=_upstream_headers()
                )

            if _should_retry_auth(main_response.status_code, retry_count):
                # The retry re-reads credentials through get_auth();
                # presumably that returns the refreshed ones - TODO confirm.
                await get_auth(force=True)
                return await proxy_playback_stream(path, request, retry_count + 1)
            if not main_response.is_success:
                raise Exception(f"VOD source failed: HTTP {main_response.status_code}")

            playlist_url = (
                extract_playlist_url(main_response.text, source_url) or source_url
            )
            cache.set_stream(cache_key, playlist_url)

        async with _new_client() as client:
            playlist_response = await client.get(
                playlist_url, headers=_upstream_headers()
            )
        if not playlist_response.is_success:
            raise Exception(f"VOD playlist failed: HTTP {playlist_response.status_code}")

        return _rewritten_playlist_response(
            playlist_response.text,
            _path_with_query(playlist_url),
            request,
            {'Cache-Control': 'public, max-age=60', **_RANGE_CORS_HEADERS},
        )
    except httpx.HTTPStatusError as e:
        if _should_retry_auth(e.response.status_code, retry_count):
            await get_auth(force=True)
            return await proxy_playback_stream(path, request, retry_count + 1)
        return _json_error(f"Upstream error: {e}", 502)
    except httpx.HTTPError as e:
        # Transport failures are upstream problems too: answer 502, matching
        # proxy_live_stream_direct.
        return _json_error(f"Upstream error: {e}", 502)
    except Exception as e:
        return _json_error(str(e), 500)


async def proxy_media(request: Request, path: str, retry_count: int = 0) -> Response:
    """Relay a playlist or media segment at ``path`` from the upstream host.

    Paths starting with ``/vod/`` or ``/query/`` go to the VOD host, anything
    else to the live host. The client's query string and ``Range`` header are
    forwarded. Playlists (detected by content type or ``.m3u8`` suffix) are
    rewritten; other bodies are returned as-is with the upstream status code.

    Args:
        request: Incoming request (query string, Range header, base URL).
        path: Upstream path, expected to start with ``/``.
        retry_count: Number of forced credential refreshes already attempted.

    Returns:
        The proxied response, a 502 JSON error for httpx failures, or a 500
        JSON error for anything else.
    """
    try:
        # The credentials themselves are not used below; the call is kept so
        # that whatever get_auth() does on access still happens per request.
        await get_auth()

        if path.startswith(('/vod/', '/query/')):
            upstream_host = Config.UPSTREAM_HOSTS['vod']
        else:
            upstream_host = Config.UPSTREAM_HOSTS['live']

        query = str(request.url.query)
        full_path = f"{path}?{query}" if query else path
        upstream_url = f"{upstream_host}{full_path}"

        headers = _upstream_headers()
        range_header = request.headers.get('Range')
        if range_header:
            headers['Range'] = range_header

        async with _new_client(keepalive_expiry=30.0) as client:
            response = await client.get(upstream_url, headers=headers)

        if _should_retry_auth(response.status_code, retry_count):
            await get_auth(force=True)
            return await proxy_media(request, path, retry_count + 1)
        response.raise_for_status()

        content_type = response.headers.get('Content-Type', '')
        # Compare case-insensitively: "application/x-mpegURL" is a common
        # playlist content type and upstream suffix casing varies.
        is_playlist = (
            'mpegurl' in content_type.lower()
            or path.lower().endswith('.m3u8')
        )
        if is_playlist:
            return _rewritten_playlist_response(
                response.text,
                full_path,
                request,
                {
                    'Cache-Control': 'public, max-age=10',
                    'Access-Control-Allow-Origin': '*',
                },
            )

        response_headers = {
            'Content-Type': content_type or 'video/MP2T',
            'Cache-Control': 'public, max-age=86400',
            **_RANGE_CORS_HEADERS,
        }
        # Content-Length is deliberately not copied from upstream: httpx may
        # have decoded a compressed body, so the upstream value can disagree
        # with the bytes sent here. Starlette computes it from the body.
        for name in ('Content-Range', 'Accept-Ranges'):
            if name in response.headers:
                response_headers[name] = response.headers[name]

        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=response_headers,
        )
    except httpx.HTTPStatusError as e:
        if _should_retry_auth(e.response.status_code, retry_count):
            await get_auth(force=True)
            return await proxy_media(request, path, retry_count + 1)
        return _json_error(f"Upstream error: {e}", 502)
    except httpx.HTTPError as e:
        # Transport failures are upstream problems too: answer 502, matching
        # proxy_live_stream_direct.
        return _json_error(f"Upstream error: {e}", 502)
    except Exception as e:
        return _json_error(str(e), 500)