gate / proxy_handler.py
harii66's picture
Upload 23 files
b4edbc0 verified
import httpx
from urllib.parse import quote, urlparse
from fastapi import Request
from fastapi.responses import Response
from config import Config
from cache_manager import cache
from utils import get_auth, get_channels, rewrite_m3u8, extract_playlist_url
def get_client_base_url(request: Request) -> str:
scheme = request.url.scheme
netloc = request.url.netloc
return f"{scheme}://{netloc}"
async def get_live_m3u8_url(chid: str, auth: dict, retry_count: int = 0) -> str:
try:
channels = await get_channels(auth)
channel = next((ch for ch in channels if str(ch['no']) == chid), None)
if not channel:
raise ValueError(f"Channel {chid} not found")
cache_key = f"live_{chid}"
playlist_url = cache.get_stream(cache_key)
if not playlist_url:
source_url = (
f"{auth['vms_host']}{channel['playpath']}.M3U8"
f"?type=live&__cross_domain_user={quote(auth['access_token'])}"
)
source_url = source_url.replace(auth['vms_host'], Config.UPSTREAM_HOSTS['live'])
headers = {
'Referer': Config.REQUIRED_REFERER,
'User-Agent': 'Mozilla/5.0'
}
async with httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=5.0),
follow_redirects=True,
limits=httpx.Limits(max_keepalive_connections=50, max_connections=200)
) as client:
main_response = await client.get(source_url, headers=headers)
if main_response.status_code in [401, 403] and retry_count < 2:
new_auth = await get_auth(force=True)
return await get_live_m3u8_url(chid, new_auth, retry_count + 1)
main_response.raise_for_status()
main_content = main_response.text
playlist_url = extract_playlist_url(main_content, source_url)
if not playlist_url:
playlist_url = source_url
cache.set_stream(cache_key, playlist_url)
return playlist_url
except httpx.HTTPStatusError as e:
if e.response.status_code in [401, 403] and retry_count < 2:
new_auth = await get_auth(force=True)
return await get_live_m3u8_url(chid, new_auth, retry_count + 1)
raise e
async def proxy_live_stream_direct(chid: str, request: Request) -> Response:
try:
auth = await get_auth()
playlist_url = await get_live_m3u8_url(chid, auth)
headers = {
'Referer': Config.REQUIRED_REFERER,
'User-Agent': 'Mozilla/5.0'
}
async with httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=5.0),
follow_redirects=True,
limits=httpx.Limits(max_keepalive_connections=50, max_connections=200)
) as client:
playlist_response = await client.get(playlist_url, headers=headers)
playlist_response.raise_for_status()
playlist_content = playlist_response.text
parsed = urlparse(playlist_url)
playlist_path = parsed.path + ('?' + parsed.query if parsed.query else '')
worker_base = get_client_base_url(request)
rewritten = rewrite_m3u8(playlist_content, playlist_path, worker_base)
return Response(
content=rewritten,
media_type='application/vnd.apple.mpegurl',
headers={
'Cache-Control': 'public, max-age=10',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Range',
'Access-Control-Expose-Headers': 'Content-Length, Content-Range'
}
)
except httpx.HTTPError as e:
return Response(
content=f'{{"error": "Upstream error: {str(e)}"}}',
status_code=502,
media_type='application/json'
)
except Exception as e:
return Response(
content=f'{{"error": "{str(e)}"}}',
status_code=500,
media_type='application/json'
)
async def proxy_playback_stream(path: str, request: Request, retry_count: int = 0) -> Response:
try:
auth = await get_auth()
if not path.startswith('query/'):
path = f"query/{path}"
vod_path = path.replace('.m3u8', '')
cache_key = f"vod_{vod_path.replace('/', '_').replace('=', '')}"
playlist_url = cache.get_stream(cache_key)
if not playlist_url:
source_url = (
f"{Config.UPSTREAM_HOSTS['vod']}/{vod_path}.m3u8"
f"?type=vod&__cross_domain_user={quote(auth['access_token'])}"
)
headers = {
'Referer': Config.REQUIRED_REFERER,
'User-Agent': 'Mozilla/5.0'
}
async with httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=5.0),
follow_redirects=True,
limits=httpx.Limits(max_keepalive_connections=50, max_connections=200)
) as client:
main_response = await client.get(source_url, headers=headers)
if main_response.status_code in [401, 403] and retry_count < 2:
new_auth = await get_auth(force=True)
return await proxy_playback_stream(path, request, retry_count + 1)
if not main_response.is_success:
raise Exception(f"VOD source failed: HTTP {main_response.status_code}")
main_content = main_response.text
playlist_url = extract_playlist_url(main_content, source_url)
if not playlist_url:
playlist_url = source_url
cache.set_stream(cache_key, playlist_url)
headers = {
'Referer': Config.REQUIRED_REFERER,
'User-Agent': 'Mozilla/5.0'
}
async with httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=5.0),
follow_redirects=True,
limits=httpx.Limits(max_keepalive_connections=50, max_connections=200)
) as client:
playlist_response = await client.get(playlist_url, headers=headers)
if not playlist_response.is_success:
raise Exception(f"VOD playlist failed: HTTP {playlist_response.status_code}")
playlist_content = playlist_response.text
parsed = urlparse(playlist_url)
playlist_path = parsed.path + ('?' + parsed.query if parsed.query else '')
worker_base = get_client_base_url(request)
rewritten = rewrite_m3u8(playlist_content, playlist_path, worker_base)
return Response(
content=rewritten,
media_type='application/vnd.apple.mpegurl',
headers={
'Cache-Control': 'public, max-age=60',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Range',
'Access-Control-Expose-Headers': 'Content-Length, Content-Range'
}
)
except httpx.HTTPStatusError as e:
if e.response.status_code in [401, 403] and retry_count < 2:
new_auth = await get_auth(force=True)
return await proxy_playback_stream(path, request, retry_count + 1)
return Response(
content=f'{{"error": "Upstream error: {str(e)}"}}',
status_code=502,
media_type='application/json'
)
except Exception as e:
return Response(
content=f'{{"error": "{str(e)}"}}',
status_code=500,
media_type='application/json'
)
async def proxy_media(request: Request, path: str, retry_count: int = 0) -> Response:
try:
auth = await get_auth()
if path.startswith('/live/'):
upstream_host = Config.UPSTREAM_HOSTS['live']
elif path.startswith('/vod/') or path.startswith('/query/'):
upstream_host = Config.UPSTREAM_HOSTS['vod']
else:
upstream_host = Config.UPSTREAM_HOSTS['live']
query = str(request.url.query)
upstream_url = f"{upstream_host}{path}"
if query:
upstream_url += f"?{query}"
headers = {
'Referer': Config.REQUIRED_REFERER,
'User-Agent': 'Mozilla/5.0'
}
range_header = request.headers.get('Range')
if range_header:
headers['Range'] = range_header
async with httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=5.0),
follow_redirects=True,
limits=httpx.Limits(
max_keepalive_connections=50,
max_connections=200,
keepalive_expiry=30.0
)
) as client:
response = await client.get(upstream_url, headers=headers)
if response.status_code in [401, 403] and retry_count < 2:
new_auth = await get_auth(force=True)
return await proxy_media(request, path, retry_count + 1)
response.raise_for_status()
content_type = response.headers.get('Content-Type', '')
if 'mpegurl' in content_type or path.endswith(('.m3u8', '.M3U8')):
content = response.text
worker_base = get_client_base_url(request)
full_path = path
if query:
full_path += f"?{query}"
rewritten = rewrite_m3u8(content, full_path, worker_base)
return Response(
content=rewritten,
media_type='application/vnd.apple.mpegurl',
headers={
'Cache-Control': 'public, max-age=10',
'Access-Control-Allow-Origin': '*'
}
)
response_headers = {
'Content-Type': content_type or 'video/MP2T',
'Cache-Control': 'public, max-age=86400',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Range',
'Access-Control-Expose-Headers': 'Content-Length, Content-Range'
}
if 'Content-Length' in response.headers:
response_headers['Content-Length'] = response.headers['Content-Length']
if 'Content-Range' in response.headers:
response_headers['Content-Range'] = response.headers['Content-Range']
if 'Accept-Ranges' in response.headers:
response_headers['Accept-Ranges'] = response.headers['Accept-Ranges']
return Response(
content=response.content,
status_code=response.status_code,
headers=response_headers
)
except httpx.HTTPStatusError as e:
if e.response.status_code in [401, 403] and retry_count < 2:
new_auth = await get_auth(force=True)
return await proxy_media(request, path, retry_count + 1)
return Response(
content=f'{{"error": "Upstream error: {str(e)}"}}',
status_code=502,
media_type='application/json'
)
except Exception as e:
return Response(
content=f'{{"error": "{str(e)}"}}',
status_code=500,
media_type='application/json'
)