Spaces:
Runtime error
Runtime error
File size: 3,773 Bytes
3d5da4c 7b39661 492474e 3d5da4c 492474e a389876 7b39661 3d5da4c ad8f96a 7b39661 3d5da4c 492474e ad8f96a 3d5da4c 492474e 3d5da4c a389876 7b39661 a389876 3d5da4c 492474e ad8f96a 7b39661 ad8f96a 7b39661 492474e 7b39661 ad8f96a 7b39661 ad8f96a 76ff673 492474e a389876 3d5da4c ad8f96a 7b39661 3d5da4c 76ff673 3d5da4c ad8f96a 492474e ad8f96a 492474e ad8f96a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 | import http.server
import socketserver
import urllib.request
import urllib.parse
import re
import gzip
import time
import json
# TCP port the server listens on when this module is run as a script.
PORT = 8000

# Process-wide response cache: proxied URL -> (body bytes, content type).
cache = dict()

# Single shared opener used for every outbound request. It presents a
# browser-like User-Agent and asks upstream servers for gzip bodies.
opener = urllib.request.build_opener()
opener.addheaders = [
    ('User-Agent', 'Mozilla/5.0'),
    ('Accept-Encoding', 'gzip'),
]
class ProxyHandler(http.server.BaseHTTPRequestHandler):
    """Serve a link-rewriting web proxy plus a crude download-speed probe.

    ``do_GET`` handles three kinds of request:

    * ``/api/internet_speed`` -- download a fixed test page and report the
      measured throughput as JSON.
    * any other path carrying a ``proxy=URL`` query parameter -- fetch URL,
      rewrite links in HTML responses so they route back through this
      server, and relay the body (served from ``cache`` when available).
    * anything else -- a plain-text usage message.
    """

    # HTTP/1.1 keeps connections open between requests, so every response
    # must carry an accurate Content-Length; otherwise the client cannot
    # tell where the body ends and waits until the connection times out.
    protocol_version = 'HTTP/1.1'

    # Only these schemes may be proxied. The shared opener also understands
    # file: and ftp: URLs, which would let a caller read local files.
    _ALLOWED_SCHEMES = ('http', 'https')
    # Attribute values that must never be routed through the proxy.
    _SKIPPED_PREFIXES = ('data:', 'javascript:', '#', 'mailto:')
    # Maximum number of responses kept in the module-level cache.
    _CACHE_LIMIT = 200
    _FETCH_TIMEOUT = 5
    _SPEED_TEST_URL = "https://google.com"
    _USAGE = b"Usage: /site?proxy=URL or /api/internet_speed"
    # The back-reference makes the closing quote match the opening one, so
    # a value such as href="it's" is not cut short at the apostrophe.
    _LINK_ATTR_RE = re.compile(r'(href|src|action)=(["\'])(.*?)\2', re.I)
    # Requires whitespace or '>' right after "head" so <header> is ignored.
    _HEAD_TAG_RE = re.compile(r'(<head(?:\s[^>]*)?>)', re.I)

    def do_GET(self):
        """Dispatch a GET request to the speed probe, the proxy, or usage."""
        url_parts = urllib.parse.urlparse(self.path)

        if url_parts.path == '/api/internet_speed':
            self._handle_speed_test()
            return

        params = urllib.parse.parse_qs(url_parts.query)
        target = params.get('proxy', [None])[0]
        if not target:
            self._send(200, self._USAGE, 'text/plain; charset=utf-8')
            return

        if not self._is_allowed_target(target):
            self._send(
                400,
                b"Only http:// and https:// proxy targets are allowed",
                'text/plain; charset=utf-8',
            )
            return

        entry = cache.get(target)
        if entry is None:
            # Only the upstream fetch is guarded: once we start writing our
            # own response, a failure must not emit a second status line.
            try:
                entry = self._fetch(target)
            except Exception as e:  # request boundary: report, don't crash
                self._send(500, str(e).encode(), 'text/plain; charset=utf-8')
                return
            if len(cache) < self._CACHE_LIMIT:
                cache[target] = entry

        content, ctype = entry
        self._send(200, content, ctype)

    def _handle_speed_test(self):
        """Time a download of the test URL and reply with the rate in Mbps."""
        try:
            start = time.perf_counter()
            with opener.open(self._SPEED_TEST_URL,
                             timeout=self._FETCH_TIMEOUT) as res:
                data = res.read()
            duration = time.perf_counter() - start
            size_bits = len(data) * 8
            speed_mbps = (size_bits / duration) / 1_000_000
        except Exception as e:  # request boundary: report, don't crash
            self._send_json(500, {"error": str(e)})
            return
        self._send_json(200, {"download_speed_mbps": round(speed_mbps, 2)})

    def _fetch(self, target):
        """Download *target* and return ``(body_bytes, content_type)``.

        Gzip-encoded bodies are decompressed. HTML bodies are decoded with
        the charset the upstream declared (UTF-8 when none or unknown),
        passed through ``_rewrite_html`` and re-encoded as UTF-8, with the
        returned content type updated to say so. Any exception raised by
        the opener or by decompression propagates to the caller.
        """
        with opener.open(target, timeout=self._FETCH_TIMEOUT) as res:
            ctype = res.headers.get('Content-Type', '')
            encoding = (res.headers.get('Content-Encoding') or '').strip().lower()
            charset = res.headers.get_content_charset()
            # After redirects, relative links belong to the final URL.
            base_url = res.geturl() or target
            content = res.read()

        if encoding == 'gzip':
            content = gzip.decompress(content)

        if 'text/html' in ctype.lower():
            text = self._decode(content, charset)
            content = self._rewrite_html(text, base_url).encode('utf-8')
            # The body is UTF-8 now, whatever the upstream header claimed.
            ctype = 'text/html; charset=utf-8'
        return content, ctype

    @staticmethod
    def _decode(content, charset):
        """Decode *content* with *charset*, falling back to UTF-8."""
        try:
            return content.decode(charset or 'utf-8', errors='ignore')
        except LookupError:
            # Upstream named a codec Python does not know.
            return content.decode('utf-8', errors='ignore')

    @classmethod
    def _is_allowed_target(cls, target):
        """Return True when *target* is a parseable http(s) URL."""
        try:
            scheme = urllib.parse.urlparse(target).scheme
        except ValueError:
            return False
        return scheme.lower() in cls._ALLOWED_SCHEMES

    @classmethod
    def _rewrite_html(cls, text, target):
        """Return *text* with links routed via the proxy and a <base> tag.

        Every ``href``/``src``/``action`` value is resolved against *target*
        and replaced by ``/site?proxy=<quoted absolute URL>``; values that
        start with one of ``_SKIPPED_PREFIXES`` are left untouched. A
        ``<base href=target>`` element is then inserted right after the
        first ``<head>`` tag, if there is one.

        NOTE(review): a <base> element also governs root-relative URLs, so
        browsers will resolve the rewritten "/site?proxy=..." links against
        the *target* origin rather than this proxy. Confirm which of the two
        mechanisms is intended before relying on in-page navigation.
        """
        # Local import: the file's import block does not provide ``html``.
        import html

        def fix(m):
            attr = m.group(1)
            # Attribute values are entity-encoded in HTML source (&amp;).
            url = html.unescape(m.group(3)).strip()
            if url.lower().startswith(cls._SKIPPED_PREFIXES):
                return m.group(0)
            joined = urllib.parse.urljoin(target, url)
            return f'{attr}="/site?proxy={urllib.parse.quote(joined)}"'

        text = cls._LINK_ATTR_RE.sub(fix, text)

        # Escape the target so it cannot break out of the attribute, and use
        # a callable replacement so backslashes in it are not interpreted as
        # regex group references.
        base_tag = f'<base href="{html.escape(target, quote=True)}">'
        return cls._HEAD_TAG_RE.sub(
            lambda m: m.group(1) + base_tag, text, count=1
        )

    def _send_json(self, status, payload):
        """Serialize *payload* as JSON and send it with *status*."""
        self._send(status, json.dumps(payload).encode(), 'application/json')

    def _send(self, status, body, content_type=None):
        """Write a complete response: status line, headers and *body*.

        Content-Length is always sent; Content-Type only when non-empty.
        """
        self.send_response(status)
        if content_type:
            self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
class ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that hands each incoming connection to its own thread."""

    # NOTE(review): nothing in the visible code reads this attribute, and it
    # does not appear to be a documented socketserver setting -- confirm it
    # is still needed.
    block_size = 65536

    # Worker threads are daemonic, so shutting the process down does not
    # wait for in-flight requests to finish.
    daemon_threads = True
if __name__ == "__main__":
    # Script entry point: bind to PORT with an empty host string (all
    # interfaces) and serve requests until the process is interrupted.
    bind_address = ("", PORT)
    with ThreadedServer(bind_address, ProxyHandler) as server:
        print(f"Server running on port {PORT}")
        server.serve_forever()
|