File size: 12,733 Bytes
26bf1c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
"""

Llama Guard 3 / Purple Llama mock classifier data.



Mocks Meta's open safety classifier (https://github.com/meta-llama/PurpleLlama)

for ad content.  Used by the Investigator's `policy_classifier` investigation

target and exposed as a REST endpoint at `POST /api/v1/tools/policy_classifier`.



Design goals:



- **Deterministic per ad_id**: same input → same output, for judge reproducibility.

- **Ground-truth correlated**: fraud ads are labelled "unsafe" ~75% of the time;

  legit ads are false-flagged ~5% of the time.  The classifier is noisy on

  purpose — an Investigator that over-trusts it will still make mistakes.

- **Surface-marker grounded**: uses regex heuristics (urgency, fake authority,

  get-rich-quick patterns) modelled after real Meta T&S enforcement signals so

  the explanation text is specific enough for the Investigator's rationale to

  cite.



The classifier is intentionally a mock: it's not loading the real Llama Guard 3

weights (which would blow the container size).  The shape of the output exactly

matches what a real Llama Guard 3-8B call would return so switching to the real

model is a one-function swap.

"""

from __future__ import annotations

import hashlib
import random
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


# --------------------------------------------------------------------------
# Llama Guard 3 hazard taxonomy (public categories).
# Source: https://github.com/meta-llama/PurpleLlama/blob/main/Llama-Guard3/8B/MODEL_CARD.md
# --------------------------------------------------------------------------
# Hazard code -> snake_case category name.  Llama Guard 3 defines fourteen
# categories (S1-S14); S14 ("Code Interpreter Abuse") is included so the table
# covers every code a real Llama Guard 3 response can contain.
LLAMA_GUARD_CATEGORIES: Dict[str, str] = {
    "S1": "violent_crimes",
    "S2": "non_violent_crimes",
    "S3": "sex_related_crimes",
    "S4": "child_sexual_exploitation",
    "S5": "defamation",
    "S6": "specialized_advice",
    "S7": "privacy",
    "S8": "intellectual_property",
    "S9": "indiscriminate_weapons",
    "S10": "hate",
    "S11": "suicide_self_harm",
    "S12": "sexual_content",
    "S13": "elections",
    "S14": "code_interpreter_abuse",
}


# --------------------------------------------------------------------------
# CounterFeint-specific "TS-Fraud" head — surface markers a production T&S
# classifier would stack on top of Llama Guard for ad-fraud specifically.
#
# Maps each marker code to a short human-readable description.  The codes are
# the same names that MARKER_PATTERNS pairs with a regex, and the descriptions
# are what PolicyClassifierResult.to_dict() emits under "description".
# --------------------------------------------------------------------------
TS_FRAUD_MARKERS: Dict[str, str] = {
    "high_pressure_urgency": "Act-now / limited-time / expires-in manipulation",
    "fake_authority_claim": "Doctor-recommended / FDA-approved / certified (unverified)",
    "unrealistic_guarantee": "100% guaranteed / risk-free / no-questions-asked",
    "exclusivity_manipulation": "Secret / hidden / exclusive-access framing",
    "get_rich_quick": "Earn $X daily / make $X from home",
    "phishing_credential_request": "Login / verify / click-here patterns",
    "counterfeit_indicator": "Replica / dupe / authentic-looking",
}


# Urgency / scarcity pressure: "act now", "limited time", "expires soon",
# "only 3 left", "hurry", "last chance", "while supplies last", "ends tonight".
# Case-insensitive; the whole alternation sits between word boundaries.
_URGENCY_RE = re.compile(
    r"\b(act\s+now|limited\s+time|expires?\s+(?:in|soon|today)|"
    r"only\s+\d+\s+(?:left|spots|hours?|days?)|hurry|last\s+chance|"
    r"while\s+supplies?\s+last|ends?\s+(?:tonight|soon))\b",
    re.IGNORECASE,
)
# Authority claims: "FDA-approved" / "FDA approved", "doctor recommended",
# "clinically proven", "certified" (optionally followed by "by" or
# "professional"), "endorsed by", "board-certified", "medical-grade".
# Note that a bare "certified" is enough to match.
_AUTHORITY_RE = re.compile(
    r"\b(fda[\- ]approved|doctor[\- ]recommended|clinically\s+proven|"
    r"certified(?:\s+by|\s+professional)?|endorsed\s+by|"
    r"board[\- ]certified|medical[\- ]grade)\b",
    re.IGNORECASE,
)
# Guarantee language: "100% guaranteed" (the "%" is optional), "100% money-back",
# "risk-free", "no questions asked", "satisfaction guaranteed",
# "guaranteed results".
_GUARANTEE_RE = re.compile(
    r"\b(100%?\s*(?:guaranteed|money[\- ]?back)|risk[\- ]free|"
    r"no\s+questions?\s+asked|satisfaction\s+guaranteed|guaranteed\s+results?)\b",
    re.IGNORECASE,
)
# Exclusivity framing: "secret", "hidden", "exclusive access", "members only",
# "invitation only", "insider", "behind-the-scenes".  The single words
# ("secret", "hidden", "insider") match on their own.
_EXCLUSIVITY_RE = re.compile(
    r"\b(secret|hidden|exclusive\s+access|members?\s+only|"
    r"invitation\s+only|insider|behind[\- ]the[\- ]scenes)\b",
    re.IGNORECASE,
)
_GET_RICH_RE = re.compile(
    r"(?:earn|make)\s*\$?\d+[kKmM]?\s*(?:daily|per\s+day|a\s+day|"
    r"weekly|per\s+week|from\s+home|doing\s+nothing)",
    re.IGNORECASE,
)
# Credential-request phrasing: "verify your" / "verify account",
# "login required" / "login to", "click here to claim|verify|unlock",
# "confirm your|account|identity", "password reset".
_PHISHING_RE = re.compile(
    r"\b(verify\s+(?:your|account)|login\s+(?:required|to)|"
    r"click\s+here\s+to\s+(?:claim|verify|unlock)|"
    r"confirm\s+(?:your|account|identity)|password\s+reset)\b",
    re.IGNORECASE,
)
# Counterfeit hints: "replica", "dupe", "authentic-looking", "copy of",
# "knockoff", "inspired by (the) original", "designer-inspired".
_COUNTERFEIT_RE = re.compile(
    r"\b(replica|dupe|authentic[\- ]looking|copy\s+of|knockoff|"
    r"inspired\s+by\s+(?:the\s+)?original|designer[\- ]inspired)\b",
    re.IGNORECASE,
)


# (marker code, compiled regex) pairs scanned by detect_fraud_markers().
# The list order is the order in which matched marker codes are reported, and
# every code here is also a key of TS_FRAUD_MARKERS.
MARKER_PATTERNS: List[Tuple[str, "re.Pattern[str]"]] = [
    ("high_pressure_urgency", _URGENCY_RE),
    ("fake_authority_claim", _AUTHORITY_RE),
    ("unrealistic_guarantee", _GUARANTEE_RE),
    ("exclusivity_manipulation", _EXCLUSIVITY_RE),
    ("get_rich_quick", _GET_RICH_RE),
    ("phishing_credential_request", _PHISHING_RE),
    ("counterfeit_indicator", _COUNTERFEIT_RE),
]


# --------------------------------------------------------------------------
# Fraud-type → Llama Guard category mapping.
# Drives *which* LG category fires when a given fraud type is flagged unsafe.
# Keys match the values produced by `counterfeint/data/fraud_patterns.py`.
#
# The first code in each list is the one classify_ad() names in its
# explanation text (it looks up lg_cats[0]).
# --------------------------------------------------------------------------
FRAUD_TYPE_TO_LG_CATEGORIES: Dict[str, List[str]] = {
    "fake_giveaway": ["S2"],
    "counterfeit": ["S2", "S8"],
    "miracle_cure": ["S2", "S6"],
    "advance_fee_scam": ["S2", "S7"],
    "fake_crypto": ["S2"],
    "fake_endorsement": ["S2", "S5"],
    "brand_impersonation": ["S5", "S8"],
    "gray_area": ["S2"],
    "coordinated_network": ["S2"],
}
# Fallback used by classify_ad() when fraud_type is not a key of the table above.
_DEFAULT_FRAUD_LG: List[str] = ["S2"]


# Ground-truth probabilities: how reliably the classifier agrees with the
# true label.  Noisy on purpose — the Investigator has to corroborate with
# other tools, not blindly trust policy_classifier.
# Probability that classify_ad() returns "unsafe" when ground_truth_label == "fraud".
P_UNSAFE_GIVEN_FRAUD: float = 0.75
# Probability that classify_ad() returns "unsafe" when ground_truth_label == "legit"
# (i.e. the false-positive rate).
P_UNSAFE_GIVEN_LEGIT: float = 0.05


@dataclass
class PolicyClassifierResult:
    """Mock Llama Guard 3 verdict for a single ad, in structured form.

    ``triggered_lg_categories`` holds Llama Guard codes (keys of
    ``LLAMA_GUARD_CATEGORIES``); ``triggered_fraud_markers`` holds TS-Fraud
    marker codes (keys of ``TS_FRAUD_MARKERS``).
    """

    ad_id: str
    verdict: str  # one of "unsafe" / "safe"
    confidence: float  # expected to lie in 0.0..1.0
    triggered_lg_categories: List[str] = field(default_factory=list)
    triggered_fraud_markers: List[str] = field(default_factory=list)
    explanation: str = ""

    def to_dict(self) -> Dict[str, object]:
        """Return a plain-dict view of the result.

        The confidence is rounded to three decimals and every triggered code
        is expanded into a ``{"code", "name"/"description"}`` pair.  Raises
        ``KeyError`` if a code is missing from its lookup table.
        """
        rounded_confidence = round(self.confidence, 3)
        lg_entries = [
            {"code": lg_code, "name": LLAMA_GUARD_CATEGORIES[lg_code]}
            for lg_code in self.triggered_lg_categories
        ]
        marker_entries = [
            {"code": marker, "description": TS_FRAUD_MARKERS[marker]}
            for marker in self.triggered_fraud_markers
        ]
        return {
            "ad_id": self.ad_id,
            "verdict": self.verdict,
            "confidence": rounded_confidence,
            "triggered_lg_categories": lg_entries,
            "triggered_fraud_markers": marker_entries,
            "explanation": self.explanation,
        }

    def to_investigation_text(self) -> str:
        """Format the result as a multi-line block for the Investigator's findings.

        Empty category / marker lists are rendered as ``none``.
        """
        lg_summary = (
            ", ".join(
                f"{code} ({LLAMA_GUARD_CATEGORIES[code]})"
                for code in self.triggered_lg_categories
            )
            if self.triggered_lg_categories
            else "none"
        )
        marker_summary = (
            ", ".join(f"TS-Fraud ({marker})" for marker in self.triggered_fraud_markers)
            if self.triggered_fraud_markers
            else "none"
        )
        report = (
            f"Llama Guard 3 Classification for {self.ad_id}:",
            f"  Verdict: {self.verdict} (confidence {self.confidence:.2f})",
            f"  Triggered LG categories: {lg_summary}",
            f"  Triggered custom markers: {marker_summary}",
            f"  Policy explanation: {self.explanation}",
        )
        return "\n".join(report)


def _seeded_rng(ad_id: str, salt: str = "lg3") -> random.Random:
    """Deterministic per-ad RNG seed (so same ad_id β†’ same verdict every time)."""
    h = hashlib.sha256(f"{salt}:{ad_id}".encode("utf-8")).digest()
    seed = int.from_bytes(h[:8], "big")
    return random.Random(seed)


def detect_fraud_markers(text: str) -> List[str]:
    """Return the code of every TS-Fraud marker whose regex matches ``text``.

    Codes are reported in ``MARKER_PATTERNS`` order; an empty list means no
    pattern matched.
    """
    return [marker for marker, regex in MARKER_PATTERNS if regex.search(text)]


def _decide_unsafe(
    rng: random.Random,
    ground_truth_label: Optional[str],
    marker_count: int,
) -> bool:
    """Pick the unsafe/safe verdict, drawing at most one value from ``rng``."""
    if ground_truth_label == "fraud":
        return rng.random() < P_UNSAFE_GIVEN_FRAUD
    if ground_truth_label == "legit":
        return rng.random() < P_UNSAFE_GIVEN_LEGIT
    if ground_truth_label == "escalate":
        # Genuinely ambiguous: 0.35 with no surface markers, +0.15 per marker,
        # capped at 0.80 once three or more markers are present.
        threshold = 0.35 + 0.15 * min(marker_count, 3)
        return rng.random() < threshold
    # No usable ground-truth hint (public API / external curl): rely purely on
    # surface markers.  2+ markers -> unsafe; exactly 1 -> unsafe with
    # probability 0.35 (so it leans safe); 0 -> safe.
    if marker_count >= 2:
        return True
    if marker_count == 1:
        return rng.random() < 0.35
    return False


def _select_lg_categories(
    ground_truth_label: Optional[str],
    fraud_type: Optional[str],
    surface_markers: List[str],
) -> List[str]:
    """Choose the Llama Guard codes reported for an "unsafe" verdict."""
    if ground_truth_label == "fraud" and fraud_type:
        # Copy so callers can never mutate the shared mapping / default list.
        return list(FRAUD_TYPE_TO_LG_CATEGORIES.get(fraud_type, _DEFAULT_FRAUD_LG))
    # Otherwise infer from surface markers.  Each code is appended at most
    # once, so no de-duplication pass is needed.
    lg_cats = ["S2"]
    if "phishing_credential_request" in surface_markers:
        lg_cats.append("S7")
    if "counterfeit_indicator" in surface_markers:
        lg_cats.append("S8")
    return lg_cats


def classify_ad(
    ad_id: str,
    ad_copy: str,
    landing_page_text: str = "",
    ground_truth_label: Optional[str] = None,
    fraud_type: Optional[str] = None,
) -> PolicyClassifierResult:
    """Deterministic mock Llama Guard 3 classification.

    Parameters
    ----------
    ad_id :
        Identifier — used as the RNG seed, so the same ad_id with the same
        other arguments always yields the same output.
    ad_copy :
        Ad body text (scanned for surface markers).
    landing_page_text :
        Optional landing page blurb, also scanned for surface markers.
    ground_truth_label :
        "fraud" | "legit" | "escalate" | None.  "fraud" and "legit" flag the
        ad unsafe with probability P_UNSAFE_GIVEN_FRAUD / P_UNSAFE_GIVEN_LEGIT;
        "escalate" uses 0.35 plus 0.15 per surface marker (at most 3 counted).
        None or any other value (e.g. an external REST call without a label)
        falls back to the surface-marker heuristic.
    fraud_type :
        Optional.  Drives *which* LG categories are reported when the verdict
        is "unsafe" for a ground-truth fraud ad; unknown types fall back to
        ``_DEFAULT_FRAUD_LG``.

    Returns
    -------
    PolicyClassifierResult
        "unsafe" results carry a confidence drawn from [0.68, 0.92] and at
        least one LG category; "safe" results carry a confidence drawn from
        [0.55, 0.90], no LG categories, and whatever surface markers matched.
    """
    rng = _seeded_rng(ad_id)
    surface_markers = detect_fraud_markers(f"{ad_copy}\n{landing_page_text}")

    if not _decide_unsafe(rng, ground_truth_label, len(surface_markers)):
        confidence = rng.uniform(0.55, 0.90)
        markers_str = ", ".join(surface_markers) if surface_markers else "none"
        return PolicyClassifierResult(
            ad_id=ad_id,
            verdict="safe",
            confidence=confidence,
            triggered_lg_categories=[],
            triggered_fraud_markers=list(surface_markers),
            explanation=(
                f"No high-confidence policy violations. Surface signals: {markers_str}."
            ),
        )

    lg_cats = _select_lg_categories(ground_truth_label, fraud_type, surface_markers)

    triggered_markers = list(surface_markers)
    if not triggered_markers and ground_truth_label == "fraud":
        # Latent pattern match — the classifier "fires on embeddings" even with
        # no surface regex hits.  Pick a seeded-random marker for readability.
        triggered_markers = [rng.choice(list(TS_FRAUD_MARKERS.keys()))]

    # Drawn after the optional marker choice above; keep this order so the
    # per-ad random stream (and therefore every output) stays reproducible.
    confidence = rng.uniform(0.68, 0.92)
    primary_label = LLAMA_GUARD_CATEGORIES[lg_cats[0]]
    markers_str = (
        ", ".join(triggered_markers) if triggered_markers else "latent embedding match"
    )
    explanation = (
        f"Model flags {primary_label} pattern at confidence {confidence:.2f}. "
        f"Signals: {markers_str}."
    )
    return PolicyClassifierResult(
        ad_id=ad_id,
        verdict="unsafe",
        confidence=confidence,
        triggered_lg_categories=lg_cats,
        triggered_fraud_markers=triggered_markers,
        explanation=explanation,
    )