| """
|
| Reverse proxy for virtual ports.
|
|
|
| Single Responsibility: only handles HTTP/WebSocket proxying.
|
| Port CRUD is in ports.py — separate concern.
|
| """
|
|
|
| import asyncio
|
| import json
|
|
|
| import httpx
|
| from fastapi import APIRouter, Depends, Request, WebSocket, WebSocketDisconnect
|
| from fastapi.responses import Response
|
|
|
| from auth import AuthUser, get_current_user, get_ws_user
|
| from config import MIN_PORT, MAX_PORT
|
| from storage import load_meta, check_zone_owner
|
|
|
| router = APIRouter(tags=["proxy"])
|
|
|
|
|
|
|
| _HOP_HEADERS = frozenset({
|
| "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
|
| "te", "trailers", "transfer-encoding", "upgrade",
|
| })
|
|
|
| _client: httpx.AsyncClient | None = None
|
|
|
|
|
| def _get_client() -> httpx.AsyncClient:
|
| global _client
|
| if _client is None:
|
| _client = httpx.AsyncClient(
|
| timeout=httpx.Timeout(30.0, connect=5.0),
|
| follow_redirects=False,
|
| limits=httpx.Limits(max_connections=50),
|
| )
|
| return _client
|
|
|
|
|
| def _validate_proxy_access(zone_name: str, port: int):
|
| """Validate port range and check it's registered for the zone."""
|
| if not (MIN_PORT <= port <= MAX_PORT):
|
| raise ValueError(f"Port must be between {MIN_PORT} and {MAX_PORT}")
|
| meta = load_meta()
|
| if zone_name not in meta:
|
| raise ValueError(f"Zone '{zone_name}' does not exist")
|
| ports = meta[zone_name].get("ports", [])
|
| if not any(p["port"] == port for p in ports):
|
| raise ValueError("Port not mapped")
|
|
|
|
|
|
|
|
|
| @router.api_route(
|
| "/port/{zone_name}/{port}/{subpath:path}",
|
| methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"],
|
| )
|
| async def proxy_http(request: Request, zone_name: str, port: int, subpath: str = "", user: AuthUser = Depends(get_current_user)):
|
| try:
|
| _validate_proxy_access(zone_name, port)
|
| check_zone_owner(zone_name, user.sub, user.role)
|
| except ValueError:
|
| return Response(content="Port not mapped", status_code=404)
|
|
|
| target_url = f"http://127.0.0.1:{port}/{subpath}"
|
| if request.url.query:
|
| target_url += f"?{request.url.query}"
|
|
|
| headers = {}
|
| for key, value in request.headers.items():
|
| if key.lower() not in _HOP_HEADERS and key.lower() != "host":
|
| headers[key] = value
|
| headers["host"] = f"127.0.0.1:{port}"
|
| headers["x-forwarded-for"] = request.client.host if request.client else "127.0.0.1"
|
| headers["x-forwarded-proto"] = request.url.scheme
|
| headers["x-forwarded-prefix"] = f"/port/{zone_name}/{port}"
|
|
|
| body = await request.body()
|
| client = _get_client()
|
|
|
| try:
|
| resp = await client.request(method=request.method, url=target_url, headers=headers, content=body)
|
| except httpx.ConnectError:
|
| return Response(
|
| content=f"Cannot connect to port {port}. Make sure your server is running.",
|
| status_code=502,
|
| media_type="text/plain",
|
| )
|
| except httpx.TimeoutException:
|
| return Response(content=f"Timeout connecting to port {port}", status_code=504, media_type="text/plain")
|
|
|
| resp_headers = {}
|
| for key, value in resp.headers.items():
|
| if key.lower() not in _HOP_HEADERS and key.lower() != "content-encoding":
|
| resp_headers[key] = value
|
|
|
| return Response(content=resp.content, status_code=resp.status_code, headers=resp_headers)
|
|
|
|
|
|
|
|
|
| @router.websocket("/port/{zone_name}/{port}/ws/{subpath:path}")
|
| async def proxy_ws(websocket: WebSocket, zone_name: str, port: int, subpath: str = ""):
|
|
|
| user = get_ws_user(websocket)
|
| if not user:
|
| await websocket.close(code=4001, reason="Chưa đăng nhập")
|
| return
|
|
|
| try:
|
| _validate_proxy_access(zone_name, port)
|
| check_zone_owner(zone_name, user.sub, user.role)
|
| except ValueError:
|
| await websocket.close(code=4004, reason="Port not mapped")
|
| return
|
|
|
| await websocket.accept()
|
| target_url = f"ws://127.0.0.1:{port}/ws/{subpath}"
|
|
|
| import websockets as ws_lib
|
|
|
| try:
|
| async with ws_lib.connect(target_url) as backend_ws:
|
| async def client_to_backend():
|
| try:
|
| while True:
|
| msg = await websocket.receive()
|
| if msg.get("type") == "websocket.disconnect":
|
| break
|
| if "text" in msg:
|
| await backend_ws.send(msg["text"])
|
| elif "bytes" in msg:
|
| await backend_ws.send(msg["bytes"])
|
| except (WebSocketDisconnect, Exception):
|
| pass
|
|
|
| async def backend_to_client():
|
| try:
|
| async for message in backend_ws:
|
| if isinstance(message, str):
|
| await websocket.send_text(message)
|
| else:
|
| await websocket.send_bytes(message)
|
| except (WebSocketDisconnect, Exception):
|
| pass
|
|
|
| await asyncio.gather(client_to_backend(), backend_to_client())
|
| except Exception:
|
| try:
|
| await websocket.send_text(json.dumps({"error": f"Cannot connect WebSocket to port {port}"}))
|
| except Exception:
|
| pass
|
| finally:
|
| try:
|
| await websocket.close()
|
| except Exception:
|
| pass
|
|
|