Spaces:
Sleeping
Sleeping
"""Smoke test the command center and live OpenEnv control surface.

This script is intentionally lightweight:
- validates the public HTTP surfaces the command center depends on
- validates the saved command center payload shape
- opens a real persistent WebSocket session through BrowserEnv
- runs reset -> noop to prove the live episode flow is working

Use it against a local server or a deployed HF Space.
"""
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| import urllib.error | |
| import urllib.parse | |
| import urllib.request | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional, Tuple | |
# ROOT is the directory one level above the folder containing this script.
# It is pushed to the front of sys.path (once) -- presumably so the
# project-local `client` and `models` imports that follow can be resolved
# when the script is launched directly.
ROOT = Path(__file__).resolve().parents[1]
if not any(entry == str(ROOT) for entry in sys.path):
    sys.path.insert(0, str(ROOT))
| from client import BrowserEnv | |
| from models import BrowserAction | |
| def _join_url(base_url: str, path: str) -> str: | |
| return urllib.parse.urljoin(base_url.rstrip("/") + "/", path.lstrip("/")) | |
| def _http_get(url: str, timeout: int = 60) -> tuple[int, bytes, Dict[str, str]]: | |
| request = urllib.request.Request( | |
| url=url, | |
| method="GET", | |
| headers={"Accept": "*/*"}, | |
| ) | |
| with urllib.request.urlopen(request, timeout=timeout) as response: | |
| body = response.read() | |
| headers = {k.lower(): v for k, v in response.headers.items()} | |
| return response.status, body, headers | |
def _read_json(url: str, timeout: int = 60) -> Dict[str, Any]:
    """GET ``url`` and parse the response body as UTF-8 encoded JSON.

    Args:
        url: Absolute URL to fetch.
        timeout: Timeout in seconds forwarded to ``_http_get``.

    Returns:
        The decoded JSON document.

    Raises:
        RuntimeError: If the response status is anything other than 200.
    """
    status, raw_body, _headers = _http_get(url, timeout=timeout)
    if status == 200:
        return json.loads(raw_body.decode("utf-8"))
    raise RuntimeError(f"GET {url} returned status {status}")
| def _assert(condition: bool, message: str) -> None: | |
| if not condition: | |
| raise RuntimeError(message) | |
| def _unwrap_client_result(result: Any) -> Tuple[Any, Dict[str, Any]]: | |
| """Normalize OpenEnv sync results across wrapper variants. | |
| BrowserEnv(...).sync() currently returns StepResult objects for both reset and | |
| step. Some direct clients may return the observation object itself. This keeps | |
| the smoke check compatible with both shapes. | |
| """ | |
| observation = getattr(result, "observation", result) | |
| step_meta = { | |
| "reward": getattr(result, "reward", getattr(observation, "reward", None)), | |
| "done": getattr(result, "done", getattr(observation, "done", False)), | |
| } | |
| return observation, step_meta | |
def _validate_health(base_url: str) -> Dict[str, Any]:
    """Fetch ``/health`` and check that it reports a healthy status.

    Args:
        base_url: Server base URL.

    Returns:
        The parsed ``/health`` JSON payload.

    Raises:
        RuntimeError: If the payload's ``status`` is neither ``"healthy"``
            nor ``"ok"``.
    """
    health_url = _join_url(base_url, "/health")
    payload = _read_json(health_url)
    reported_status = payload.get("status")
    _assert(
        reported_status in {"healthy", "ok"},
        f"/health returned unexpected status payload: {payload}",
    )
    return payload
def _validate_schema(base_url: str) -> Dict[str, Any]:
    """Fetch ``/schema`` and check that the expected top-level keys exist.

    Args:
        base_url: Server base URL.

    Returns:
        The parsed ``/schema`` JSON payload.

    Raises:
        RuntimeError: Naming the first of ``action``, ``observation`` and
            ``state`` that is absent from the payload.
    """
    payload = _read_json(_join_url(base_url, "/schema"))
    required_keys = ("action", "observation", "state")
    # Only the first missing key (in declaration order) is reported.
    first_missing = next((key for key in required_keys if key not in payload), None)
    _assert(first_missing is None, f"/schema missing top-level key: {first_missing}")
    return payload
def _validate_command_center_page(base_url: str) -> None:
    """Check that ``/command-center`` serves the expected HTML page.

    Verifies a 200 status, a ``text/html`` content type, and that the body
    contains both the ``Command Center`` text and the ``pipelineSteps`` marker.

    Args:
        base_url: Server base URL.

    Raises:
        RuntimeError: On the first check that fails.
    """
    page_url = _join_url(base_url, "/command-center")
    status, raw_body, response_headers = _http_get(page_url)
    _assert(status == 200, f"/command-center returned status {status}")

    content_type = response_headers.get("content-type", "")
    _assert("text/html" in content_type, f"/command-center returned unexpected content-type: {content_type}")

    # Undecodable bytes are replaced rather than raising, so the substring
    # checks below always run.
    page_text = raw_body.decode("utf-8", errors="replace")
    expected_markers = (
        ("Command Center", "command center HTML does not include expected title text"),
        ("pipelineSteps", "command center HTML is missing pipeline container markup"),
    )
    for marker, failure_message in expected_markers:
        _assert(marker in page_text, failure_message)
def _validate_root_page(base_url: str) -> None:
    """Check that the site root serves the command center HTML.

    Verifies a 200 status, a ``text/html`` content type, and that the body
    contains the ``Command Center`` text.

    Args:
        base_url: Server base URL.

    Raises:
        RuntimeError: On the first check that fails.
    """
    root_url = _join_url(base_url, "/")
    status, raw_body, response_headers = _http_get(root_url)
    _assert(status == 200, f"/ returned status {status}")

    content_type = response_headers.get("content-type", "")
    _assert("text/html" in content_type, f"/ returned unexpected content-type: {content_type}")

    page_text = raw_body.decode("utf-8", errors="replace")
    _assert("Command Center" in page_text, "root page does not land on the command center UI")
def _validate_logo(base_url: str) -> None:
    """Check that ``/command-center/logo.png`` serves a non-empty image.

    Verifies a 200 status, a content type containing ``image/``, and a body
    with at least one byte.

    Args:
        base_url: Server base URL.

    Raises:
        RuntimeError: On the first check that fails.
    """
    logo_url = _join_url(base_url, "/command-center/logo.png")
    status, image_bytes, response_headers = _http_get(logo_url)
    _assert(status == 200, f"/command-center/logo.png returned status {status}")

    content_type = response_headers.get("content-type", "")
    _assert("image/" in content_type, f"logo returned unexpected content-type: {content_type}")

    # A bytes object is truthy exactly when it has at least one byte.
    _assert(bool(image_bytes), "logo endpoint returned an empty body")
def _validate_command_center_data(base_url: str) -> Dict[str, Any]:
    """Fetch ``/command-center/data`` and check the payload shape.

    The payload must contain ``runtime`` and ``task_presets``, and
    ``task_presets`` must be a list with at least one entry.

    Args:
        base_url: Server base URL.

    Returns:
        The parsed JSON payload.

    Raises:
        RuntimeError: On the first check that fails.
    """
    data_url = _join_url(base_url, "/command-center/data")
    payload = _read_json(data_url)

    _assert("runtime" in payload, "/command-center/data missing runtime")
    _assert("task_presets" in payload, "/command-center/data missing task_presets")

    presets = payload["task_presets"]
    _assert(isinstance(presets, list), "task_presets is not a list")
    _assert(len(presets) >= 1, "task_presets is empty")
    return payload
def _run_live_ws_check(base_url: str, variant_id: Optional[str], seed: Optional[int]) -> Dict[str, Any]:
    """Run a reset followed by one noop step through ``BrowserEnv``.

    Args:
        base_url: Server base URL handed to ``BrowserEnv``.
        variant_id: Passed to ``reset`` as ``variant_id`` when truthy.
        seed: Passed to ``reset`` as ``seed`` when not ``None``.

    Returns:
        A dict with a ``"reset"`` section (episode id, task id, difficulty,
        instruction, element count) and a ``"step"`` section (episode id,
        step index, reward, done, success, failure reason).

    Raises:
        RuntimeError: If the reset observation lacks an ``episode_id`` or a
            ``task_id``.
    """
    env = BrowserEnv(base_url=base_url).sync()
    try:
        # Only forward the optional reset arguments the caller actually set.
        optional_args = (
            ("variant_id", variant_id, bool(variant_id)),
            ("seed", seed, seed is not None),
        )
        reset_kwargs: Dict[str, Any] = {
            arg_name: arg_value for arg_name, arg_value, wanted in optional_args if wanted
        }

        obs, _ = _unwrap_client_result(env.reset(**reset_kwargs))
        _assert(bool(obs.episode_id), "reset returned observation without episode_id")
        _assert(bool(obs.task_id), "reset returned observation without task_id")

        noop_action = BrowserAction(
            action_type="noop",
            confidence=0.0,
            reasoning="command-center smoke noop",
        )
        step_obs, step_meta = _unwrap_client_result(env.step(noop_action))

        reset_summary = {
            "episode_id": obs.episode_id,
            "task_id": obs.task_id,
            "difficulty": obs.difficulty,
            "instruction": obs.instruction,
            "elements": len(obs.elements),
        }
        step_summary = {
            "episode_id": step_obs.episode_id,
            "step_index": step_obs.step_index,
            "reward": step_meta["reward"],
            "done": step_meta["done"],
            "success": step_obs.success,
            "failure_reason": step_obs.failure_reason,
        }
        return {"reset": reset_summary, "step": step_summary}
    finally:
        # Always release the client session, even when a check fails.
        env.close()
def main() -> None:
    """Parse CLI arguments, run every smoke check and print a JSON summary.

    The checks run in this order: health, schema, root page, command center
    page, logo, command center data, then the live reset/noop episode. Any
    failing check raises and aborts the run before the summary is printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--base-url", default="http://127.0.0.1:8000", help="Server base URL")
    parser.add_argument("--variant-id", default="", help="Optional curriculum variant id for the reset check")
    parser.add_argument("--seed", type=int, default=None, help="Optional seed override for the reset check")
    args = parser.parse_args()

    base_url = args.base_url.rstrip("/")

    health_payload = _validate_health(base_url)
    schema_keys = list(_validate_schema(base_url).keys())

    # Page and asset checks only raise on failure; they add nothing to the summary.
    _validate_root_page(base_url)
    _validate_command_center_page(base_url)
    _validate_logo(base_url)

    command_center_data = _validate_command_center_data(base_url)
    artifact_info = command_center_data.get("artifact") or {}
    command_center_summary = {
        "preset_count": len(command_center_data.get("task_presets", [])),
        "artifact_available": bool(artifact_info.get("available")),
        "runtime": command_center_data.get("runtime", {}),
    }

    live_ws_summary = _run_live_ws_check(
        base_url=base_url,
        # An empty --variant-id means no variant was requested.
        variant_id=args.variant_id or None,
        seed=args.seed,
    )

    summary = {
        "base_url": base_url,
        "health": health_payload,
        "schema": {"top_level_keys": schema_keys},
        "command_center_data": command_center_summary,
        "live_ws": live_ws_summary,
    }
    print(json.dumps(summary, indent=2, default=str))
if __name__ == "__main__":
    # Turn any failure into a one-line exit message (non-zero exit status)
    # while keeping the original exception chained as the cause.
    try:
        main()
    except urllib.error.HTTPError as http_exc:
        # Include the server's response body in the exit message.
        response_text = http_exc.read().decode("utf-8", errors="replace")
        raise SystemExit(f"HTTP error {http_exc.code} for {http_exc.url}: {response_text}") from http_exc
    except Exception as failure:
        raise SystemExit(str(failure)) from failure