CounterFeint / server /environment.py
QuantumTransformer's picture
Upload folder using huggingface_hub
26bf1c9 verified
"""
Core Ad Fraud Investigation Environment (Investigator role).
Implements the OpenEnv Environment interface for the Investigator agent:
reviewing a queue of ads, investigating them, and rendering verdicts under
a budget constraint.
In Round 1 this environment is used standalone (`AdFraudEnvironment` alias
preserves backwards compatibility). In Round 2 it is driven by the
`RefereeEnvironment`, which pre-generates episodes and supplies a shared
`InvestigationToolRegistry` so Fraudster-proposed ads are reachable through
the same investigation code path.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
try:
from ..data.ad_generator import (
TASK_CONFIGS,
Ad,
GeneratedEpisode,
generate_episode,
)
from ..data.tool_registry import InvestigationToolRegistry
from ..models import AdFraudState, AdReviewAction, AdReviewObservation
from ..graders.base_grader import EpisodeRecord, LinkResult, VerdictResult, grade_episode
from .evidence_ledger import build_evidence_ledger
except ImportError:
from data.ad_generator import (
TASK_CONFIGS,
Ad,
GeneratedEpisode,
generate_episode,
)
from data.tool_registry import InvestigationToolRegistry
from models import AdFraudState, AdReviewAction, AdReviewObservation
from graders.base_grader import EpisodeRecord, LinkResult, VerdictResult, grade_episode
from server.evidence_ledger import build_evidence_ledger
logger = logging.getLogger(__name__)
# Module-level store so the /grader endpoint can read the last score.
_last_grader_result: Dict[str, Any] = {}
def get_last_grader_result() -> Dict[str, Any]:
return dict(_last_grader_result)
class InvestigatorEnvironment(
Environment[AdReviewAction, AdReviewObservation, AdFraudState]
):
"""
Ad fraud investigation environment (Investigator role).
Each episode is a review session: the agent processes a queue of N ads
within a limited action budget, choosing what to investigate and when
to render verdicts. Unreviewed ads auto-approve at episode end.
`reset()` accepts optional `episode` and `registry` kwargs so the
Referee (or a test harness) can inject a pre-built `GeneratedEpisode`
plus a shared `InvestigationToolRegistry`. Without them, the
environment generates its own synthetic episode and a fresh registry
(the Round 1 behaviour).
"""
SUPPORTS_CONCURRENT_SESSIONS = True
def __init__(self) -> None:
super().__init__()
self._state = AdFraudState(episode_id=str(uuid4()), step_count=0)
self._episode: Optional[GeneratedEpisode] = None
self._registry: Optional[InvestigationToolRegistry] = None
self._verdicts: Dict[str, Dict[str, Any]] = {}
self._links: List[Dict[str, Any]] = []
self._investigations: Dict[str, List[str]] = {}
# Total `investigate` attempts per ad — INCLUDING ones that the
# env rejects (duplicate target, hit cap, etc.).
self._investigation_attempts: Dict[str, int] = {}
self._cumulative_reward: float = 0.0
self._done = False
self._last_feedback = ""
self._focused_ad_id: Optional[str] = None
self._queue_may_grow: bool = False
# ------------------------------------------------------------------
# OpenEnv interface
# ------------------------------------------------------------------
def reset(
self,
seed: int | None = None,
episode_id: str | None = None,
**kwargs: Any,
) -> AdReviewObservation:
task_id = kwargs.get("task_id", "task_1")
if task_id not in TASK_CONFIGS:
task_id = "task_1"
injected_episode: Optional[GeneratedEpisode] = kwargs.get("episode")
injected_registry: Optional[InvestigationToolRegistry] = kwargs.get("registry")
self._queue_may_grow = bool(kwargs.get("queue_may_grow", False))
if injected_episode is not None:
self._episode = injected_episode
else:
effective_seed = (
seed if seed is not None else hash(uuid4()) & 0xFFFFFFFF
)
self._episode = generate_episode(effective_seed, task_id)
if injected_registry is not None:
self._registry = injected_registry
else:
self._registry = InvestigationToolRegistry.from_episode(self._episode)
config = self._episode.task_config
self._state = AdFraudState(
episode_id=episode_id or str(uuid4()),
step_count=0,
task_id=task_id,
total_ads=config.queue_size,
reviewed_count=0,
remaining_budget=config.action_budget,
verdicts={},
grader_score=None,
)
self._verdicts = {}
self._links = []
self._investigations = {}
self._investigation_attempts = {}
self._cumulative_reward = 0.0
self._done = False
self._last_feedback = "Episode started. Review the ad queue and begin your investigation."
self._focused_ad_id = self._episode.ads[0].ad_id if self._episode.ads else None
return self._build_observation(reward=0.0, done=False)
def step(
self,
action: AdReviewAction,
timeout_s: float | None = None,
**kwargs: Any,
) -> AdReviewObservation:
if self._done:
return self._build_observation(
reward=0.0, done=True,
feedback_override="Episode is already complete. Call reset() to start a new episode.",
)
if self._episode is None:
return self._build_observation(
reward=0.0, done=False,
feedback_override="Environment not initialized. Call reset() first.",
)
self._state.step_count += 1
ad_ids = {a.ad_id for a in self._episode.ads}
if action.ad_id not in ad_ids:
self._last_feedback = f"Invalid ad_id '{action.ad_id}'. Valid IDs: {', '.join(sorted(ad_ids))}"
return self._build_observation(reward=-0.05, done=False)
if action.action_type == "investigate":
reward = self._handle_investigate(action)
elif action.action_type == "verdict":
reward = self._handle_verdict(action)
elif action.action_type == "link_accounts":
reward = self._handle_link(action)
else:
self._last_feedback = f"Unknown action_type '{action.action_type}'."
reward = -0.05
self._cumulative_reward += reward
done = self._check_done()
if done and not self._done:
end_reward = self._handle_episode_end()
reward += end_reward
self._cumulative_reward += end_reward
self._done = True
self._state.remaining_budget = max(0, self._state.remaining_budget)
self._state.reviewed_count = len(self._verdicts)
self._state.verdicts = {
ad_id: v.get("verdict", "") for ad_id, v in self._verdicts.items()
}
return self._build_observation(reward=reward, done=self._done)
@property
def state(self) -> AdFraudState:
return self._state
# ------------------------------------------------------------------
# Action handlers
# ------------------------------------------------------------------
# Hard cap on TOTAL investigate attempts per ad — including the ones
# the env rejects (duplicate target, ad already verdicted, etc.).
_MAX_INVESTIGATION_ATTEMPTS_PER_AD = 5
_MAX_INVESTIGATIONS_PER_AD = 3
def _rotate_focus_off(self, current_ad_id: str) -> Optional[str]:
"""Move focus to the next pending ad that is NOT ``current_ad_id``.
Returns the new focus ad_id, or ``None`` if there is no other
pending ad. Used by ``_handle_investigate`` when the model is
stuck looping on a single ad — rotating focus changes the
``Ad in Focus`` block in the next observation and breaks the
prompt-anchoring effect that's keeping the 1.5B Investigator
glued to one ad.
"""
if self._episode is None:
return None
pending_others = [
a.ad_id for a in self._episode.ads
if a.ad_id not in self._verdicts and a.ad_id != current_ad_id
]
if pending_others:
self._focused_ad_id = pending_others[0]
return pending_others[0]
return None
def _handle_investigate(self, action: AdReviewAction) -> float:
ad_id = action.ad_id
# Count EVERY attempt, including the ones we're about to reject —
# this is the primary loop-breaking signal.
attempts = self._investigation_attempts.get(ad_id, 0) + 1
self._investigation_attempts[ad_id] = attempts
if self._state.remaining_budget <= 0:
self._last_feedback = "No budget remaining. You must render verdicts on remaining ads or end the episode."
return -0.02
if action.investigation_target is None:
self._last_feedback = "investigation_target is required for action_type='investigate'."
return -0.05
if ad_id in self._verdicts:
new_focus = self._rotate_focus_off(ad_id)
self._last_feedback = (
f"You already rendered a verdict on {ad_id}. "
+ (
f"Focus moved to {new_focus} — investigate or verdict that ad next."
if new_focus else
"All other ads are also verdicted; emit verdict actions for any remaining pending ads."
)
)
return -0.02
# Hard cap on TOTAL attempts (catches the duplicate-target spam
# loop). Fires BEFORE the duplicate-target check so we hit the
# cap first when the model is just spamming the same target.
if attempts > self._MAX_INVESTIGATION_ATTEMPTS_PER_AD:
new_focus = self._rotate_focus_off(ad_id)
done_targets = self._investigations.get(ad_id, [])
self._last_feedback = (
f"REJECTED: {ad_id} has been probed {attempts} times — that's the cap. "
f"STOP TRYING TO INVESTIGATE {ad_id}. "
f"Issue a verdict on {ad_id} NOW (action_type='verdict', verdict in "
"{approve, reject, escalate}). "
+ (
f"Successful pulls on {ad_id} so far: {', '.join(done_targets) or 'none'}. "
if done_targets else ""
)
+ (
f"Focus moved to {new_focus}; investigate that ad if you'd rather "
"build evidence on a fresh ad first."
if new_focus else
"No other pending ads — verdict the remaining pending ads."
)
)
return -0.10
prev = self._investigations.setdefault(ad_id, [])
target = action.investigation_target
if target in prev:
# Duplicate-target case. Don't spend budget. Surface the
# un-pulled targets so the model has a concrete next pick.
allowed = {
"advertiser_history", "landing_page", "payment_method",
"targeting_overlap", "campaign_structure", "policy_classifier",
}
remaining = sorted(allowed - set(prev))
self._last_feedback = (
f"You already investigated '{target}' for {ad_id} "
f"(attempt {attempts}/{self._MAX_INVESTIGATION_ATTEMPTS_PER_AD}). "
f"Either issue a verdict on {ad_id} now, OR pick a fresh target from "
f"[{', '.join(remaining) if remaining else '(none left — verdict it)'}], "
"OR investigate a different ad_id from the pending queue."
)
return -0.02
# Sub-cap on successful pulls (tighter than the attempt cap).
# Forces verdict after 3 successful investigations.
if len(prev) >= self._MAX_INVESTIGATIONS_PER_AD:
new_focus = self._rotate_focus_off(ad_id)
self._last_feedback = (
f"{ad_id} has reached the {self._MAX_INVESTIGATIONS_PER_AD}-investigation cap "
f"(already pulled: {', '.join(prev)}). Issue a verdict on {ad_id} now "
"(action_type='verdict' with verdict in {approve, reject, escalate})."
+ (f" Focus moved to {new_focus}." if new_focus else "")
)
return -0.05
self._state.remaining_budget -= 1
prev.append(target)
# Note: focus is updated to the ad we just investigated only when
# the Investigator hasn't already accumulated >=2 investigations
# on it. Past 2 investigations on the same ad we keep focus on
# the EXISTING focused ad (which may be a different one) so the
# prompt doesn't re-anchor the model to an ad it should be
# rendering a verdict on, NOT investigating further. This nudges
# the policy toward "investigate twice → verdict → move on"
# instead of looping investigations on a single ad.
if len(prev) <= 2 or self._focused_ad_id is None:
self._focused_ad_id = ad_id
if self._registry is not None:
findings = self._registry.lookup(ad_id, target)
else:
findings = self._episode.investigation_data.get(ad_id, {}).get(
target, "No data available for this investigation type."
)
# Escalating per-investigate penalty so a runaway investigate
# loop on a single ad gets progressively more expensive — pushes
# the policy toward issuing a verdict once it has enough signal,
# rather than burning steps re-checking the same ad.
n_targets = len(prev)
if n_targets <= 2:
penalty = -0.02
elif n_targets == 3:
penalty = -0.05
else:
penalty = -0.10
feedback_lines = [
f"Investigation complete: {target} for {ad_id}.",
f"--- Findings ---\n{findings}",
]
if n_targets >= 2:
feedback_lines.append(
f"Note: you have now investigated {ad_id} {n_targets}x — "
"issue a verdict on it instead of more investigations."
)
self._last_feedback = "\n".join(feedback_lines)
return penalty
def _handle_verdict(self, action: AdReviewAction) -> float:
ad_id = action.ad_id
if ad_id in self._verdicts:
self._last_feedback = f"You already rendered a verdict on {ad_id}."
return -0.02
if action.verdict is None:
self._last_feedback = "verdict field is required for action_type='verdict'."
return -0.05
confidence = action.confidence if action.confidence is not None else 0.5
ad = self._get_ad(ad_id)
ground_truth = ad.ground_truth_label if ad else "legit"
severity = ad.severity if ad else 0.0
self._verdicts[ad_id] = {
"verdict": action.verdict,
"confidence": confidence,
"ground_truth": ground_truth,
}
reward = self._compute_verdict_reward(action.verdict, ground_truth, severity, confidence)
pending = [a.ad_id for a in self._episode.ads if a.ad_id not in self._verdicts]
self._last_feedback = (
f"Verdict recorded for {ad_id}: {action.verdict} "
f"(confidence: {confidence:.2f}). "
f"{len(pending)} ad(s) remaining in queue."
)
if pending:
self._focused_ad_id = pending[0]
return reward
def _handle_link(self, action: AdReviewAction) -> float:
if action.linked_ad_id is None:
self._last_feedback = "linked_ad_id is required for action_type='link_accounts'."
return -0.05
ad_ids = {a.ad_id for a in self._episode.ads}
if action.linked_ad_id not in ad_ids:
self._last_feedback = f"Invalid linked_ad_id '{action.linked_ad_id}'."
return -0.05
if action.ad_id == action.linked_ad_id:
self._last_feedback = "Cannot link an ad to itself."
return -0.05
link_key = tuple(sorted([action.ad_id, action.linked_ad_id]))
existing = {tuple(sorted([l["ad_id_1"], l["ad_id_2"]])) for l in self._links}
if link_key in existing:
self._last_feedback = f"Link between {action.ad_id} and {action.linked_ad_id} already recorded."
return -0.02
is_correct = self._check_link_correct(action.ad_id, action.linked_ad_id)
self._links.append({
"ad_id_1": action.ad_id,
"ad_id_2": action.linked_ad_id,
"reason": action.link_reason or "",
"correct": is_correct,
})
self._last_feedback = (
f"Network link recorded: {action.ad_id} <-> {action.linked_ad_id}. "
f"Reason: {action.link_reason or 'not specified'}."
)
return 0.4 if is_correct else -0.25
# ------------------------------------------------------------------
# Reward computation
# ------------------------------------------------------------------
def _compute_verdict_reward(
self, verdict: str, ground_truth: str, severity: float, confidence: float
) -> float:
if verdict == "reject" and ground_truth == "fraud":
return 0.3 + 0.1 * severity
elif verdict == "approve" and ground_truth == "legit":
return 0.1
elif verdict == "escalate" and ground_truth == "escalate":
return 0.15
elif verdict == "reject" and ground_truth == "legit":
return -0.35
elif verdict == "approve" and ground_truth == "fraud":
return -0.5
elif verdict == "escalate":
return -0.05
elif verdict == "approve" and ground_truth == "escalate":
return -0.15
elif verdict == "reject" and ground_truth == "escalate":
return -0.1
else:
return -0.05
def _handle_episode_end(self) -> float:
"""Apply end-of-episode adjustments for unreviewed ads, then delegate to graders."""
unreviewed_fraud = 0
for ad in self._episode.ads:
if ad.ad_id not in self._verdicts:
self._verdicts[ad.ad_id] = {
"verdict": "approve",
"confidence": 0.0,
"ground_truth": ad.ground_truth_label,
"auto_approved": True,
}
if ad.ground_truth_label == "fraud":
unreviewed_fraud += 1
record = self._build_episode_record()
grader_score = grade_episode(record)
self._state.grader_score = grader_score
reviewed_count = len([v for v in self._verdicts.values() if not v.get("auto_approved")])
total_ads = len(self._episode.ads)
total_correct = sum(
1 for v in self._verdicts.values()
if not v.get("auto_approved")
and (
(v["verdict"] == "reject" and v["ground_truth"] == "fraud")
or (v["verdict"] == "approve" and v["ground_truth"] == "legit")
or (v["verdict"] == "escalate" and v["ground_truth"] == "escalate")
)
)
false_positives = sum(
1 for v in self._verdicts.values()
if not v.get("auto_approved")
and v["verdict"] == "reject" and v["ground_truth"] == "legit"
)
false_negatives = sum(
1 for v in self._verdicts.values()
if not v.get("auto_approved")
and v["verdict"] == "approve" and v["ground_truth"] == "fraud"
)
correct_links = sum(1 for l in self._links if l.get("correct"))
incorrect_links = sum(1 for l in self._links if not l.get("correct"))
global _last_grader_result
_last_grader_result = {
"task_id": self._state.task_id,
"grader_score": grader_score,
"episode_id": self._state.episode_id,
"total_steps": self._state.step_count,
"verdicts_rendered": reviewed_count,
"correct_decisions": total_correct,
"false_positives": false_positives,
"false_negatives": false_negatives,
"auto_approved": total_ads - reviewed_count,
"unreviewed_fraud": unreviewed_fraud,
"network_links_correct": correct_links,
"network_links_incorrect": incorrect_links,
}
feedback_lines = [
f"Episode complete. Grader score: {grader_score:.3f}/1.000",
f"Verdicts rendered: {reviewed_count}/{total_ads}",
f"Correct decisions: {total_correct}/{reviewed_count}",
f"False positives (legit rejected): {false_positives}",
f"False negatives (fraud approved): {false_negatives}",
f"Unreviewed ads auto-approved: {unreviewed_fraud}",
]
if self._links:
feedback_lines.append(
f"Network links: {correct_links} correct, {incorrect_links} incorrect"
)
self._last_feedback = "\n".join(feedback_lines)
return 0.0
def _build_episode_record(self) -> EpisodeRecord:
"""Convert internal state into an EpisodeRecord for the grader."""
verdict_results = []
for ad in self._episode.ads:
v = self._verdicts.get(ad.ad_id)
if v:
verdict_results.append(VerdictResult(
ad_id=ad.ad_id,
verdict=v["verdict"],
confidence=v.get("confidence", 0.5),
ground_truth=v["ground_truth"],
auto_approved=v.get("auto_approved", False),
))
link_results = [
LinkResult(ad_id_1=l["ad_id_1"], ad_id_2=l["ad_id_2"], correct=l["correct"])
for l in self._links
]
ads_metadata = [
{"ad_id": ad.ad_id, "ground_truth": ad.ground_truth_label, "severity": ad.severity}
for ad in self._episode.ads
]
return EpisodeRecord(
task_id=self._state.task_id,
total_steps=self._state.step_count,
action_budget=self._episode.task_config.action_budget,
verdicts=verdict_results,
links=link_results,
ads_metadata=ads_metadata,
n_fraud_rings=len(self._episode.fraud_rings),
ring_sizes=[len(r.member_ad_ids) for r in self._episode.fraud_rings],
)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _check_done(self) -> bool:
if self._episode is None:
return True
all_reviewed = all(
ad.ad_id in self._verdicts for ad in self._episode.ads
)
steps_exhausted = self._state.step_count >= self._episode.task_config.action_budget
return all_reviewed or steps_exhausted
def _check_link_correct(self, ad_id_1: str, ad_id_2: str) -> bool:
"""Check if two ads share a fraud ring."""
for ring in self._episode.fraud_rings:
if ad_id_1 in ring.member_ad_ids and ad_id_2 in ring.member_ad_ids:
return True
return False
def _get_ad(self, ad_id: str) -> Optional[Ad]:
if self._episode is None:
return None
for ad in self._episode.ads:
if ad.ad_id == ad_id:
return ad
return None
def _build_observation(
self,
reward: float,
done: bool,
feedback_override: str | None = None,
) -> AdReviewObservation:
feedback = feedback_override or self._last_feedback
if self._episode is None:
return AdReviewObservation(
done=done,
reward=reward,
queue_summary="No episode loaded.",
current_ad_info="",
investigation_findings="",
verdict_history_summary="",
feedback=feedback,
available_ads=[],
queue_status={},
queue_may_grow=self._queue_may_grow,
)
config = self._episode.task_config
pending = [a for a in self._episode.ads if a.ad_id not in self._verdicts]
reviewed = [a for a in self._episode.ads if a.ad_id in self._verdicts]
steps_remaining = max(0, config.action_budget - self._state.step_count)
# Surface the action-budget-vs-pending-ads pressure as a prominent
# feedback banner. We use TWO triggers because they catch
# different failure modes:
#
# * `steps_remaining <= 2 * n_pending` — fires when you have
# less than ~2 actions per pending ad left. Catches the
# "looped on ad_001 for 20 steps and now there isn't time
# to verdict everyone" case the 1.5B Investigator falls into.
# This trigger doesn't depend on investigation budget — even
# with full investigation budget, if step budget is tight,
# verdicts MUST start now.
#
# * `remaining_investigation_budget <= n_pending` — original
# trigger. Catches the case where investigation actions
# have actually been spent on real pulls, not on rejected
# duplicates.
if not done and pending and feedback_override is None:
n_pending = len(pending)
budget = self._state.remaining_budget
steps_left = steps_remaining
steps_pressure = steps_left <= 2 * n_pending
budget_pressure = budget <= n_pending
if steps_pressure or budget_pressure:
# Pick the tighter wording.
if steps_pressure:
pressure_line = (
f"BUDGET PRESSURE: only {steps_left} step(s) left in this "
f"episode for {n_pending} pending ad(s). Stop investigating "
f"and START VERDICTING — issue verdict actions "
f"(action_type='verdict', verdict in approve/reject/escalate) "
f"on the pending ads using whatever evidence you have. "
f"Unverdict-ed ads auto-approve at audit and tank the score."
)
else:
pressure_line = (
f"BUDGET PRESSURE: only {budget} investigation(s) left for "
f"{n_pending} pending ad(s). Stop investigating and START "
f"VERDICTING — issue verdict actions on the pending ads using "
f"whatever evidence you have (approve, reject, or escalate). "
f"Unverdict-ed ads auto-approve at audit time and tank the score."
)
feedback = (
f"{pressure_line}\n\n{feedback}" if feedback else pressure_line
)
queue_summary = (
f"Task: {config.name} ({config.difficulty})\n"
f"Total ads: {config.queue_size} | "
f"Reviewed: {len(reviewed)} | "
f"Pending: {len(pending)} | "
f"Steps remaining: {steps_remaining}/{config.action_budget} | "
f"Investigation budget: {self._state.remaining_budget} | "
f"Step: {self._state.step_count}"
)
current_ad_info = ""
if self._focused_ad_id and not done:
ad = self._get_ad(self._focused_ad_id)
if ad and ad.ad_id not in self._verdicts:
signals = ", ".join(ad.initial_risk_signals) if ad.initial_risk_signals else "None"
investigated = self._investigations.get(ad.ad_id, [])
attempts = self._investigation_attempts.get(ad.ad_id, 0)
_all_targets = [
"advertiser_history", "landing_page", "payment_method",
"targeting_overlap", "campaign_structure", "policy_classifier",
]
exhausted = [t for t in _all_targets if t in investigated]
fresh = [t for t in _all_targets if t not in investigated]
exhausted_line = (
f"ALREADY-EXHAUSTED targets for {ad.ad_id} (do NOT repeat): "
f"{', '.join(exhausted) if exhausted else 'none yet'}"
)
fresh_line = (
f"FRESH targets for {ad.ad_id} (you may pick one of these or verdict): "
f"{', '.join(fresh) if fresh else 'none — verdict this ad now'}"
)
# Contextual metadata visible before investigation
profile = self._episode.advertiser_profiles.get(ad.ad_id)
meta_lines = []
if profile:
meta_lines.append(f"Advertiser country: {profile.country}")
meta_lines.append(f"Account age: {profile.account_age_days} days")
if profile.account_age_days < 30:
meta_lines.append("Flag: New account (< 30 days)")
context_meta = "\n".join(meta_lines)
from ..data.meta_policy_taxonomy import lookup as _meta_lookup
policy_entry = _meta_lookup(ad.category)
meta_policy_line = (
f"Meta policy lens: {policy_entry.citation_id} — "
f"{policy_entry.section} > {policy_entry.subsection}"
)
# If the model has spammed this ad past N attempts,
# surface that loud-and-clear at the top of the focus
# block so the next observation pushes harder toward
# verdict instead of silently re-anchoring.
stuck_banner = ""
if attempts >= 3:
stuck_banner = (
f"STUCK ON {ad.ad_id}: you have attempted {attempts} "
f"investigate actions on this ad. ISSUE A VERDICT NOW.\n"
)
current_ad_info = (
f"{stuck_banner}"
f"=== Ad in Focus: {ad.ad_id} ===\n"
f"Category: {ad.category}\n"
f"{meta_policy_line}\n"
f"Ad copy: \"{ad.ad_copy}\"\n"
f"Targeting: {ad.targeting_summary}\n"
f"Initial risk signals: {signals}\n"
f"{context_meta}\n"
f"{exhausted_line}\n"
f"{fresh_line}"
)
investigation_findings = ""
for ad_id, targets in self._investigations.items():
for target in targets:
if self._registry is not None:
finding = self._registry.lookup(ad_id, target)
else:
finding = self._episode.investigation_data.get(ad_id, {}).get(target, "")
if finding and not finding.startswith("No data") and not finding.startswith("Unknown"):
investigation_findings += f"\n[{ad_id} / {target}]\n{finding}\n"
manual_verdicts = {
ad_id: v for ad_id, v in self._verdicts.items()
if not v.get("auto_approved")
}
if manual_verdicts:
counts = {"approve": 0, "reject": 0, "escalate": 0}
by_decision = {"approve": [], "reject": [], "escalate": []}
for ad_id, v in manual_verdicts.items():
counts[v["verdict"]] = counts.get(v["verdict"], 0) + 1
by_decision[v["verdict"]].append(ad_id)
summary_parts = [f"{c} {k}" for k, c in counts.items() if c > 0]
verdict_lines = [
f"Reviewed {len(manual_verdicts)} ad(s): {', '.join(summary_parts)}."
]
for decision in ("reject", "approve", "escalate"):
if by_decision[decision]:
verdict_lines.append(
f" {decision}: {', '.join(by_decision[decision])}"
)
verdict_history_summary = "\n".join(verdict_lines)
else:
verdict_history_summary = "No verdicts yet."
available_ads = [a.ad_id for a in pending]
queue_status = {
"total_ads": config.queue_size,
"reviewed": len(reviewed),
"pending": len(pending),
"investigation_budget": self._state.remaining_budget,
"steps_remaining": steps_remaining,
"step": self._state.step_count,
"task_id": config.task_id,
}
evidence_ledger = self._build_evidence_ledger()
queue_digest = self._build_queue_digest(pending)
decided_ads = self._build_decided_ads()
return AdReviewObservation(
done=done,
reward=reward,
queue_summary=queue_summary,
current_ad_info=current_ad_info,
investigation_findings=investigation_findings.strip(),
verdict_history_summary=verdict_history_summary,
feedback=feedback,
available_ads=available_ads,
queue_status=queue_status,
queue_may_grow=self._queue_may_grow,
evidence_ledger=evidence_ledger,
queue_digest=queue_digest,
decided_ads=decided_ads,
)
# Curated columns surfaced in the no-investigation queue digest so
# the Investigator has SOMETHING to triage on for every pending ad,
# not just the focused one. Mix of:
#
# * Discriminative (used for link_accounts decisions when shared
# across ads): payment_type, registrar, domain
# * Decoy / non-discriminative (deliberately included so the
# policy must learn not to weight them):
# category, country, account_age_days
#
# Total of ~6 columns × 12 ads ≈ 720 chars worst case, well within
# prompt budget. payment_id / advertiser_id / targeting_fingerprint
# are NOT exposed here — those are the high-signal columns that
# MUST require an explicit investigate to avoid trivialising the
# task; they remain in the evidence_ledger only.
_QUEUE_DIGEST_MAX_ADS = 12
def _build_queue_digest(
self, pending_ads: List[Ad]
) -> List[Dict[str, Any]]:
if self._episode is None or not pending_ads:
return []
lp_map = getattr(self._episode, "landing_pages", {}) or {}
profiles = getattr(self._episode, "advertiser_profiles", {}) or {}
rows: List[Dict[str, Any]] = []
for ad in pending_ads[: self._QUEUE_DIGEST_MAX_ADS]:
row: Dict[str, Any] = {
"ad_id": ad.ad_id,
"category": ad.category,
}
profile = profiles.get(ad.ad_id)
if profile is not None:
row["country"] = profile.country
row["account_age_days"] = profile.account_age_days
row["payment_type"] = profile.payment_method_type
lp = lp_map.get(ad.ad_id)
if lp is not None:
row["domain"] = getattr(lp, "domain", None)
row["registrar"] = getattr(lp, "registrar", None)
rows.append({k: v for k, v in row.items() if v is not None})
return rows
def _build_evidence_ledger(self) -> Dict[str, Dict[str, Any]]:
"""Assemble a per-ad structured evidence table for the Investigator.
Delegates to :func:`build_evidence_ledger` so the Referee can reuse
the exact same extraction logic (with a different ``ad_ids``
selection) when building the Fraudster's ``my_proposal_signals``.
"""
if self._episode is None:
return {}
candidate_ad_ids: set[str] = set(self._investigations.keys())
if self._focused_ad_id:
candidate_ad_ids.add(self._focused_ad_id)
return build_evidence_ledger(
episode=self._episode,
registry=self._registry,
ad_ids=candidate_ad_ids,
investigations=self._investigations,
)
def _build_decided_ads(self) -> list[Dict[str, Any]]:
"""Build a per-decided-ad summary with verdict + key signals.
Each entry carries the verdict, confidence, and a curated mix of
discriminative + decoy parameters from the evidence ledger. This
gives the Investigator structured memory of past decisions so it
can detect cross-ad collisions for link_accounts.
"""
if self._episode is None:
return []
decided_ad_ids = [
ad_id for ad_id in self._verdicts
if not self._verdicts[ad_id].get("auto_approved")
]
if not decided_ad_ids:
return []
ledger = build_evidence_ledger(
episode=self._episode,
registry=self._registry,
ad_ids=decided_ad_ids,
investigations=self._investigations,
)
rows: list[Dict[str, Any]] = []
for ad_id in decided_ad_ids:
v = self._verdicts[ad_id]
entry: Dict[str, Any] = {
"ad_id": ad_id,
"verdict": v.get("verdict", "?"),
"confidence": v.get("confidence", 0.5),
}
signals = ledger.get(ad_id, {})
entry.update(signals)
rows.append(entry)
return rows
# ------------------------------------------------------------------
# Referee integration hooks
# ------------------------------------------------------------------
def notify_queue_grew(self, new_ad_id: str) -> None:
"""
Called by the Referee after `extend_episode_with_proposal` adds a
new ad to the shared episode + registry. Updates the Investigator's
view of queue size and refocuses on the new ad if the Investigator
is idle.
"""
if self._episode is None:
return
self._state.total_ads = len(self._episode.ads)
if self._focused_ad_id is None or self._focused_ad_id in self._verdicts:
self._focused_ad_id = new_ad_id
@property
def episode(self) -> Optional[GeneratedEpisode]:
"""Read-only access to the loaded episode (used by the Referee)."""
return self._episode
@property
def registry(self) -> Optional[InvestigationToolRegistry]:
"""Read-only access to the shared tool registry."""
return self._registry
@property
def verdicts(self) -> Dict[str, Dict[str, Any]]:
"""Read-only snapshot of verdicts recorded so far (Referee/auditor)."""
return dict(self._verdicts)
@property
def investigations(self) -> Dict[str, List[str]]:
"""Read-only snapshot of investigation targets pulled per ad."""
return {k: list(v) for k, v in self._investigations.items()}
@property
def links(self) -> List[Dict[str, Any]]:
"""Read-only snapshot of recorded network links."""
return list(self._links)
# Backwards-compatible alias. Round 1 code, tests, clients, and external
# integrations import `AdFraudEnvironment` directly; keeping the symbol
# means the rename is zero-breakage.
AdFraudEnvironment = InvestigatorEnvironment