# tet / app.py
# GamerC0der's picture
# Update app.py
# 492474e verified
import gzip
import html
import http.server
import json
import re
import socketserver
import time
import urllib.parse
import urllib.request
# TCP port the server binds to when the module is run as a script.
PORT = 8000

# Process-wide response cache shared by every request handler:
# maps a proxied target URL to its (body bytes, Content-Type string) pair.
cache = dict()

# Shared urllib opener used for all outbound requests. It presents a
# browser-like User-Agent and advertises gzip so upstream may compress.
opener = urllib.request.build_opener()
opener.addheaders = [
    ('User-Agent', 'Mozilla/5.0'),
    ('Accept-Encoding', 'gzip'),
]
class ProxyHandler(http.server.BaseHTTPRequestHandler):
    """GET handler offering a link-rewriting web proxy and a rough speed test.

    Routes:
      * ``/api/internet_speed`` -- download a fixed URL, time it, and reply
        with ``{"download_speed_mbps": <float>}`` as JSON.
      * ``<any path>?proxy=URL`` -- fetch ``URL`` (http/https only) and relay
        it; in HTML responses ``href``/``src``/``action`` attributes are
        rewritten to go back through ``/site?proxy=...``.
      * anything else -- a plain usage message.

    Successful proxy responses are memoised in the module-level ``cache``.
    """

    # HTTP/1.1 turns on persistent connections, so every response MUST carry
    # an accurate Content-Length or clients will wait forever for the body end.
    protocol_version = 'HTTP/1.1'

    _SPEED_TEST_URL = "https://google.com"
    # urllib's default opener also understands file: and ftp: URLs; only web
    # schemes may be requested on behalf of a remote client.
    _ALLOWED_SCHEMES = ('http', 'https')
    _CACHE_LIMIT = 200
    # Attribute values that must be left untouched by the link rewriter.
    _SKIP_PREFIXES = ('data:', 'javascript:', '#', 'mailto:')
    # The back-reference makes the closing quote match the opening one, so a
    # value such as "/a?q='x'" is captured whole instead of being truncated.
    _ATTR_RE = re.compile(r'(href|src|action)=(["\'])(.*?)\2')
    # \b keeps <header> (and similar) from being mistaken for <head>.
    _HEAD_RE = re.compile(r'(<head\b[^>]*>)', re.I)

    def do_GET(self):
        """Dispatch a GET request to the speed test, the proxy, or the usage text."""
        url_parts = urllib.parse.urlparse(self.path)
        if url_parts.path == '/api/internet_speed':
            self._handle_speed_test()
            return

        params = urllib.parse.parse_qs(url_parts.query)
        target = params.get('proxy', [None])[0]
        if not target:
            self._send_body(200, b"Usage: /site?proxy=URL or /api/internet_speed")
            return

        try:
            scheme = urllib.parse.urlparse(target).scheme.lower()
        except ValueError:
            # Malformed URL (e.g. unbalanced IPv6 brackets).
            scheme = ''
        if scheme not in self._ALLOWED_SCHEMES:
            # NOTE(review): this blocks file:/ftp: access but not requests to
            # internal http(s) hosts; add a host allow/deny list if needed.
            self._send_body(400, b"Only http and https URLs can be proxied",
                            'text/plain; charset=utf-8')
            return

        entry = cache.get(target)
        if entry is None:
            # Only the upstream fetch is guarded: once we start writing the
            # response, a failure must not trigger a second status line.
            try:
                entry = self._fetch(target)
            except Exception as e:
                self._send_body(500, str(e).encode(), 'text/plain; charset=utf-8')
                return
            if len(cache) < self._CACHE_LIMIT:
                cache[target] = entry

        content, ctype = entry
        self._send_body(200, content, ctype)

    def _send_body(self, status, body, ctype=None):
        """Send a complete response: status, headers with Content-Length, body.

        ``ctype`` is emitted as Content-Type only when it is non-empty.
        """
        self.send_response(status)
        if ctype:
            self.send_header('Content-Type', ctype)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _handle_speed_test(self):
        """Time one download of the test URL and reply with the rate in Mbit/s."""
        try:
            start = time.perf_counter()
            with opener.open(self._SPEED_TEST_URL, timeout=5) as res:
                data = res.read()
                duration = time.perf_counter() - start
            size_bits = len(data) * 8
            speed_mbps = (size_bits / duration) / 1_000_000
            status, payload = 200, {"download_speed_mbps": round(speed_mbps, 2)}
        except Exception as e:
            status, payload = 500, {"error": str(e)}
        self._send_body(status, json.dumps(payload).encode(), 'application/json')

    def _fetch(self, target):
        """Download ``target`` and return ``(body_bytes, content_type)``.

        Gzip-encoded bodies are decompressed; HTML bodies have their links
        rewritten. Any network or decoding error propagates to the caller.
        """
        with opener.open(target, timeout=5) as res:
            ctype = res.headers.get('Content-Type', '')
            encoding = res.headers.get('Content-Encoding') or ''
            content = res.read()
        if encoding.strip().lower() == 'gzip':
            content = gzip.decompress(content)
        if 'text/html' in ctype.lower():
            text = content.decode('utf-8', errors='ignore')
            content = self._rewrite_html(text, target).encode('utf-8')
        return content, ctype

    @classmethod
    def _rewrite_html(cls, text, target):
        """Return ``text`` with link attributes routed through the proxy.

        Each quoted ``href``/``src``/``action`` value is resolved against
        ``target`` and replaced by ``/site?proxy=<percent-encoded URL>``;
        values starting with one of ``_SKIP_PREFIXES`` are left alone. A
        ``<base href=target>`` element is then inserted after the first
        ``<head ...>`` tag, if there is one.
        """
        def reroute(match):
            attr, url = match.group(1), match.group(3)
            if url.startswith(cls._SKIP_PREFIXES):
                return match.group(0)
            try:
                joined = urllib.parse.urljoin(target, url)
            except ValueError:
                # Unparseable link: leave the attribute exactly as it was.
                return match.group(0)
            return f'{attr}="/site?proxy={urllib.parse.quote(joined)}"'

        text = cls._ATTR_RE.sub(reroute, text)

        # The target comes from the client, so escape it before embedding it
        # in markup, and use a callable replacement so backslashes in the URL
        # are not interpreted as regex template escapes.
        # NOTE(review): browsers resolve the root-relative "/site?proxy=..."
        # links written above against this <base>, i.e. against the target
        # origin rather than the proxy -- confirm that is intended.
        base_tag = f'<base href="{html.escape(target, quote=True)}">'
        return cls._HEAD_RE.sub(lambda m: m.group(1) + base_tag, text, count=1)
class ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each incoming connection in its own thread."""

    # Worker threads are daemons, so they never keep the process alive on exit.
    daemon_threads = True
    # Set SO_REUSEADDR before binding so the server can be restarted
    # immediately instead of failing with "Address already in use" while
    # sockets from the previous run are still in TIME_WAIT.
    allow_reuse_address = True
    # NOTE(review): socketserver does not define or read ``block_size`` and
    # nothing in this file uses it; kept only so existing references survive.
    block_size = 65536
def main():
    """Bind the threaded server on every interface at PORT and serve requests."""
    bind_address = ("", PORT)
    with ThreadedServer(bind_address, ProxyHandler) as httpd:
        print(f"Server running on port {PORT}")
        httpd.serve_forever()


if __name__ == "__main__":
    main()