Spaces:
Runtime error
Runtime error
| import http.server | |
| import socketserver | |
| import urllib.request | |
| import urllib.parse | |
| import re | |
| import gzip | |
| import time | |
| import json | |
| PORT = 8000 | |
| cache = {} | |
| opener = urllib.request.build_opener() | |
| opener.addheaders = [('User-Agent', 'Mozilla/5.0'), ('Accept-Encoding', 'gzip')] | |
| class ProxyHandler(http.server.BaseHTTPRequestHandler): | |
| protocol_version = 'HTTP/1.1' | |
| def do_GET(self): | |
| url_parts = urllib.parse.urlparse(self.path) | |
| # Internet Speed Test Endpoint | |
| if url_parts.path == '/api/internet_speed': | |
| try: | |
| test_url = "https://google.com" | |
| start = time.perf_counter() | |
| with opener.open(test_url, timeout=5) as res: | |
| data = res.read() | |
| duration = time.perf_counter() - start | |
| size_bits = len(data) * 8 | |
| speed_mbps = (size_bits / duration) / 1_000_000 | |
| self.send_response(200) | |
| self.send_header('Content-Type', 'application/json') | |
| self.end_headers() | |
| self.wfile.write(json.dumps({"download_speed_mbps": round(speed_mbps, 2)}).encode()) | |
| return | |
| except Exception as e: | |
| self.send_response(500) | |
| self.end_headers() | |
| self.wfile.write(json.dumps({"error": str(e)}).encode()) | |
| return | |
| # Existing Proxy Logic | |
| params = urllib.parse.parse_qs(url_parts.query) | |
| target = params.get('proxy', [None])[0] | |
| if not target: | |
| self.send_response(200) | |
| self.end_headers() | |
| self.wfile.write(b"Usage: /site?proxy=URL or /api/internet_speed") | |
| return | |
| if target in cache: | |
| content, ctype = cache[target] | |
| self.send_response(200) | |
| self.send_header('Content-Type', ctype) | |
| self.send_header('Content-Length', len(content)) | |
| self.end_headers() | |
| self.wfile.write(content) | |
| return | |
| try: | |
| with opener.open(target, timeout=5) as res: | |
| ctype = res.headers.get('Content-Type', '') | |
| content = res.read() | |
| if res.headers.get('Content-Encoding') == 'gzip': | |
| content = gzip.decompress(content) | |
| if 'text/html' in ctype: | |
| text = content.decode('utf-8', errors='ignore') | |
| def fix(m): | |
| attr, url = m.group(1), m.group(2) | |
| if url.startswith(('data:', 'javascript:', '#', 'mailto:')): return m.group(0) | |
| joined = urllib.parse.urljoin(target, url) | |
| return f'{attr}="/site?proxy={urllib.parse.quote(joined)}"' | |
| text = re.sub(r'(href|src|action)=["\'](.*?)["\']', fix, text) | |
| if '<head' in text.lower(): | |
| text = re.sub(r'(<head.*?>)', r'\1<base href="'+target+'">', text, count=1, flags=re.I) | |
| content = text.encode('utf-8') | |
| if len(cache) < 200: cache[target] = (content, ctype) | |
| self.send_response(200) | |
| self.send_header('Content-Type', ctype) | |
| self.send_header('Content-Length', len(content)) | |
| self.end_headers() | |
| self.wfile.write(content) | |
| except Exception as e: | |
| self.send_response(500) | |
| self.end_headers() | |
| self.wfile.write(str(e).encode()) | |
| class ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer): | |
| daemon_threads = True | |
| block_size = 65536 | |
| if __name__ == "__main__": | |
| with ThreadedServer(("", PORT), ProxyHandler) as httpd: | |
| print(f"Server running on port {PORT}") | |
| httpd.serve_forever() | |