Spaces:
Sleeping
Sleeping
| """ | |
| Core Ad Fraud Investigation Environment (Investigator role). | |
| Implements the OpenEnv Environment interface for the Investigator agent: | |
| reviewing a queue of ads, investigating them, and rendering verdicts under | |
| a budget constraint. | |
| In Round 1 this environment is used standalone (`AdFraudEnvironment` alias | |
| preserves backwards compatibility). In Round 2 it is driven by the | |
| `RefereeEnvironment`, which pre-generates episodes and supplies a shared | |
| `InvestigationToolRegistry` so Fraudster-proposed ads are reachable through | |
| the same investigation code path. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any, Dict, List, Optional | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| try: | |
| from ..data.ad_generator import ( | |
| TASK_CONFIGS, | |
| Ad, | |
| GeneratedEpisode, | |
| generate_episode, | |
| ) | |
| from ..data.tool_registry import InvestigationToolRegistry | |
| from ..models import AdFraudState, AdReviewAction, AdReviewObservation | |
| from ..graders.base_grader import EpisodeRecord, LinkResult, VerdictResult, grade_episode | |
| from .evidence_ledger import build_evidence_ledger | |
| except ImportError: | |
| from data.ad_generator import ( | |
| TASK_CONFIGS, | |
| Ad, | |
| GeneratedEpisode, | |
| generate_episode, | |
| ) | |
| from data.tool_registry import InvestigationToolRegistry | |
| from models import AdFraudState, AdReviewAction, AdReviewObservation | |
| from graders.base_grader import EpisodeRecord, LinkResult, VerdictResult, grade_episode | |
| from server.evidence_ledger import build_evidence_ledger | |
logger = logging.getLogger(__name__)

# Module-level store so the /grader endpoint can read the last score.
# Rebound (not mutated) by InvestigatorEnvironment at the end of each episode.
# NOTE(review): this is process-wide, so with concurrent sessions it holds
# whichever session finished most recently.
_last_grader_result: Dict[str, Any] = {}


def get_last_grader_result() -> Dict[str, Any]:
    """Return a shallow copy of the most recently published grader result.

    Returns an empty dict until an episode has finished. Callers may mutate
    the returned dict without affecting the stored result.
    """
    return dict(_last_grader_result)
class InvestigatorEnvironment(
    Environment[AdReviewAction, AdReviewObservation, AdFraudState]
):
    """
    Ad fraud investigation environment (Investigator role).

    Each episode is a review session: the agent processes a queue of N ads
    within a limited action budget, choosing what to investigate and when
    to render verdicts. Unreviewed ads auto-approve at episode end.

    `reset()` accepts optional `episode` and `registry` kwargs so the
    Referee (or a test harness) can inject a pre-built `GeneratedEpisode`
    plus a shared `InvestigationToolRegistry`. Without them, the
    environment generates its own synthetic episode and a fresh registry
    (the Round 1 behaviour).

    Two budgets are tracked:
      * the step budget (``task_config.action_budget``): every ``step()``
        call after ``reset()`` consumes one step, and the episode ends when
        ``step_count`` reaches it;
      * the investigation budget (``state.remaining_budget``): only a
        successful ``investigate`` pull decrements it.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    # Investigation targets surfaced to the agent (focus block and
    # duplicate-target feedback), in display order.
    _INVESTIGATION_TARGETS = (
        "advertiser_history",
        "landing_page",
        "payment_method",
        "targeting_overlap",
        "campaign_structure",
        "policy_classifier",
    )

    # Hard cap on TOTAL investigate attempts per ad — including the ones
    # the env rejects (duplicate target, ad already verdicted, etc.).
    _MAX_INVESTIGATION_ATTEMPTS_PER_AD = 5
    # Tighter cap on SUCCESSFUL (budget-consuming) pulls per ad.
    _MAX_INVESTIGATIONS_PER_AD = 3

    # (verdict, ground_truth) pairs that count as a correct decision.
    _CORRECT_PAIRS = frozenset({
        ("reject", "fraud"),
        ("approve", "legit"),
        ("escalate", "escalate"),
    })

    def __init__(self) -> None:
        super().__init__()
        self._state = AdFraudState(episode_id=str(uuid4()), step_count=0)
        self._episode: Optional[GeneratedEpisode] = None
        self._registry: Optional[InvestigationToolRegistry] = None
        # ad_id -> {"verdict", "confidence", "ground_truth"[, "auto_approved"]}
        self._verdicts: Dict[str, Dict[str, Any]] = {}
        # Recorded link_accounts actions:
        # {"ad_id_1", "ad_id_2", "reason", "correct"}
        self._links: List[Dict[str, Any]] = []
        # ad_id -> investigation targets successfully pulled, in order.
        self._investigations: Dict[str, List[str]] = {}
        # Total `investigate` attempts per ad — INCLUDING ones that the
        # env rejects (duplicate target, hit cap, etc.).
        self._investigation_attempts: Dict[str, int] = {}
        self._cumulative_reward: float = 0.0
        self._done = False
        self._last_feedback = ""
        self._focused_ad_id: Optional[str] = None
        # Forwarded verbatim into every observation; set from reset kwargs.
        self._queue_may_grow: bool = False

    # ------------------------------------------------------------------
    # OpenEnv interface
    # ------------------------------------------------------------------

    def reset(
        self,
        seed: int | None = None,
        episode_id: str | None = None,
        **kwargs: Any,
    ) -> AdReviewObservation:
        """Start a new episode and return its first observation.

        Recognised kwargs:
            task_id: key into ``TASK_CONFIGS``; unknown ids fall back to
                ``"task_1"``.
            episode: pre-built ``GeneratedEpisode`` used instead of
                generating one.
            registry: shared ``InvestigationToolRegistry``; built from the
                episode when omitted.
            queue_may_grow: flag forwarded into every observation.
        """
        task_id = kwargs.get("task_id", "task_1")
        if task_id not in TASK_CONFIGS:
            task_id = "task_1"
        injected_episode: Optional[GeneratedEpisode] = kwargs.get("episode")
        injected_registry: Optional[InvestigationToolRegistry] = kwargs.get("registry")
        self._queue_may_grow = bool(kwargs.get("queue_may_grow", False))

        if injected_episode is not None:
            self._episode = injected_episode
            # Budgets and queue come from the injected episode's own config,
            # so report that config's task id. Otherwise a defaulted kwarg
            # would label (and pass to the grader) the wrong task.
            task_id = getattr(injected_episode.task_config, "task_id", None) or task_id
        else:
            effective_seed = seed if seed is not None else uuid4().int & 0xFFFFFFFF
            self._episode = generate_episode(effective_seed, task_id)

        if injected_registry is not None:
            self._registry = injected_registry
        else:
            self._registry = InvestigationToolRegistry.from_episode(self._episode)

        config = self._episode.task_config
        self._state = AdFraudState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=task_id,
            # Actual queue length; notify_queue_grew keeps this in sync.
            total_ads=len(self._episode.ads),
            reviewed_count=0,
            remaining_budget=config.action_budget,
            verdicts={},
            grader_score=None,
        )
        self._verdicts = {}
        self._links = []
        self._investigations = {}
        self._investigation_attempts = {}
        self._cumulative_reward = 0.0
        self._done = False
        self._last_feedback = "Episode started. Review the ad queue and begin your investigation."
        self._focused_ad_id = self._episode.ads[0].ad_id if self._episode.ads else None
        return self._build_observation(reward=0.0, done=False)

    def step(
        self,
        action: AdReviewAction,
        timeout_s: float | None = None,
        **kwargs: Any,
    ) -> AdReviewObservation:
        """Apply one agent action and return the resulting observation.

        Every call after ``reset()`` consumes one step. This includes
        rejected actions (unknown ``ad_id`` or ``action_type``), so the step
        budget always terminates the episode.
        """
        if self._done:
            return self._build_observation(
                reward=0.0, done=True,
                feedback_override="Episode is already complete. Call reset() to start a new episode.",
            )
        if self._episode is None:
            return self._build_observation(
                reward=0.0, done=False,
                feedback_override="Environment not initialized. Call reset() first.",
            )

        self._state.step_count += 1
        ad_ids = {a.ad_id for a in self._episode.ads}
        if action.ad_id not in ad_ids:
            # Falls through to the shared bookkeeping below so invalid ids
            # still count toward the reward total and the step budget.
            self._last_feedback = f"Invalid ad_id '{action.ad_id}'. Valid IDs: {', '.join(sorted(ad_ids))}"
            reward = -0.05
        elif action.action_type == "investigate":
            reward = self._handle_investigate(action)
        elif action.action_type == "verdict":
            reward = self._handle_verdict(action)
        elif action.action_type == "link_accounts":
            reward = self._handle_link(action)
        else:
            self._last_feedback = f"Unknown action_type '{action.action_type}'."
            reward = -0.05

        self._cumulative_reward += reward
        if self._check_done():
            end_reward = self._handle_episode_end()
            reward += end_reward
            self._cumulative_reward += end_reward
            self._done = True

        self._state.remaining_budget = max(0, self._state.remaining_budget)
        self._state.reviewed_count = len(self._verdicts)
        self._state.verdicts = {
            ad_id: v.get("verdict", "") for ad_id, v in self._verdicts.items()
        }
        return self._build_observation(reward=reward, done=self._done)

    @property
    def state(self) -> AdFraudState:
        """Current episode state (OpenEnv declares ``state`` as a property)."""
        return self._state

    # ------------------------------------------------------------------
    # Action handlers
    # ------------------------------------------------------------------

    def _rotate_focus_off(self, current_ad_id: str) -> Optional[str]:
        """Move focus to the first pending ad that is NOT ``current_ad_id``.

        Returns the new focus ad_id, or ``None`` (focus unchanged) when no
        other pending ad exists. ``_handle_investigate`` uses this when the
        model is stuck looping on a single ad: changing the ``Ad in Focus``
        block in the next observation breaks the prompt-anchoring effect
        that keeps the 1.5B Investigator glued to one ad.
        """
        if self._episode is None:
            return None
        next_ad_id = next(
            (
                a.ad_id for a in self._episode.ads
                if a.ad_id not in self._verdicts and a.ad_id != current_ad_id
            ),
            None,
        )
        if next_ad_id is not None:
            self._focused_ad_id = next_ad_id
        return next_ad_id

    def _lookup_finding(self, ad_id: str, target: str, default: str) -> str:
        """Fetch an investigation result from the registry or raw episode data.

        ``default`` is only used on the raw-episode path (no registry).
        """
        if self._registry is not None:
            return self._registry.lookup(ad_id, target)
        return self._episode.investigation_data.get(ad_id, {}).get(target, default)

    def _handle_investigate(self, action: AdReviewAction) -> float:
        """Handle an ``investigate`` action and return its shaped reward.

        The checks run in this order, and each rejection returns without
        spending investigation budget:
          1. budget exhausted;
          2. missing target;
          3. ad already verdicted;
          4. total-attempt cap;
          5. duplicate target;
          6. successful-pull cap.

        Only a successful pull decrements ``remaining_budget``.
        """
        ad_id = action.ad_id
        # Count EVERY attempt, including the ones we're about to reject —
        # this is the primary loop-breaking signal.
        attempts = self._investigation_attempts.get(ad_id, 0) + 1
        self._investigation_attempts[ad_id] = attempts

        if self._state.remaining_budget <= 0:
            self._last_feedback = "No budget remaining. You must render verdicts on remaining ads or end the episode."
            return -0.02

        if action.investigation_target is None:
            self._last_feedback = "investigation_target is required for action_type='investigate'."
            return -0.05

        if ad_id in self._verdicts:
            new_focus = self._rotate_focus_off(ad_id)
            self._last_feedback = (
                f"You already rendered a verdict on {ad_id}. "
                + (
                    f"Focus moved to {new_focus} — investigate or verdict that ad next."
                    if new_focus else
                    "All other ads are also verdicted; emit verdict actions for any remaining pending ads."
                )
            )
            return -0.02

        # Hard cap on TOTAL attempts (catches the duplicate-target spam
        # loop). Fires BEFORE the duplicate-target check so we hit the
        # cap first when the model is just spamming the same target.
        if attempts > self._MAX_INVESTIGATION_ATTEMPTS_PER_AD:
            new_focus = self._rotate_focus_off(ad_id)
            done_targets = self._investigations.get(ad_id, [])
            self._last_feedback = (
                f"REJECTED: {ad_id} has been probed {attempts} times — that's the cap. "
                f"STOP TRYING TO INVESTIGATE {ad_id}. "
                f"Issue a verdict on {ad_id} NOW (action_type='verdict', verdict in "
                "{approve, reject, escalate}). "
                + (
                    f"Successful pulls on {ad_id} so far: {', '.join(done_targets) or 'none'}. "
                    if done_targets else ""
                )
                + (
                    f"Focus moved to {new_focus}; investigate that ad if you'd rather "
                    "build evidence on a fresh ad first."
                    if new_focus else
                    "No other pending ads — verdict the remaining pending ads."
                )
            )
            return -0.10

        prev = self._investigations.setdefault(ad_id, [])
        target = action.investigation_target

        if target in prev:
            # Duplicate-target case. Don't spend budget. Surface the
            # un-pulled targets so the model has a concrete next pick.
            remaining = sorted(set(self._INVESTIGATION_TARGETS) - set(prev))
            self._last_feedback = (
                f"You already investigated '{target}' for {ad_id} "
                f"(attempt {attempts}/{self._MAX_INVESTIGATION_ATTEMPTS_PER_AD}). "
                f"Either issue a verdict on {ad_id} now, OR pick a fresh target from "
                f"[{', '.join(remaining) if remaining else '(none left — verdict it)'}], "
                "OR investigate a different ad_id from the pending queue."
            )
            return -0.02

        # Sub-cap on successful pulls (tighter than the attempt cap).
        # Forces a verdict after _MAX_INVESTIGATIONS_PER_AD successful pulls.
        if len(prev) >= self._MAX_INVESTIGATIONS_PER_AD:
            new_focus = self._rotate_focus_off(ad_id)
            self._last_feedback = (
                f"{ad_id} has reached the {self._MAX_INVESTIGATIONS_PER_AD}-investigation cap "
                f"(already pulled: {', '.join(prev)}). Issue a verdict on {ad_id} now "
                "(action_type='verdict' with verdict in {approve, reject, escalate})."
                + (f" Focus moved to {new_focus}." if new_focus else "")
            )
            return -0.05

        self._state.remaining_budget -= 1
        prev.append(target)

        # Focus follows the investigated ad only for its 1st and 2nd
        # successful pull (or when nothing is focused). Past that, keep the
        # existing focus so the prompt doesn't re-anchor the model on an ad
        # it should be verdicting — nudging "investigate twice → verdict →
        # move on" instead of looping investigations on a single ad.
        if len(prev) <= 2 or self._focused_ad_id is None:
            self._focused_ad_id = ad_id

        findings = self._lookup_finding(
            ad_id, target, "No data available for this investigation type."
        )

        # Escalating per-investigate penalty so a runaway investigate loop
        # on a single ad gets progressively more expensive. With the
        # current _MAX_INVESTIGATIONS_PER_AD of 3 the final branch is only
        # reachable if that cap is raised.
        n_targets = len(prev)
        if n_targets <= 2:
            penalty = -0.02
        elif n_targets == 3:
            penalty = -0.05
        else:
            penalty = -0.10

        feedback_lines = [
            f"Investigation complete: {target} for {ad_id}.",
            f"--- Findings ---\n{findings}",
        ]
        if n_targets >= 2:
            feedback_lines.append(
                f"Note: you have now investigated {ad_id} {n_targets}x — "
                "issue a verdict on it instead of more investigations."
            )
        self._last_feedback = "\n".join(feedback_lines)
        return penalty

    def _handle_verdict(self, action: AdReviewAction) -> float:
        """Record a verdict for ``action.ad_id`` and return its reward.

        Re-verdicting an ad is rejected. A missing confidence defaults to
        0.5. On success, focus moves to the first still-pending ad.
        """
        ad_id = action.ad_id
        if ad_id in self._verdicts:
            self._last_feedback = f"You already rendered a verdict on {ad_id}."
            return -0.02
        if action.verdict is None:
            self._last_feedback = "verdict field is required for action_type='verdict'."
            return -0.05

        confidence = action.confidence if action.confidence is not None else 0.5
        ad = self._get_ad(ad_id)
        ground_truth = ad.ground_truth_label if ad else "legit"
        severity = ad.severity if ad else 0.0

        self._verdicts[ad_id] = {
            "verdict": action.verdict,
            "confidence": confidence,
            "ground_truth": ground_truth,
        }
        reward = self._compute_verdict_reward(action.verdict, ground_truth, severity, confidence)

        pending = [a.ad_id for a in self._episode.ads if a.ad_id not in self._verdicts]
        self._last_feedback = (
            f"Verdict recorded for {ad_id}: {action.verdict} "
            f"(confidence: {confidence:.2f}). "
            f"{len(pending)} ad(s) remaining in queue."
        )
        if pending:
            self._focused_ad_id = pending[0]
        return reward

    def _handle_link(self, action: AdReviewAction) -> float:
        """Record a ``link_accounts`` claim between two distinct ads.

        A link is correct when both ads belong to the same fraud ring.
        Rewards: +0.4 correct, -0.25 incorrect, small penalties for
        malformed or duplicate links.
        """
        if action.linked_ad_id is None:
            self._last_feedback = "linked_ad_id is required for action_type='link_accounts'."
            return -0.05
        ad_ids = {a.ad_id for a in self._episode.ads}
        if action.linked_ad_id not in ad_ids:
            self._last_feedback = f"Invalid linked_ad_id '{action.linked_ad_id}'."
            return -0.05
        if action.ad_id == action.linked_ad_id:
            self._last_feedback = "Cannot link an ad to itself."
            return -0.05

        # Links are undirected: compare on the sorted pair.
        link_key = tuple(sorted((action.ad_id, action.linked_ad_id)))
        existing = {tuple(sorted((link["ad_id_1"], link["ad_id_2"]))) for link in self._links}
        if link_key in existing:
            self._last_feedback = f"Link between {action.ad_id} and {action.linked_ad_id} already recorded."
            return -0.02

        is_correct = self._check_link_correct(action.ad_id, action.linked_ad_id)
        self._links.append({
            "ad_id_1": action.ad_id,
            "ad_id_2": action.linked_ad_id,
            "reason": action.link_reason or "",
            "correct": is_correct,
        })
        self._last_feedback = (
            f"Network link recorded: {action.ad_id} <-> {action.linked_ad_id}. "
            f"Reason: {action.link_reason or 'not specified'}."
        )
        return 0.4 if is_correct else -0.25

    # ------------------------------------------------------------------
    # Reward computation
    # ------------------------------------------------------------------

    def _compute_verdict_reward(
        self, verdict: str, ground_truth: str, severity: float, confidence: float
    ) -> float:
        """Per-verdict shaped reward.

        Correct rejects of fraud scale with ``severity``; approving fraud is
        the costliest mistake. ``confidence`` is accepted but currently
        unused.
        """
        if verdict == "reject" and ground_truth == "fraud":
            return 0.3 + 0.1 * severity
        elif verdict == "approve" and ground_truth == "legit":
            return 0.1
        elif verdict == "escalate" and ground_truth == "escalate":
            return 0.15
        elif verdict == "reject" and ground_truth == "legit":
            return -0.35
        elif verdict == "approve" and ground_truth == "fraud":
            return -0.5
        elif verdict == "escalate":
            # Escalating anything that isn't an escalate case.
            return -0.05
        elif verdict == "approve" and ground_truth == "escalate":
            return -0.15
        elif verdict == "reject" and ground_truth == "escalate":
            return -0.1
        else:
            return -0.05

    def _handle_episode_end(self) -> float:
        """Apply end-of-episode adjustments for unreviewed ads, then delegate to graders.

        Every ad without a verdict is recorded as ``approve`` with
        ``auto_approved=True``. The episode is then graded, summary stats
        are published to the module-level store read by
        ``get_last_grader_result``, and the feedback becomes an episode
        summary.

        Returns:
            Always 0.0. The grader score is exposed via
            ``state.grader_score`` and the feedback text, not the step reward.
        """
        unreviewed_fraud = 0
        for ad in self._episode.ads:
            if ad.ad_id in self._verdicts:
                continue
            self._verdicts[ad.ad_id] = {
                "verdict": "approve",
                "confidence": 0.0,
                "ground_truth": ad.ground_truth_label,
                "auto_approved": True,
            }
            if ad.ground_truth_label == "fraud":
                unreviewed_fraud += 1

        grader_score = grade_episode(self._build_episode_record())
        self._state.grader_score = grader_score

        manual = [v for v in self._verdicts.values() if not v.get("auto_approved")]
        outcomes = [(v["verdict"], v["ground_truth"]) for v in manual]
        reviewed_count = len(manual)
        total_ads = len(self._episode.ads)
        auto_approved_count = total_ads - reviewed_count
        total_correct = sum(1 for outcome in outcomes if outcome in self._CORRECT_PAIRS)
        false_positives = outcomes.count(("reject", "legit"))
        false_negatives = outcomes.count(("approve", "fraud"))
        correct_links = sum(1 for link in self._links if link.get("correct"))
        incorrect_links = len(self._links) - correct_links

        # NOTE(review): module-level, so concurrent sessions overwrite each
        # other's result.
        global _last_grader_result
        _last_grader_result = {
            "task_id": self._state.task_id,
            "grader_score": grader_score,
            "episode_id": self._state.episode_id,
            "total_steps": self._state.step_count,
            "verdicts_rendered": reviewed_count,
            "correct_decisions": total_correct,
            "false_positives": false_positives,
            "false_negatives": false_negatives,
            "auto_approved": auto_approved_count,
            "unreviewed_fraud": unreviewed_fraud,
            "network_links_correct": correct_links,
            "network_links_incorrect": incorrect_links,
        }

        feedback_lines = [
            f"Episode complete. Grader score: {grader_score:.3f}/1.000",
            f"Verdicts rendered: {reviewed_count}/{total_ads}",
            f"Correct decisions: {total_correct}/{reviewed_count}",
            f"False positives (legit rejected): {false_positives}",
            f"False negatives (fraud approved): {false_negatives}",
            # All auto-approved ads, with the fraud subset alongside.
            f"Unreviewed ads auto-approved: {auto_approved_count} "
            f"({unreviewed_fraud} fraud)",
        ]
        if self._links:
            feedback_lines.append(
                f"Network links: {correct_links} correct, {incorrect_links} incorrect"
            )
        self._last_feedback = "\n".join(feedback_lines)
        return 0.0

    def _build_episode_record(self) -> EpisodeRecord:
        """Convert internal state into an EpisodeRecord for the grader."""
        verdict_results = []
        for ad in self._episode.ads:
            v = self._verdicts.get(ad.ad_id)
            if v:
                verdict_results.append(VerdictResult(
                    ad_id=ad.ad_id,
                    verdict=v["verdict"],
                    confidence=v.get("confidence", 0.5),
                    ground_truth=v["ground_truth"],
                    auto_approved=v.get("auto_approved", False),
                ))
        link_results = [
            LinkResult(ad_id_1=link["ad_id_1"], ad_id_2=link["ad_id_2"], correct=link["correct"])
            for link in self._links
        ]
        ads_metadata = [
            {"ad_id": ad.ad_id, "ground_truth": ad.ground_truth_label, "severity": ad.severity}
            for ad in self._episode.ads
        ]
        return EpisodeRecord(
            task_id=self._state.task_id,
            total_steps=self._state.step_count,
            action_budget=self._episode.task_config.action_budget,
            verdicts=verdict_results,
            links=link_results,
            ads_metadata=ads_metadata,
            n_fraud_rings=len(self._episode.fraud_rings),
            ring_sizes=[len(r.member_ad_ids) for r in self._episode.fraud_rings],
        )

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _check_done(self) -> bool:
        """True when every ad has a verdict or the step budget is spent."""
        if self._episode is None:
            return True
        all_reviewed = all(ad.ad_id in self._verdicts for ad in self._episode.ads)
        steps_exhausted = self._state.step_count >= self._episode.task_config.action_budget
        return all_reviewed or steps_exhausted

    def _check_link_correct(self, ad_id_1: str, ad_id_2: str) -> bool:
        """Check if two ads share a fraud ring."""
        return any(
            ad_id_1 in ring.member_ad_ids and ad_id_2 in ring.member_ad_ids
            for ring in self._episode.fraud_rings
        )

    def _get_ad(self, ad_id: str) -> Optional[Ad]:
        """Return the episode ad with ``ad_id``, or None."""
        if self._episode is None:
            return None
        return next((ad for ad in self._episode.ads if ad.ad_id == ad_id), None)

    def _budget_pressure_line(self, n_pending: int, steps_left: int) -> str:
        """Return a BUDGET PRESSURE banner, or "" when neither budget is tight.

        There are two triggers because they catch different failure modes:

        * ``steps_left <= 2 * n_pending`` fires when fewer than ~2 actions
          remain per pending ad. This catches the "looped on ad_001 for 20
          steps and now there isn't time to verdict everyone" case the 1.5B
          Investigator falls into. It does not depend on the investigation
          budget.
        * ``remaining_budget <= n_pending`` fires when investigation pulls
          have actually been spent.

        The step-pressure wording wins when both fire.
        """
        budget = self._state.remaining_budget
        if steps_left <= 2 * n_pending:
            return (
                f"BUDGET PRESSURE: only {steps_left} step(s) left in this "
                f"episode for {n_pending} pending ad(s). Stop investigating "
                "and START VERDICTING — issue verdict actions "
                "(action_type='verdict', verdict in approve/reject/escalate) "
                "on the pending ads using whatever evidence you have. "
                "Unverdict-ed ads auto-approve at audit and tank the score."
            )
        if budget <= n_pending:
            return (
                f"BUDGET PRESSURE: only {budget} investigation(s) left for "
                f"{n_pending} pending ad(s). Stop investigating and START "
                "VERDICTING — issue verdict actions on the pending ads using "
                "whatever evidence you have (approve, reject, or escalate). "
                "Unverdict-ed ads auto-approve at audit time and tank the score."
            )
        return ""

    def _build_focus_block(self) -> str:
        """Render the ``Ad in Focus`` block, or "" if the focused ad is unset or verdicted."""
        if not self._focused_ad_id:
            return ""
        ad = self._get_ad(self._focused_ad_id)
        if not ad or ad.ad_id in self._verdicts:
            return ""

        signals = ", ".join(ad.initial_risk_signals) if ad.initial_risk_signals else "None"
        investigated = self._investigations.get(ad.ad_id, [])
        attempts = self._investigation_attempts.get(ad.ad_id, 0)
        exhausted = [t for t in self._INVESTIGATION_TARGETS if t in investigated]
        fresh = [t for t in self._INVESTIGATION_TARGETS if t not in investigated]
        exhausted_line = (
            f"ALREADY-EXHAUSTED targets for {ad.ad_id} (do NOT repeat): "
            f"{', '.join(exhausted) if exhausted else 'none yet'}"
        )
        fresh_line = (
            f"FRESH targets for {ad.ad_id} (you may pick one of these or verdict): "
            f"{', '.join(fresh) if fresh else 'none — verdict this ad now'}"
        )

        # Contextual metadata visible before investigation.
        profile = self._episode.advertiser_profiles.get(ad.ad_id)
        meta_lines = []
        if profile:
            meta_lines.append(f"Advertiser country: {profile.country}")
            meta_lines.append(f"Account age: {profile.account_age_days} days")
            if profile.account_age_days < 30:
                meta_lines.append("Flag: New account (< 30 days)")
        context_meta = "\n".join(meta_lines)

        # Mirror the module's package / flat-layout import fallback; a bare
        # relative import raises ImportError in the flat layout.
        try:
            from ..data.meta_policy_taxonomy import lookup as _meta_lookup
        except ImportError:
            from data.meta_policy_taxonomy import lookup as _meta_lookup
        policy_entry = _meta_lookup(ad.category)
        meta_policy_line = (
            f"Meta policy lens: {policy_entry.citation_id} — "
            f"{policy_entry.section} > {policy_entry.subsection}"
        )

        # Once the model has spammed this ad 3+ times, say so at the top of
        # the block to push harder toward a verdict.
        stuck_banner = (
            f"STUCK ON {ad.ad_id}: you have attempted {attempts} "
            f"investigate actions on this ad. ISSUE A VERDICT NOW.\n"
            if attempts >= 3 else ""
        )
        return (
            f"{stuck_banner}"
            f"=== Ad in Focus: {ad.ad_id} ===\n"
            f"Category: {ad.category}\n"
            f"{meta_policy_line}\n"
            f"Ad copy: \"{ad.ad_copy}\"\n"
            f"Targeting: {ad.targeting_summary}\n"
            f"Initial risk signals: {signals}\n"
            f"{context_meta}\n"
            f"{exhausted_line}\n"
            f"{fresh_line}"
        )

    def _build_investigation_findings(self) -> str:
        """Concatenate every non-empty finding pulled so far.

        Findings starting with "No data" or "Unknown" are skipped.
        """
        chunks: List[str] = []
        for ad_id, targets in self._investigations.items():
            for target in targets:
                finding = self._lookup_finding(ad_id, target, "")
                if finding and not finding.startswith(("No data", "Unknown")):
                    chunks.append(f"\n[{ad_id} / {target}]\n{finding}\n")
        return "".join(chunks).strip()

    def _build_verdict_history_summary(self) -> str:
        """Summarise the agent's own (non-auto-approved) verdicts."""
        manual_verdicts = {
            ad_id: v for ad_id, v in self._verdicts.items()
            if not v.get("auto_approved")
        }
        if not manual_verdicts:
            return "No verdicts yet."

        counts: Dict[str, int] = {"approve": 0, "reject": 0, "escalate": 0}
        by_decision: Dict[str, List[str]] = {"approve": [], "reject": [], "escalate": []}
        for ad_id, v in manual_verdicts.items():
            verdict = v["verdict"]
            counts[verdict] = counts.get(verdict, 0) + 1
            # setdefault keeps an unexpected verdict string from raising.
            by_decision.setdefault(verdict, []).append(ad_id)

        summary_parts = [f"{c} {k}" for k, c in counts.items() if c > 0]
        lines = [f"Reviewed {len(manual_verdicts)} ad(s): {', '.join(summary_parts)}."]
        for decision in ("reject", "approve", "escalate"):
            if by_decision[decision]:
                lines.append(f" {decision}: {', '.join(by_decision[decision])}")
        return "\n".join(lines)

    def _build_observation(
        self,
        reward: float,
        done: bool,
        feedback_override: str | None = None,
    ) -> AdReviewObservation:
        """Assemble the full observation for the current state.

        ``feedback_override`` replaces the last feedback and suppresses the
        budget-pressure banner.
        """
        feedback = feedback_override or self._last_feedback
        if self._episode is None:
            return AdReviewObservation(
                done=done,
                reward=reward,
                queue_summary="No episode loaded.",
                current_ad_info="",
                investigation_findings="",
                verdict_history_summary="",
                feedback=feedback,
                available_ads=[],
                queue_status={},
                queue_may_grow=self._queue_may_grow,
            )

        config = self._episode.task_config
        pending = [a for a in self._episode.ads if a.ad_id not in self._verdicts]
        reviewed = [a for a in self._episode.ads if a.ad_id in self._verdicts]
        # Actual queue length, which can exceed config.queue_size after
        # notify_queue_grew; keeps Total == Reviewed + Pending.
        total_ads = len(self._episode.ads)
        steps_remaining = max(0, config.action_budget - self._state.step_count)

        if not done and pending and feedback_override is None:
            pressure_line = self._budget_pressure_line(len(pending), steps_remaining)
            if pressure_line:
                feedback = f"{pressure_line}\n\n{feedback}" if feedback else pressure_line

        queue_summary = (
            f"Task: {config.name} ({config.difficulty})\n"
            f"Total ads: {total_ads} | "
            f"Reviewed: {len(reviewed)} | "
            f"Pending: {len(pending)} | "
            f"Steps remaining: {steps_remaining}/{config.action_budget} | "
            f"Investigation budget: {self._state.remaining_budget} | "
            f"Step: {self._state.step_count}"
        )
        queue_status = {
            "total_ads": total_ads,
            "reviewed": len(reviewed),
            "pending": len(pending),
            "investigation_budget": self._state.remaining_budget,
            "steps_remaining": steps_remaining,
            "step": self._state.step_count,
            "task_id": config.task_id,
        }
        return AdReviewObservation(
            done=done,
            reward=reward,
            queue_summary=queue_summary,
            current_ad_info="" if done else self._build_focus_block(),
            investigation_findings=self._build_investigation_findings(),
            verdict_history_summary=self._build_verdict_history_summary(),
            feedback=feedback,
            available_ads=[a.ad_id for a in pending],
            queue_status=queue_status,
            queue_may_grow=self._queue_may_grow,
            evidence_ledger=self._build_evidence_ledger(),
            queue_digest=self._build_queue_digest(pending),
            decided_ads=self._build_decided_ads(),
        )

    # Curated columns surfaced in the no-investigation queue digest so the
    # Investigator has SOMETHING to triage on for every pending ad, not just
    # the focused one. Mix of:
    #
    #   * Discriminative (used for link_accounts decisions when shared
    #     across ads): payment_type, registrar, domain
    #   * Decoy / non-discriminative (deliberately included so the policy
    #     must learn not to weight them): category, country, account_age_days
    #
    # Up to 7 fields (incl. ad_id) × 12 ads keeps the digest small. The
    # payment_id / advertiser_id / targeting_fingerprint fields are NOT
    # exposed here. They are the high-signal fields that MUST require an
    # explicit investigate to avoid trivialising the task, so they remain
    # in the evidence_ledger only.
    _QUEUE_DIGEST_MAX_ADS = 12

    def _build_queue_digest(
        self, pending_ads: List[Ad]
    ) -> List[Dict[str, Any]]:
        """One compact triage row per pending ad (first ``_QUEUE_DIGEST_MAX_ADS``).

        Fields whose value is ``None`` are omitted from the row.
        """
        if self._episode is None or not pending_ads:
            return []
        landing_pages = getattr(self._episode, "landing_pages", {}) or {}
        profiles = getattr(self._episode, "advertiser_profiles", {}) or {}
        digest: List[Dict[str, Any]] = []
        for ad in pending_ads[: self._QUEUE_DIGEST_MAX_ADS]:
            row: Dict[str, Any] = {"ad_id": ad.ad_id, "category": ad.category}
            profile = profiles.get(ad.ad_id)
            if profile is not None:
                row["country"] = profile.country
                row["account_age_days"] = profile.account_age_days
                row["payment_type"] = profile.payment_method_type
            landing_page = landing_pages.get(ad.ad_id)
            if landing_page is not None:
                row["domain"] = getattr(landing_page, "domain", None)
                row["registrar"] = getattr(landing_page, "registrar", None)
            digest.append({k: v for k, v in row.items() if v is not None})
        return digest

    def _build_evidence_ledger(self) -> Dict[str, Dict[str, Any]]:
        """Assemble a per-ad structured evidence table for the Investigator.

        Covers every investigated ad plus the focused one. Delegates to
        :func:`build_evidence_ledger` so the Referee can reuse the exact
        same extraction logic (with a different ``ad_ids`` selection) when
        building the Fraudster's ``my_proposal_signals``.
        """
        if self._episode is None:
            return {}
        candidates = set(self._investigations)
        if self._focused_ad_id:
            candidates.add(self._focused_ad_id)
        # Pass ids in queue order rather than set order (string-hash order
        # varies per process). Any id not in the queue is appended sorted.
        # NOTE(review): assumes build_evidence_ledger follows the given order.
        ordered_ids = [a.ad_id for a in self._episode.ads if a.ad_id in candidates]
        ordered_ids += sorted(candidates.difference(ordered_ids))
        return build_evidence_ledger(
            episode=self._episode,
            registry=self._registry,
            ad_ids=ordered_ids,
            investigations=self._investigations,
        )

    def _build_decided_ads(self) -> list[Dict[str, Any]]:
        """Build a per-decided-ad summary with verdict + key signals.

        Each entry carries the verdict and confidence of a manual (not
        auto-approved) verdict, merged with that ad's evidence-ledger row
        (ledger keys win on collision). This gives the Investigator
        structured memory of past decisions so it can detect cross-ad
        collisions for link_accounts.
        """
        if self._episode is None:
            return []
        decided_ad_ids = [
            ad_id for ad_id, v in self._verdicts.items()
            if not v.get("auto_approved")
        ]
        if not decided_ad_ids:
            return []
        ledger = build_evidence_ledger(
            episode=self._episode,
            registry=self._registry,
            ad_ids=decided_ad_ids,
            investigations=self._investigations,
        )
        return [
            {
                "ad_id": ad_id,
                "verdict": self._verdicts[ad_id].get("verdict", "?"),
                "confidence": self._verdicts[ad_id].get("confidence", 0.5),
                **ledger.get(ad_id, {}),
            }
            for ad_id in decided_ad_ids
        ]

    # ------------------------------------------------------------------
    # Referee integration hooks
    # ------------------------------------------------------------------

    def notify_queue_grew(self, new_ad_id: str) -> None:
        """
        Called by the Referee after `extend_episode_with_proposal` adds a
        new ad to the shared episode + registry. Updates the Investigator's
        view of queue size and refocuses on the new ad if nothing pending
        is currently focused.
        """
        if self._episode is None:
            return
        self._state.total_ads = len(self._episode.ads)
        if self._focused_ad_id is None or self._focused_ad_id in self._verdicts:
            self._focused_ad_id = new_ad_id

    def episode(self) -> Optional[GeneratedEpisode]:
        """Return the loaded episode (used by the Referee).

        NOTE(review): a plain method here, although "read-only access"
        suggests a @property may be intended — confirm against call sites.
        """
        return self._episode

    def registry(self) -> Optional[InvestigationToolRegistry]:
        """Return the shared tool registry (plain method; see ``episode``)."""
        return self._registry

    def verdicts(self) -> Dict[str, Dict[str, Any]]:
        """Shallow snapshot of verdicts recorded so far (Referee/auditor)."""
        return dict(self._verdicts)

    def investigations(self) -> Dict[str, List[str]]:
        """Snapshot of investigation targets pulled per ad (lists copied)."""
        return {ad_id: list(targets) for ad_id, targets in self._investigations.items()}

    def links(self) -> List[Dict[str, Any]]:
        """Shallow snapshot of recorded network links."""
        return list(self._links)
# Backwards-compatible alias. Round 1 code, tests, clients, and external
# integrations import `AdFraudEnvironment` directly; keeping the symbol
# means the rename is zero-breakage.
AdFraudEnvironment = InvestigatorEnvironment