"""
Llama Guard 3 / Purple Llama mock classifier data.
Mocks Meta's open safety classifier (https://github.com/meta-llama/PurpleLlama)
for ad content. Used by the Investigator's `policy_classifier` investigation
target and exposed as a REST endpoint at `POST /api/v1/tools/policy_classifier`.
Design goals:
- **Deterministic per ad_id**: same input → same output, for judge reproducibility.
- **Ground-truth correlated**: fraud ads are labelled "unsafe" ~75% of the time;
legit ads are false-flagged ~5% of the time. The classifier is noisy on
purpose — an Investigator that over-trusts it will still make mistakes.
- **Surface-marker grounded**: uses regex heuristics (urgency, fake authority,
get-rich-quick patterns) modelled after real Meta T&S enforcement signals so
the explanation text is specific enough for the Investigator's rationale to
cite.
The classifier is intentionally a mock: it's not loading the real Llama Guard 3
weights (which would blow the container size). The shape of the output exactly
matches what a real Llama Guard 3-8B call would return so switching to the real
model is a one-function swap.
"""
from __future__ import annotations
import hashlib
import random
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
# --------------------------------------------------------------------------
# Llama Guard 3 hazard taxonomy (public categories).
# Source: https://github.com/meta-llama/PurpleLlama/blob/main/Llama-Guard3/8B/MODEL_CARD.md
# --------------------------------------------------------------------------
# Maps each Llama Guard 3 hazard code ("S1".."S14") to a snake_case name.
# The names are what `PolicyClassifierResult` renders next to each code, and
# the first triggered code's name is quoted in the classifier explanation.
LLAMA_GUARD_CATEGORIES: Dict[str, str] = {
    "S1": "violent_crimes",
    "S2": "non_violent_crimes",
    "S3": "sex_related_crimes",
    "S4": "child_sexual_exploitation",
    "S5": "defamation",
    "S6": "specialized_advice",
    "S7": "privacy",
    "S8": "intellectual_property",
    "S9": "indiscriminate_weapons",
    "S10": "hate",
    "S11": "suicide_self_harm",
    "S12": "sexual_content",
    "S13": "elections",
    # S14 is part of the Llama Guard 3 taxonomy in the model card cited above;
    # without it a result carrying "S14" could not be rendered.
    "S14": "code_interpreter_abuse",
}
# --------------------------------------------------------------------------
# CounterFeint-specific "TS-Fraud" head — surface markers a production T&S
# classifier would stack on top of Llama Guard for ad-fraud specifically.
#
# Maps marker code -> human-readable description. The codes are the same
# names used as the first element of each MARKER_PATTERNS entry, and the
# descriptions are what `PolicyClassifierResult.to_dict` emits per marker.
#
# NOTE: key order is significant. `classify_ad` draws a marker with
# `rng.choice(list(TS_FRAUD_MARKERS.keys()))`, so reordering or inserting
# keys changes the seeded output for some ads.
# --------------------------------------------------------------------------
TS_FRAUD_MARKERS: Dict[str, str] = {
"high_pressure_urgency": "Act-now / limited-time / expires-in manipulation",
"fake_authority_claim": "Doctor-recommended / FDA-approved / certified (unverified)",
"unrealistic_guarantee": "100% guaranteed / risk-free / no-questions-asked",
"exclusivity_manipulation": "Secret / hidden / exclusive-access framing",
"get_rich_quick": "Earn $X daily / make $X from home",
"phishing_credential_request": "Login / verify / click-here patterns",
"counterfeit_indicator": "Replica / dupe / authentic-looking",
}
# One compiled, case-insensitive regex per TS-Fraud marker. Each is applied
# with `.search()`, so a single hit anywhere in the scanned text is enough.

# Time-pressure phrasing: "act now", "limited time", "only 3 left", "hurry", ...
_URGENCY_RE = re.compile(
r"\b(act\s+now|limited\s+time|expires?\s+(?:in|soon|today)|"
r"only\s+\d+\s+(?:left|spots|hours?|days?)|hurry|last\s+chance|"
r"while\s+supplies?\s+last|ends?\s+(?:tonight|soon))\b",
re.IGNORECASE,
)
# Authority / certification claims: "FDA-approved", "clinically proven", ...
_AUTHORITY_RE = re.compile(
r"\b(fda[\- ]approved|doctor[\- ]recommended|clinically\s+proven|"
r"certified(?:\s+by|\s+professional)?|endorsed\s+by|"
r"board[\- ]certified|medical[\- ]grade)\b",
re.IGNORECASE,
)
# Guarantee language: "100% guaranteed", "risk-free", "no questions asked", ...
_GUARANTEE_RE = re.compile(
r"\b(100%?\s*(?:guaranteed|money[\- ]?back)|risk[\- ]free|"
r"no\s+questions?\s+asked|satisfaction\s+guaranteed|guaranteed\s+results?)\b",
re.IGNORECASE,
)
# Exclusivity framing: "secret", "hidden", "members only", "insider", ...
# The bare words "secret" / "hidden" / "insider" match on their own.
_EXCLUSIVITY_RE = re.compile(
r"\b(secret|hidden|exclusive\s+access|members?\s+only|"
r"invitation\s+only|insider|behind[\- ]the[\- ]scenes)\b",
re.IGNORECASE,
)
# Income claims: "earn $500 daily", "make 5k from home", ...
# NOTE: unlike the other patterns this one has no `\b` anchors, and the amount
# is `\d+` with an optional k/m suffix, so "1,000" or "99.50" will not match.
_GET_RICH_RE = re.compile(
r"(?:earn|make)\s*\$?\d+[kKmM]?\s*(?:daily|per\s+day|a\s+day|"
r"weekly|per\s+week|from\s+home|doing\s+nothing)",
re.IGNORECASE,
)
# Credential-harvesting phrasing: "verify your", "login required",
# "click here to claim", "password reset", ...
_PHISHING_RE = re.compile(
r"\b(verify\s+(?:your|account)|login\s+(?:required|to)|"
r"click\s+here\s+to\s+(?:claim|verify|unlock)|"
r"confirm\s+(?:your|account|identity)|password\s+reset)\b",
re.IGNORECASE,
)
# Counterfeit-goods phrasing: "replica", "dupe", "knockoff", "designer-inspired", ...
_COUNTERFEIT_RE = re.compile(
r"\b(replica|dupe|authentic[\- ]looking|copy\s+of|knockoff|"
r"inspired\s+by\s+(?:the\s+)?original|designer[\- ]inspired)\b",
re.IGNORECASE,
)
# (marker code, pattern) pairs scanned by `detect_fraud_markers`. The list
# order is the order in which matched marker codes are returned. Every code
# here is also a key of TS_FRAUD_MARKERS.
MARKER_PATTERNS: List[Tuple[str, "re.Pattern[str]"]] = [
("high_pressure_urgency", _URGENCY_RE),
("fake_authority_claim", _AUTHORITY_RE),
("unrealistic_guarantee", _GUARANTEE_RE),
("exclusivity_manipulation", _EXCLUSIVITY_RE),
("get_rich_quick", _GET_RICH_RE),
("phishing_credential_request", _PHISHING_RE),
("counterfeit_indicator", _COUNTERFEIT_RE),
]
# --------------------------------------------------------------------------
# Fraud-type → Llama Guard category mapping.
# Drives *which* LG category fires when a given fraud type is flagged unsafe.
# Keys match the values produced by `counterfeint/data/fraud_patterns.py`.
#
# Every code listed must be a key of LLAMA_GUARD_CATEGORIES. The FIRST code
# in each list is the one `classify_ad` names in its explanation text, so
# list order is meaningful.
# --------------------------------------------------------------------------
FRAUD_TYPE_TO_LG_CATEGORIES: Dict[str, List[str]] = {
"fake_giveaway": ["S2"],
"counterfeit": ["S2", "S8"],
"miracle_cure": ["S2", "S6"],
"advance_fee_scam": ["S2", "S7"],
"fake_crypto": ["S2"],
"fake_endorsement": ["S2", "S5"],
"brand_impersonation": ["S5", "S8"],
"gray_area": ["S2"],
"coordinated_network": ["S2"],
}
# Fallback used by `classify_ad` when a fraud_type is given but is not a key
# of the mapping above.
_DEFAULT_FRAUD_LG: List[str] = ["S2"]
# Ground-truth probabilities: how reliably the classifier agrees with the
# true label. Noisy on purpose — the Investigator has to corroborate with
# other tools, not blindly trust policy_classifier.
# `classify_ad` compares one seeded `rng.random()` draw against these when
# ground_truth_label is "fraud" / "legit" respectively.
P_UNSAFE_GIVEN_FRAUD: float = 0.75
P_UNSAFE_GIVEN_LEGIT: float = 0.05
@dataclass
class PolicyClassifierResult:
    """One mock Llama Guard 3 verdict for a single ad.

    Holds the raw verdict plus the category / marker codes that fired, and
    knows how to render itself as a JSON-friendly dict or as a text block.
    """

    ad_id: str
    verdict: str  # "unsafe" | "safe"
    confidence: float  # 0.0..1.0
    # Llama Guard codes (keys of LLAMA_GUARD_CATEGORIES), e.g. "S2".
    triggered_lg_categories: List[str] = field(default_factory=list)
    # TS-Fraud marker codes (keys of TS_FRAUD_MARKERS).
    triggered_fraud_markers: List[str] = field(default_factory=list)
    explanation: str = ""

    def to_dict(self) -> Dict[str, object]:
        """Return a plain-dict view with codes expanded to code/name pairs.

        Confidence is rounded to three decimals. Raises ``KeyError`` if a
        stored code is missing from its lookup table.
        """
        payload: Dict[str, object] = {
            "ad_id": self.ad_id,
            "verdict": self.verdict,
            "confidence": round(self.confidence, 3),
        }
        payload["triggered_lg_categories"] = [
            {"code": lg_code, "name": LLAMA_GUARD_CATEGORIES[lg_code]}
            for lg_code in self.triggered_lg_categories
        ]
        payload["triggered_fraud_markers"] = [
            {"code": marker, "description": TS_FRAUD_MARKERS[marker]}
            for marker in self.triggered_fraud_markers
        ]
        payload["explanation"] = self.explanation
        return payload

    def to_investigation_text(self) -> str:
        """Render as a multi-line block suitable for the Investigator's findings.

        Empty category / marker lists are shown as ``none``.
        """
        header = f"Llama Guard 3 Classification for {self.ad_id}:"
        verdict_line = (
            f" Verdict: {self.verdict} (confidence {self.confidence:.2f})"
        )

        if self.triggered_lg_categories:
            lg_summary = ", ".join(
                f"{lg_code} ({LLAMA_GUARD_CATEGORIES[lg_code]})"
                for lg_code in self.triggered_lg_categories
            )
        else:
            lg_summary = "none"

        if self.triggered_fraud_markers:
            marker_summary = ", ".join(
                f"TS-Fraud ({marker})" for marker in self.triggered_fraud_markers
            )
        else:
            marker_summary = "none"

        return "\n".join(
            [
                header,
                verdict_line,
                f" Triggered LG categories: {lg_summary}",
                f" Triggered custom markers: {marker_summary}",
                f" Policy explanation: {self.explanation}",
            ]
        )
def _seeded_rng(ad_id: str, salt: str = "lg3") -> random.Random:
    """Build a private RNG whose seed depends only on ``salt`` and ``ad_id``.

    The seed is the first 8 bytes (big-endian) of SHA-256 over
    ``"<salt>:<ad_id>"``, so the same ad_id → the same verdict every time.
    """
    digest = hashlib.sha256(f"{salt}:{ad_id}".encode("utf-8")).digest()
    return random.Random(int.from_bytes(digest[:8], "big"))
def detect_fraud_markers(text: str) -> List[str]:
    """Return the TS-Fraud marker codes whose regex matches anywhere in ``text``.

    Codes come back in MARKER_PATTERNS order, each at most once; the result
    is an empty list when nothing matches.
    """
    return [marker for marker, regex in MARKER_PATTERNS if regex.search(text)]
def _decide_unsafe(
    rng: random.Random,
    ground_truth_label: Optional[str],
    marker_count: int,
) -> bool:
    """Draw the safe/unsafe verdict, consuming at most one value from ``rng``."""
    if ground_truth_label == "fraud":
        return rng.random() < P_UNSAFE_GIVEN_FRAUD
    if ground_truth_label == "legit":
        return rng.random() < P_UNSAFE_GIVEN_LEGIT
    if ground_truth_label == "escalate":
        # Genuinely ambiguous: 0.35 with no surface markers, +0.15 per marker,
        # capped at three markers (about 0.80).
        unsafe_probability = 0.35 + 0.15 * min(marker_count, 3)
        return rng.random() < unsafe_probability
    # No recognised ground-truth hint (public API / external curl): rely purely
    # on surface markers. 2+ → unsafe; exactly 1 → unsafe with probability 0.35
    # (i.e. biased towards safe); 0 → safe. No RNG draw for the 0 and 2+ cases.
    if marker_count >= 2:
        return True
    if marker_count == 1:
        return rng.random() < 0.35
    return False


def _select_lg_categories(
    ground_truth_label: Optional[str],
    fraud_type: Optional[str],
    surface_markers: List[str],
) -> List[str]:
    """Pick the Llama Guard codes reported for an unsafe verdict (deduplicated)."""
    if ground_truth_label == "fraud" and fraud_type:
        lg_cats = list(
            FRAUD_TYPE_TO_LG_CATEGORIES.get(fraud_type, _DEFAULT_FRAUD_LG)
        )
    else:
        # Infer from surface markers; S2 is always reported first.
        lg_cats = ["S2"]
        if "phishing_credential_request" in surface_markers:
            lg_cats.append("S7")
        if "counterfeit_indicator" in surface_markers:
            lg_cats.append("S8")
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(lg_cats))


def classify_ad(
    ad_id: str,
    ad_copy: str,
    landing_page_text: str = "",
    ground_truth_label: Optional[str] = None,
    fraud_type: Optional[str] = None,
) -> PolicyClassifierResult:
    """Deterministic mock Llama Guard 3 classification.

    Parameters
    ----------
    ad_id :
        Identifier — used as RNG seed so same ad always yields the same output.
    ad_copy :
        Ad body text (scanned for surface markers).
    landing_page_text :
        Optional landing page blurb, also scanned for surface markers.
    ground_truth_label :
        "fraud" | "legit" | "escalate" | None. If provided (internal
        episode generation), biases the verdict toward the label with the
        P_UNSAFE_GIVEN_* noise rates. If None or any other value (e.g.
        external REST endpoint call without a label), falls back to the
        surface-marker heuristic.
    fraud_type :
        Optional. Drives *which* LG category is triggered when the verdict
        is "unsafe" for a ground-truth fraud ad.

    Returns
    -------
    PolicyClassifierResult
        "unsafe" results carry at least one LG category and a confidence in
        [0.68, 0.92]; "safe" results carry no LG categories, a confidence in
        [0.55, 0.90], and still list any surface markers that matched.

    Notes
    -----
    The seeded RNG is consumed in a fixed order (verdict draw, optional
    marker pick, confidence draw). Reordering those calls changes the output
    for existing ad_ids.
    """
    rng = _seeded_rng(ad_id)
    surface_markers = detect_fraud_markers(f"{ad_copy}\n{landing_page_text}")

    if not _decide_unsafe(rng, ground_truth_label, len(surface_markers)):
        confidence = rng.uniform(0.55, 0.90)
        markers_str = ", ".join(surface_markers) if surface_markers else "none"
        return PolicyClassifierResult(
            ad_id=ad_id,
            verdict="safe",
            confidence=confidence,
            triggered_lg_categories=[],
            triggered_fraud_markers=list(surface_markers),
            explanation=(
                f"No high-confidence policy violations. Surface signals: {markers_str}."
            ),
        )

    lg_cats = _select_lg_categories(ground_truth_label, fraud_type, surface_markers)

    triggered_markers = list(surface_markers)
    if not triggered_markers and ground_truth_label == "fraud":
        # Latent pattern match — classifier fires on embeddings even with
        # no surface regex hits. Pick a random marker for readability.
        triggered_markers = [rng.choice(list(TS_FRAUD_MARKERS.keys()))]

    confidence = rng.uniform(0.68, 0.92)
    # The first category is the one named in the explanation text.
    primary_label = LLAMA_GUARD_CATEGORIES[lg_cats[0]]
    markers_str = (
        ", ".join(triggered_markers) if triggered_markers else "latent embedding match"
    )
    explanation = (
        f"Model flags {primary_label} pattern at confidence {confidence:.2f}. "
        f"Signals: {markers_str}."
    )
    return PolicyClassifierResult(
        ad_id=ad_id,
        verdict="unsafe",
        confidence=confidence,
        triggered_lg_categories=lg_cats,
        triggered_fraud_markers=triggered_markers,
        explanation=explanation,
    )
|