""" Llama Guard 3 / Purple Llama mock classifier data. Mocks Meta's open safety classifier (https://github.com/meta-llama/PurpleLlama) for ad content. Used by the Investigator's `policy_classifier` investigation target and exposed as a REST endpoint at `POST /api/v1/tools/policy_classifier`. Design goals: - **Deterministic per ad_id**: same input → same output, for judge reproducibility. - **Ground-truth correlated**: fraud ads are labelled "unsafe" ~75% of the time; legit ads are false-flagged ~5% of the time. The classifier is noisy on purpose — an Investigator that over-trusts it will still make mistakes. - **Surface-marker grounded**: uses regex heuristics (urgency, fake authority, get-rich-quick patterns) modelled after real Meta T&S enforcement signals so the explanation text is specific enough for the Investigator's rationale to cite. The classifier is intentionally a mock: it's not loading the real Llama Guard 3 weights (which would blow the container size). The shape of the output exactly matches what a real Llama Guard 3-8B call would return so switching to the real model is a one-function swap. """ from __future__ import annotations import hashlib import random import re from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple # -------------------------------------------------------------------------- # Llama Guard 3 hazard taxonomy (public categories). 
# Source: https://github.com/meta-llama/PurpleLlama/blob/main/Llama-Guard3/8B/MODEL_CARD.md
# --------------------------------------------------------------------------

# Category code → snake_case category name, as rendered in classifier output.
LLAMA_GUARD_CATEGORIES: Dict[str, str] = {
    "S1": "violent_crimes",
    "S2": "non_violent_crimes",
    "S3": "sex_related_crimes",
    "S4": "child_sexual_exploitation",
    "S5": "defamation",
    "S6": "specialized_advice",
    "S7": "privacy",
    "S8": "intellectual_property",
    "S9": "indiscriminate_weapons",
    "S10": "hate",
    "S11": "suicide_self_harm",
    "S12": "sexual_content",
    "S13": "elections",
}

# --------------------------------------------------------------------------
# CounterFeint-specific "TS-Fraud" head — surface markers a production T&S
# classifier would stack on top of Llama Guard for ad-fraud specifically.
# --------------------------------------------------------------------------

# Marker code → human-readable description (used by `to_dict`).
TS_FRAUD_MARKERS: Dict[str, str] = {
    "high_pressure_urgency": "Act-now / limited-time / expires-in manipulation",
    "fake_authority_claim": "Doctor-recommended / FDA-approved / certified (unverified)",
    "unrealistic_guarantee": "100% guaranteed / risk-free / no-questions-asked",
    "exclusivity_manipulation": "Secret / hidden / exclusive-access framing",
    "get_rich_quick": "Earn $X daily / make $X from home",
    "phishing_credential_request": "Login / verify / click-here patterns",
    "counterfeit_indicator": "Replica / dupe / authentic-looking",
}

_URGENCY_RE = re.compile(
    r"\b(act\s+now|limited\s+time|expires?\s+(?:in|soon|today)|"
    r"only\s+\d+\s+(?:left|spots|hours?|days?)|hurry|last\s+chance|"
    r"while\s+supplies?\s+last|ends?\s+(?:tonight|soon))\b",
    re.IGNORECASE,
)

_AUTHORITY_RE = re.compile(
    r"\b(fda[\- ]approved|doctor[\- ]recommended|clinically\s+proven|"
    r"certified(?:\s+by|\s+professional)?|endorsed\s+by|"
    r"board[\- ]certified|medical[\- ]grade)\b",
    re.IGNORECASE,
)

_GUARANTEE_RE = re.compile(
    r"\b(100%?\s*(?:guaranteed|money[\- ]?back)|risk[\- ]free|"
    r"no\s+questions?\s+asked|satisfaction\s+guaranteed|guaranteed\s+results?)\b",
    re.IGNORECASE,
)

_EXCLUSIVITY_RE = re.compile(
    r"\b(secret|hidden|exclusive\s+access|members?\s+only|"
    r"invitation\s+only|insider|behind[\- ]the[\- ]scenes)\b",
    re.IGNORECASE,
)

_GET_RICH_RE = re.compile(
    r"(?:earn|make)\s*\$?\d+[kKmM]?\s*(?:daily|per\s+day|a\s+day|"
    r"weekly|per\s+week|from\s+home|doing\s+nothing)",
    re.IGNORECASE,
)

_PHISHING_RE = re.compile(
    r"\b(verify\s+(?:your|account)|login\s+(?:required|to)|"
    r"click\s+here\s+to\s+(?:claim|verify|unlock)|"
    r"confirm\s+(?:your|account|identity)|password\s+reset)\b",
    re.IGNORECASE,
)

_COUNTERFEIT_RE = re.compile(
    r"\b(replica|dupe|authentic[\- ]looking|copy\s+of|knockoff|"
    r"inspired\s+by\s+(?:the\s+)?original|designer[\- ]inspired)\b",
    re.IGNORECASE,
)

# (marker code, compiled pattern) pairs. The list order is the order in which
# markers are reported by `detect_fraud_markers`.
MARKER_PATTERNS: List[Tuple[str, "re.Pattern[str]"]] = [
    ("high_pressure_urgency", _URGENCY_RE),
    ("fake_authority_claim", _AUTHORITY_RE),
    ("unrealistic_guarantee", _GUARANTEE_RE),
    ("exclusivity_manipulation", _EXCLUSIVITY_RE),
    ("get_rich_quick", _GET_RICH_RE),
    ("phishing_credential_request", _PHISHING_RE),
    ("counterfeit_indicator", _COUNTERFEIT_RE),
]

# --------------------------------------------------------------------------
# Fraud-type → Llama Guard category mapping.
# Drives *which* LG category fires when a given fraud type is flagged unsafe.
# Keys match the values produced by `counterfeint/data/fraud_patterns.py`.
# --------------------------------------------------------------------------
FRAUD_TYPE_TO_LG_CATEGORIES: Dict[str, List[str]] = {
    "fake_giveaway": ["S2"],
    "counterfeit": ["S2", "S8"],
    "miracle_cure": ["S2", "S6"],
    "advance_fee_scam": ["S2", "S7"],
    "fake_crypto": ["S2"],
    "fake_endorsement": ["S2", "S5"],
    "brand_impersonation": ["S5", "S8"],
    "gray_area": ["S2"],
    "coordinated_network": ["S2"],
}

# Fallback categories for a fraud_type that is not a key of the mapping above.
_DEFAULT_FRAUD_LG: List[str] = ["S2"]

# Ground-truth probabilities: how reliably the classifier agrees with the
# true label. Noisy on purpose — the Investigator has to corroborate with
# other tools, not blindly trust policy_classifier.
P_UNSAFE_GIVEN_FRAUD: float = 0.75
P_UNSAFE_GIVEN_LEGIT: float = 0.05


@dataclass
class PolicyClassifierResult:
    """Structured mock Llama Guard 3 output."""

    ad_id: str
    verdict: str  # "unsafe" | "safe"
    confidence: float  # 0.0..1.0
    # Llama Guard category codes (keys of LLAMA_GUARD_CATEGORIES).
    triggered_lg_categories: List[str] = field(default_factory=list)
    # TS-Fraud marker codes (keys of TS_FRAUD_MARKERS).
    triggered_fraud_markers: List[str] = field(default_factory=list)
    explanation: str = ""

    def to_dict(self) -> Dict[str, object]:
        """Return a plain-dict form of the result.

        Category and marker codes are expanded to ``{"code", "name"}`` /
        ``{"code", "description"}`` objects and the confidence is rounded to
        three decimals.

        Raises
        ------
        KeyError
            If a stored code is not a key of ``LLAMA_GUARD_CATEGORIES`` /
            ``TS_FRAUD_MARKERS``.
        """
        return {
            "ad_id": self.ad_id,
            "verdict": self.verdict,
            "confidence": round(self.confidence, 3),
            "triggered_lg_categories": [
                {"code": code, "name": LLAMA_GUARD_CATEGORIES[code]}
                for code in self.triggered_lg_categories
            ],
            "triggered_fraud_markers": [
                {"code": m, "description": TS_FRAUD_MARKERS[m]}
                for m in self.triggered_fraud_markers
            ],
            "explanation": self.explanation,
        }

    def to_investigation_text(self) -> str:
        """Render as a multi-line block suitable for the Investigator's findings."""
        lines = [f"Llama Guard 3 Classification for {self.ad_id}:"]
        lines.append(f" Verdict: {self.verdict} (confidence {self.confidence:.2f})")
        if self.triggered_lg_categories:
            cats_str = ", ".join(
                f"{k} ({LLAMA_GUARD_CATEGORIES[k]})"
                for k in self.triggered_lg_categories
            )
            lines.append(f" Triggered LG categories: {cats_str}")
        else:
            lines.append(" Triggered LG categories: none")
        if self.triggered_fraud_markers:
            markers_str = ", ".join(
                f"TS-Fraud ({m})" for m in self.triggered_fraud_markers
            )
            lines.append(f" Triggered custom markers: {markers_str}")
        else:
            lines.append(" Triggered custom markers: none")
        lines.append(f" Policy explanation: {self.explanation}")
        return "\n".join(lines)


def _seeded_rng(ad_id: str, salt: str = "lg3") -> random.Random:
    """Deterministic per-ad RNG seed (so same ad_id → same verdict every time)."""
    h = hashlib.sha256(f"{salt}:{ad_id}".encode("utf-8")).digest()
    # The first 8 bytes of the digest give a 64-bit integer seed.
    seed = int.from_bytes(h[:8], "big")
    return random.Random(seed)


def detect_fraud_markers(text: str) -> List[str]:
    """Scan ad_copy + landing_page text for TS-Fraud markers (regex-based).

    Returns the codes of every pattern in ``MARKER_PATTERNS`` that matches
    anywhere in *text*, in ``MARKER_PATTERNS`` order. Empty or ``None`` input
    yields an empty list.
    """
    if not text:
        return []
    return [name for name, pattern in MARKER_PATTERNS if pattern.search(text)]


def _draw_verdict(
    rng: random.Random,
    ground_truth_label: Optional[str],
    marker_count: int,
) -> bool:
    """Decide whether the mock classifier flags the ad as unsafe.

    NOTE: the number of draws taken from *rng* per branch is part of the
    reproducibility contract — later draws (latent marker, confidence) come
    from the same generator, so adding or removing a draw here would change
    the output for existing ad_ids.
    """
    if ground_truth_label == "fraud":
        return rng.random() < P_UNSAFE_GIVEN_FRAUD
    if ground_truth_label == "legit":
        return rng.random() < P_UNSAFE_GIVEN_LEGIT
    if ground_truth_label == "escalate":
        # Escalate is genuinely ambiguous: P(unsafe) is 0.35 with no surface
        # markers and rises by 0.15 per marker, capped at three markers
        # (0.35 / 0.50 / 0.65 / 0.80).
        base = 0.35 + 0.15 * min(marker_count, 3)
        return rng.random() < base
    # No (or unrecognised) ground-truth hint (public API / external curl).
    # Relies purely on surface markers: 2+ → unsafe; exactly 1 → unsafe with
    # probability 0.35 (i.e. biased towards safe); 0 → safe. Only the
    # single-marker case consumes a draw.
    if marker_count >= 2:
        return True
    if marker_count == 1:
        return rng.random() < 0.35
    return False


def _select_lg_categories(
    ground_truth_label: Optional[str],
    fraud_type: Optional[str],
    surface_markers: List[str],
) -> List[str]:
    """Pick the Llama Guard category codes reported for an unsafe verdict."""
    if ground_truth_label == "fraud" and fraud_type:
        # Copy so callers can never mutate the shared mapping / default list.
        cats = list(FRAUD_TYPE_TO_LG_CATEGORIES.get(fraud_type, _DEFAULT_FRAUD_LG))
    else:
        # Infer from surface markers.
        cats = ["S2"]
        if "phishing_credential_request" in surface_markers:
            cats.append("S7")
        if "counterfeit_indicator" in surface_markers:
            cats.append("S8")
    # Dedup while preserving order (dicts keep insertion order).
    return list(dict.fromkeys(cats))


def classify_ad(
    ad_id: str,
    ad_copy: str,
    landing_page_text: str = "",
    ground_truth_label: Optional[str] = None,
    fraud_type: Optional[str] = None,
) -> PolicyClassifierResult:
    """Deterministic mock Llama Guard 3 classification.

    Parameters
    ----------
    ad_id :
        Identifier — used as RNG seed so same ad always yields the same output.
    ad_copy :
        Ad body text (scanned for surface markers). ``None`` is treated as
        empty text.
    landing_page_text :
        Optional landing page blurb, also scanned for surface markers.
        ``None`` is treated as empty text.
    ground_truth_label :
        "fraud" | "legit" | "escalate" | None. If provided (internal episode
        generation), biases the verdict toward the label with the
        P_UNSAFE_GIVEN_* noise rates. If None (e.g. external REST endpoint
        call without a label) — or any other value — falls back to the
        surface-marker heuristic.
    fraud_type :
        Optional. Drives *which* LG category is triggered when the verdict is
        "unsafe" for a ground-truth fraud ad. Values missing from
        ``FRAUD_TYPE_TO_LG_CATEGORIES`` fall back to ``_DEFAULT_FRAUD_LG``.

    Returns
    -------
    PolicyClassifierResult
        Verdict "unsafe" with confidence drawn from [0.68, 0.92], or verdict
        "safe" with confidence drawn from [0.55, 0.90] and no LG categories.
        A safe result still lists any surface markers that were detected.
    """
    rng = _seeded_rng(ad_id)
    # `or ""` keeps a None argument from being scanned as the text "None".
    combined = f"{ad_copy or ''}\n{landing_page_text or ''}"
    surface_markers = detect_fraud_markers(combined)

    unsafe = _draw_verdict(rng, ground_truth_label, len(surface_markers))

    if not unsafe:
        confidence = rng.uniform(0.55, 0.90)
        markers_str = ", ".join(surface_markers) if surface_markers else "none"
        explanation = (
            f"No high-confidence policy violations. Surface signals: {markers_str}."
        )
        return PolicyClassifierResult(
            ad_id=ad_id,
            verdict="safe",
            confidence=confidence,
            triggered_lg_categories=[],
            triggered_fraud_markers=list(surface_markers),
            explanation=explanation,
        )

    lg_cats = _select_lg_categories(ground_truth_label, fraud_type, surface_markers)

    triggered_markers = list(surface_markers)
    if not triggered_markers and ground_truth_label == "fraud":
        # Latent pattern match — classifier fires on embeddings even with
        # no surface regex hits. Pick a random marker for readability.
        triggered_markers = [rng.choice(list(TS_FRAUD_MARKERS))]

    confidence = rng.uniform(0.68, 0.92)
    primary_label = LLAMA_GUARD_CATEGORIES[lg_cats[0]]
    markers_str = (
        ", ".join(triggered_markers) if triggered_markers else "latent embedding match"
    )
    explanation = (
        f"Model flags {primary_label} pattern at confidence {confidence:.2f}. "
        f"Signals: {markers_str}."
    )
    return PolicyClassifierResult(
        ad_id=ad_id,
        verdict="unsafe",
        confidence=confidence,
        triggered_lg_categories=lg_cats,
        triggered_fraud_markers=triggered_markers,
        explanation=explanation,
    )