Spaces:
Sleeping
Sleeping
| """ | |
| FastAPI application for CounterFeint — a multi-agent ad-fraud FraudArena. | |
| Creates the OpenEnv server via create_app() (Round-1-compatible `/ws`) | |
| and layers on Round-2 role-specific routes: | |
| /ws/fraudster Fraudster agent (proposes / modifies ads) | |
| /ws/investigator Investigator agent (reviews ads, renders verdicts) | |
| /ws/auditor Auditor agent (audits Investigator + Fraudster traces) | |
| Custom HTTP endpoints: /tasks, /baseline, /grader, /matches. | |
| Gradio is disabled (ENABLE_WEB_INTERFACE=false); the HTML UI lives at /investigate. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict | |
| from openenv.core.env_server import create_app | |
| try: | |
| from ..models import ( | |
| AdReviewAction, | |
| AdReviewObservation, | |
| AuditorAction, | |
| AuditorObservation, | |
| FraudsterAction, | |
| FraudsterObservation, | |
| ) | |
| except ImportError: | |
| from models import ( | |
| AdReviewAction, | |
| AdReviewObservation, | |
| AuditorAction, | |
| AuditorObservation, | |
| FraudsterAction, | |
| FraudsterObservation, | |
| ) | |
| from .environment import AdFraudEnvironment, get_last_grader_result | |
| from .arena_ui import register_arena_ui | |
| from .investigate_ui import register_investigate_ui | |
| from .multi_agent_ws import register_multi_agent_routes | |
| from .public_api import register_public_api | |
| from .referee import get_last_grader_result as get_last_multi_agent_grader_result | |
| logger = logging.getLogger(__name__) | |
| # Do not mount OpenEnv's Gradio stack (single FastAPI process on port 8000). | |
| os.environ.setdefault("ENABLE_WEB_INTERFACE", "false") | |
| app = create_app( | |
| AdFraudEnvironment, | |
| AdReviewAction, | |
| AdReviewObservation, | |
| env_name="counterfeint", | |
| ) | |
| register_investigate_ui(app) | |
| register_multi_agent_routes(app) | |
| register_public_api(app) | |
| register_arena_ui(app) | |
| # ------------------------------------------------------------------ | |
| # Custom endpoints required by the competition | |
| # ------------------------------------------------------------------ | |
| async def tasks() -> Dict[str, Any]: | |
| """Return the list of tasks, the action schema, and the R2 role catalog.""" | |
| try: | |
| from ..data.ad_generator import TASK_CONFIGS | |
| except ImportError: | |
| from data.ad_generator import TASK_CONFIGS | |
| task_list = [] | |
| for tid, cfg in TASK_CONFIGS.items(): | |
| task_list.append({ | |
| "id": cfg.task_id, | |
| "name": cfg.name, | |
| "difficulty": cfg.difficulty, | |
| "queue_size": cfg.queue_size, | |
| "action_budget": cfg.action_budget, | |
| "description": cfg.description, | |
| }) | |
| roles = { | |
| "fraudster": { | |
| "description": ( | |
| "Adversarial agent. Proposes and mutates ads into the shared " | |
| "queue during its turn, reacting to Investigator feedback." | |
| ), | |
| "ws": "/ws/fraudster", | |
| "action_schema": FraudsterAction.model_json_schema(), | |
| "observation_schema": FraudsterObservation.model_json_schema(), | |
| }, | |
| "investigator": { | |
| "description": ( | |
| "Review agent. Investigates ads via sub-tools and renders " | |
| "verdicts (approve/reject/escalate). Cannot see Fraudster " | |
| "intent — only the growing queue." | |
| ), | |
| "ws": "/ws/investigator", | |
| "action_schema": AdReviewAction.model_json_schema(), | |
| "observation_schema": AdReviewObservation.model_json_schema(), | |
| }, | |
| "auditor": { | |
| "description": ( | |
| "Third-agent arbiter. After the match ends, audits the " | |
| "Investigator's reasoning (Track A) and the Fraudster's " | |
| "ad plausibility (Track B). Emits flags + a final audit report." | |
| ), | |
| "ws": "/ws/auditor", | |
| "action_schema": AuditorAction.model_json_schema(), | |
| "observation_schema": AuditorObservation.model_json_schema(), | |
| }, | |
| } | |
| return { | |
| "tasks": task_list, | |
| "action_schema": AdReviewAction.model_json_schema(), | |
| "roles": roles, | |
| "multi_agent_endpoints": { | |
| "fraudster_ws": "/ws/fraudster", | |
| "investigator_ws": "/ws/investigator", | |
| "auditor_ws": "/ws/auditor", | |
| "matches": "/matches", | |
| "grader": "/grader", | |
| }, | |
| } | |
| async def baseline() -> Dict[str, Any]: | |
| """Return baseline scores, running live inference if credentials are available.""" | |
| baseline_path = Path(__file__).resolve().parent.parent / "baseline_scores.json" | |
| has_creds = all(os.getenv(v) for v in ("API_BASE_URL", "MODEL_NAME", "HF_TOKEN")) | |
| if has_creds: | |
| try: | |
| try: | |
| from ..inference import run_baseline | |
| except ImportError: | |
| from inference import run_baseline | |
| scores = run_baseline() | |
| with open(baseline_path, "w") as f: | |
| json.dump(scores, f, indent=2) | |
| return scores | |
| except Exception as e: | |
| logger.warning("Live baseline failed, falling back to cached: %s", e) | |
| if baseline_path.exists(): | |
| with open(baseline_path) as f: | |
| return json.load(f) | |
| return { | |
| "error": "No baseline scores available. Set API_BASE_URL, MODEL_NAME, and HF_TOKEN to run live inference.", | |
| "tasks": {}, | |
| } | |
| async def grader() -> Dict[str, Any]: | |
| """ | |
| Return grader score from the most recently completed episode. Prefers | |
| a multi-agent (Referee) result if one exists; falls back to the R1 | |
| Investigator result otherwise. | |
| """ | |
| multi_agent_result = get_last_multi_agent_grader_result() | |
| if multi_agent_result and multi_agent_result.get("grader_score") is not None: | |
| multi_agent_result.setdefault("mode", "multi_agent") | |
| return multi_agent_result | |
| result = get_last_grader_result() | |
| if not result: | |
| return { | |
| "error": "No completed episode. Run an episode via WebSocket or the /investigate UI.", | |
| "grader_score": None, | |
| } | |
| result.setdefault("mode", "single_agent") | |
| return result | |
| def main() -> None: | |
| """Entry point for direct execution.""" | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8000) | |
| if __name__ == "__main__": | |
| main() | |