""" Scripted Fraudster policies (baselines + adversarial opponents). All three policies consume a `FraudsterObservation` dict (as delivered over the WebSocket) and return a `FraudsterAction` Pydantic instance. ScriptedFraudster deterministic: propose 2 ads round 1, 1 ad thereafter, commit_final once all proposals are used. ReactiveFraudster adapts category + content based on which of its prior proposals were rejected and which investigation targets the Investigator kept pulling. GibberishFraudster low-plausibility negative control: proposes random gibberish ad copy (for sanity-checking plausibility grader). """ from __future__ import annotations import random import string from typing import Any, Dict, List, Optional from ..models import FraudsterAction from ._base import PolicyBase # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- _LEGIT_CAMOUFLAGE_CATEGORIES = ( "ecommerce", "saas", "local_service", "education", "fitness", ) _FRAUD_CATEGORIES = ( "fake_giveaway", "fake_crypto", "fake_weight_loss", "gray_area_supplements", "fake_emergency", ) _CAMOUFLAGE_ADS = [ ( "Boost your small business inventory tracking with CloudLedger — 14-day free trial.", "CloudLedger helps shops manage SKUs, stock levels, and vendor orders.", "saas", "SMB owners, US, interests: accounting / inventory", ), ( "Upgrade your home gym with adjustable dumbbells, now $129 with free shipping.", "Space-saving dumbbells that replace a full rack, ships in 48h.", "fitness", "Adults 25-45, US, home-gym interest", ), ( "Online Python fundamentals course with projects and a live mentor — enroll today.", "8-week beginner Python bootcamp, weekly office hours, job-ready portfolio.", "education", "Career-switchers 22-35", ), ( "Downtown dog-walking service — $18 per walk, insured handlers, book online.", "Reliable neighborhood dog walking with GPS tracking and photo updates.", "local_service", "Urban dog owners, city centers", ), ( "Refurbished iPhone 13 256GB — warranty included, $399 this week only.", "Certified refurbished phones with 12-month warranty and 30-day returns.", "ecommerce", "Adults 18-55, price-sensitive shoppers", ), ] _FRAUD_ADS = [ ( "Free iPhone 15 — first 500 customers only, tap now!", "Giveaway ends at midnight, quick verify your shipping address.", "fake_giveaway", "Anyone, low-income targeting", ), ( "Invest in MoonCoin — guaranteed 50x returns in 30 days, risk-free!", "Join our MoonCoin pre-sale, 10% bonus for first buyers.", "fake_crypto", "Men 18-35, crypto interest", ), ( "Lose 20 pounds in 7 days with this one weird trick doctors hate!", "Revolutionary weight-loss pill that dissolves fat overnight.", "fake_weight_loss", "Women 25-55, weight-loss interest", ), ] def _truncate(text: str, limit: int) -> str: return text if len(text) <= limit else text[: limit - 3] + "..." def _choose_allowed( preferred: str, observation: Dict[str, Any], *, fallback: str = "ecommerce" ) -> str: """Pick `preferred` if it's in the observation's allowed_categories, else fall back.""" allowed: List[str] = observation.get("allowed_categories") or [] if not allowed: return preferred if preferred in allowed: return preferred for cat in _LEGIT_CAMOUFLAGE_CATEGORIES: if cat in allowed: return cat if fallback in allowed: return fallback return allowed[0] # --------------------------------------------------------------------------- # ScriptedFraudster # --------------------------------------------------------------------------- class ScriptedFraudster(PolicyBase): """ Deterministic fraudster. Round 1 proposes two ads (one camouflage, one overt fraud) and ends turn; subsequent rounds propose one more if budget allows; once proposals are exhausted (or on round N) it commits_final. Good for regression tests and as a baseline opponent. """ def __init__(self, *, seed: int = 0) -> None: self._rng = random.Random(seed) self._seed = seed self._turn_actions_taken = 0 def reset(self) -> None: self._rng = random.Random(self._seed) self._turn_actions_taken = 0 def act(self, observation: Dict[str, Any]) -> FraudsterAction: remaining = int(observation.get("proposals_remaining", 0)) actions_left = int(observation.get("actions_left_this_turn", 1)) round_number = int(observation.get("round_number", 1)) if remaining <= 0: return FraudsterAction( action_type="commit_final", rationale="No proposals left; locking in.", ) if actions_left <= 1: ad_copy, blurb, cat, targeting = self._pick_ad(round_number) cat = _choose_allowed(cat, observation) return FraudsterAction( action_type="propose_ad", ad_copy=_truncate(ad_copy, 2000), landing_page_blurb=_truncate(blurb, 2000), category=cat, targeting_summary=targeting, rationale=( "ScriptedFraudster: final slot of the turn, proposing once " "before yielding to Investigator." ), ) ad_copy, blurb, cat, targeting = self._pick_ad(round_number) cat = _choose_allowed(cat, observation) return FraudsterAction( action_type="propose_ad", ad_copy=_truncate(ad_copy, 2000), landing_page_blurb=_truncate(blurb, 2000), category=cat, targeting_summary=targeting, rationale="ScriptedFraudster: executing canonical round plan.", ) def _pick_ad(self, round_number: int): if round_number == 1: return self._rng.choice(_FRAUD_ADS) return self._rng.choice(_CAMOUFLAGE_ADS) # --------------------------------------------------------------------------- # ReactiveFraudster # --------------------------------------------------------------------------- class ReactiveFraudster(PolicyBase): """ Reactive fraudster. Observes the Investigator's prior verdicts and investigation targets, then adapts: * If any of its own proposals were rejected, it switches to camouflage categories and invests in a stronger landing-page blurb. * If the Investigator keeps pulling `landing_page`, the Fraudster beefs up that field; if `advertiser_history`, it adds benign targeting context. * Commits_final early if the Investigator has been soft (high approval rate on prior proposals → try to lock them in before audit). """ def __init__(self, *, seed: int = 0) -> None: self._rng = random.Random(seed) self._seed = seed def reset(self) -> None: self._rng = random.Random(self._seed) def act(self, observation: Dict[str, Any]) -> FraudsterAction: remaining = int(observation.get("proposals_remaining", 0)) actions_left = int(observation.get("actions_left_this_turn", 1)) my_rejected = self._my_rejected_count(observation) my_approved = self._my_approved_count(observation) targets_emphasized = self._dominant_investigation_target(observation) if remaining <= 0: return FraudsterAction( action_type="commit_final", rationale=( f"ReactiveFraudster: budget exhausted; {my_approved} of my " f"proposals approved so far, locking in." ), ) if my_approved >= 2 and self._rng.random() < 0.35: return FraudsterAction( action_type="commit_final", rationale=( "ReactiveFraudster: Investigator is lenient; lock in current " "slate before they harden." ), ) mod = self._maybe_modify_pending(observation, targets_emphasized) if mod is not None: return mod if my_rejected >= 1: ad_copy, blurb, cat, targeting = self._rng.choice(_CAMOUFLAGE_ADS) rationale = ( f"ReactiveFraudster: pivoting to camouflage ({cat}) after " f"{my_rejected} rejections; Investigator pulled " f"{targets_emphasized or 'no signal'}." ) else: ad_copy, blurb, cat, targeting = self._rng.choice(_FRAUD_ADS) rationale = ( "ReactiveFraudster: Investigator hasn't rejected me yet; " "testing a borderline/fraud template." ) if targets_emphasized == "landing_page": blurb = ( "Trusted brand with 12,000+ reviews — verified customer " "testimonials, PCI-DSS compliant checkout, and a 30-day " "money-back guarantee. " + blurb ) elif targets_emphasized == "advertiser_history": targeting = ( targeting + "; advertiser active since 2018 with >3y domain age" ) cat = _choose_allowed(cat, observation) if actions_left <= 1: rationale += " (final slot of this turn)" return FraudsterAction( action_type="propose_ad", ad_copy=_truncate(ad_copy, 2000), landing_page_blurb=_truncate(blurb, 2000), category=cat, targeting_summary=_truncate(targeting, 512), rationale=_truncate(rationale, 2000), ) def _my_rejected_count(self, observation: Dict[str, Any]) -> int: return sum( 1 for v in observation.get("prior_verdicts", []) if v.get("was_my_proposal") and v.get("verdict") == "reject" ) def _my_approved_count(self, observation: Dict[str, Any]) -> int: return sum( 1 for v in observation.get("prior_verdicts", []) if v.get("was_my_proposal") and v.get("verdict") == "approve" ) def _dominant_investigation_target( self, observation: Dict[str, Any] ) -> Optional[str]: targets_used: Dict[str, List[str]] = observation.get( "investigation_targets_used", {} ) counter: Dict[str, int] = {} for targets in targets_used.values(): for t in targets: counter[t] = counter.get(t, 0) + 1 if not counter: return None return max(counter, key=lambda k: counter[k]) def _maybe_modify_pending( self, observation: Dict[str, Any], targets_emphasized: Optional[str] ) -> Optional[FraudsterAction]: if targets_emphasized is None: return None current_queue = observation.get("current_queue", []) for item in current_queue: if not item.get("is_my_proposal"): continue if item.get("status") not in ("pending", "investigating"): continue slot_index = item.get("slot_index") if slot_index is None: continue if self._rng.random() < 0.25: return FraudsterAction( action_type="modify_pending_ad", slot_index=int(slot_index), new_landing_page_blurb=( "Verified business with transparent pricing, real-time " "order tracking, and 30-day returns. " ), rationale=( "ReactiveFraudster: Investigator emphasizing " f"{targets_emphasized}; strengthening pending slot " f"{slot_index}." ), ) return None # --------------------------------------------------------------------------- # GibberishFraudster # --------------------------------------------------------------------------- class GibberishFraudster(PolicyBase): """ Negative control: proposes nonsense ad copy to exercise the plausibility grader. The Auditor's Track-B score should collapse on this policy. """ def __init__(self, *, seed: int = 0) -> None: self._rng = random.Random(seed) self._seed = seed def reset(self) -> None: self._rng = random.Random(self._seed) def act(self, observation: Dict[str, Any]) -> FraudsterAction: remaining = int(observation.get("proposals_remaining", 0)) if remaining <= 0: return FraudsterAction( action_type="commit_final", rationale="GibberishFraudster: budget gone.", ) cat = _choose_allowed( self._rng.choice(_FRAUD_CATEGORIES + _LEGIT_CAMOUFLAGE_CATEGORIES), observation, ) return FraudsterAction( action_type="propose_ad", ad_copy=self._random_gibberish(self._rng.randint(40, 120)), landing_page_blurb=self._random_gibberish(self._rng.randint(20, 80)), category=cat, targeting_summary="adults", rationale="GibberishFraudster: random bytes.", ) def _random_gibberish(self, length: int) -> str: alphabet = string.ascii_lowercase + " " # include whitespace return "".join(self._rng.choice(alphabet) for _ in range(length))