CounterFeint / data /network_generator.py
QuantumTransformer's picture
Upload folder using huggingface_hub
26bf1c9 verified
"""
Fraud network (ring) generation for Task 3 using networkx.
Generates complex fraud ring topologies, each named after a published Meta
Adversarial Threat Report (Coordinated Inauthentic Behaviour / CIB) case study:
- Clique - Ghana DigitSol-style: small troll-farm where every account
amplifies every other (Meta Q3 2020 Adversarial Threat Report).
- Chain - Benin Digited-style: relay pattern where A promotes B, B promotes
C, but A never directly touches C (Meta Q1 2021 Adversarial
Threat Report).
- Hub-spoke - China-Russia-style: one master account funds and controls many
satellite accounts (Meta Q3 2022 Adversarial Threat Report).
Individual ads in a ring may look borderline; the signal is in the connections.
Each edge in the graph carries the signal type that connects the two ads.
"""
from __future__ import annotations
import random
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple
import networkx as nx
@dataclass
class FraudRing:
    """A single fraud ring: its member ads plus the signals they share."""

    # Identifier of the ring.
    ring_id: str
    # IDs of the ads that belong to the ring.
    member_ad_ids: List[str]
    # Maps signal_type -> shared_value for the signals the members share.
    shared_signals: Dict[str, str]
    # Shape of the ring: clique, chain, or hub_spoke.
    topology: str = "clique"
    # Case-study label, e.g. "Ghana DigitSol-style".
    case_name: str = ""
    # Source report, e.g. "Meta Q3 2020 Adversarial Threat Report".
    provenance: str = ""

    @property
    def size(self) -> int:
        """Return how many ad IDs are currently listed as members."""
        member_count = len(self.member_ad_ids)
        return member_count
# Catalogue of the case studies that rings are modelled on. Each entry names
# the topology it illustrates, a short label, the report it comes from, and a
# one-paragraph summary of the pattern.
RING_CASE_STUDIES: List[Dict[str, str]] = [
    # Fully connected ring.
    {
        "topology": "clique",
        "case_name": "Ghana DigitSol-style",
        "provenance": "Meta Q3 2020 Adversarial Threat Report",
        "summary": (
            "Troll-farm ring where every account amplifies every other; all "
            "members share payment / creative / targeting fingerprints."
        ),
    },
    # Relay ring: each member only touches its neighbours.
    {
        "topology": "chain",
        "case_name": "Benin Digited-style",
        "provenance": "Meta Q1 2021 Adversarial Threat Report",
        "summary": (
            "Relay ring where A promotes B, B promotes C, but A never "
            "directly touches C. Transitive reasoning is required to surface "
            "the full network."
        ),
    },
    # One central account connected to every satellite.
    {
        "topology": "hub_spoke",
        "case_name": "China-Russia-style hub",
        "provenance": "Meta Q3 2022 Adversarial Threat Report",
        "summary": (
            "Hub-and-spoke ring: one master advertiser funds and controls "
            "many satellite accounts that share the master's payment and "
            "registrar."
        ),
    },
]

# Topology names in catalogue order, derived from the entries above.
_RING_TOPOLOGIES = [study["topology"] for study in RING_CASE_STUDIES]
# Names of the signal types a ring can share.
_SIGNAL_POOL_KEYS = [
    "payment_method",
    "domain_registrar",
    "creative_template",
    "targeting_overlap",
]

# Candidate values for the "domain_registrar" signal.
_REGISTRAR_CHOICES = [
    "Njalla (privacy)",
    "Epik",
    "NameSilo",
    "Tucows (privacy proxy)",
]

# Candidate values for the "targeting_overlap" signal.
_TARGETING_CHOICES = [
    "Men 25-45, crypto+investing, US+UK+AU",
    "Adults 18-35, tech+gaming, worldwide",
    "Women 25-55, health+beauty, US+CA",
    "Adults 30-60, finance+real-estate, US+UK",
    "Adults 20-40, e-commerce+dropshipping, US+EU",
]


def _make_signal_pool(rng: random.Random, ring_index: int) -> Dict[str, str]:
    """Draw one candidate value for every signal type of a ring.

    Args:
        rng: Random source; four draws are made from it, always in the same
            order (payment id, registrar, template id, targeting).
        ring_index: Index of the ring being built. Accepted for the caller's
            convenience; the body does not read it.

    Returns:
        A dict with one entry per signal type: ``payment_method``,
        ``domain_registrar``, ``creative_template`` and ``targeting_overlap``.
    """
    # Keep the draws in this exact sequence: reordering them would change the
    # values produced by a seeded generator.
    payment_number = rng.randint(10000, 99999)
    registrar = rng.choice(_REGISTRAR_CHOICES)
    template_number = rng.randint(1000, 9999)
    audience = rng.choice(_TARGETING_CHOICES)

    pool: Dict[str, str] = {}
    pool["payment_method"] = f"pmt_ring_{payment_number}"
    pool["domain_registrar"] = registrar
    pool["creative_template"] = f"tmpl_{template_number}"
    pool["targeting_overlap"] = audience
    return pool
def generate_fraud_networks(
    rng: random.Random,
    n_rings: int,
    available_fraud_ad_ids: List[str],
) -> Tuple[List[FraudRing], Dict[str, List[str]]]:
    """
    Carve the supplied fraud ad IDs into up to ``n_rings`` fraud rings.

    Each ring takes between 3 and 5 ads from a shuffled copy of
    ``available_fraud_ad_ids``, is assigned a case study (and therefore a
    topology) by cycling through ``RING_CASE_STUDIES``, and shares a random
    subset of at least two signal types. Generation stops early once fewer
    than 3 ads are left. If at least two rings were built and ads are left
    over, one leftover ad is used to bridge the first two rings.

    A networkx graph is populated along the way but is not returned; only
    the ring objects and the membership mapping leave this function.

    Args:
        rng: Random source driving every random decision.
        n_rings: Maximum number of rings to build.
        available_fraud_ad_ids: Ad IDs that may be placed into rings. The
            list itself is not modified.

    Returns:
        rings: list of FraudRing objects
        ad_to_rings: mapping from ad_id to list of ring_ids it belongs to
    """
    smallest_ring, largest_ring = 3, 5

    graph = nx.Graph()
    built_rings: List[FraudRing] = []
    membership: Dict[str, List[str]] = {}

    # Shuffle a copy so the caller's list keeps its order and contents.
    unassigned = list(available_fraud_ad_ids)
    rng.shuffle(unassigned)

    for ring_no in range(n_rings):
        if len(unassigned) < smallest_ring:
            break

        # Hold back a minimum-sized allotment for every ring that still has
        # to be built, so an early ring cannot use up ads that later rings
        # need in order to exist at all.
        rings_after_this = n_rings - ring_no - 1
        held_back = smallest_ring * rings_after_this
        spendable = max(smallest_ring, len(unassigned) - held_back)
        upper_bound = min(largest_ring, spendable, len(unassigned))
        take = rng.randint(smallest_ring, upper_bound)
        ring_members, unassigned = unassigned[:take], unassigned[take:]

        # Walk the case studies in order, wrapping around, so consecutive
        # rings get different topologies and any run of len(RING_CASE_STUDIES)
        # rings covers every one of them.
        study = RING_CASE_STUDIES[ring_no % len(RING_CASE_STUDIES)]
        ring_topology = study["topology"]

        # Draw a candidate value for every signal type, then keep a random
        # subset (two or more) as the signals this ring actually shares.
        candidate_signals = _make_signal_pool(rng, ring_no)
        shuffled_keys = list(_SIGNAL_POOL_KEYS)
        rng.shuffle(shuffled_keys)
        keep = rng.randint(2, len(shuffled_keys))
        common_signals = {
            key: candidate_signals[key] for key in shuffled_keys[:keep]
        }

        _add_edges_for_topology(graph, ring_members, common_signals, ring_topology, rng)

        new_ring_id = f"ring_{ring_no}"
        built_rings.append(
            FraudRing(
                ring_id=new_ring_id,
                member_ad_ids=ring_members,
                shared_signals=common_signals,
                topology=ring_topology,
                case_name=study["case_name"],
                provenance=study["provenance"],
            )
        )
        for member in ring_members:
            membership.setdefault(member, []).append(new_ring_id)
            graph.add_node(member, ring_id=new_ring_id)

    # Use a leftover ad to bridge two rings for extra complexity.
    if len(built_rings) >= 2 and unassigned:
        _add_bridge_ads(graph, built_rings, unassigned, membership, rng)

    return built_rings, membership
def _add_edges_for_topology(
    G: nx.Graph,
    members: List[str],
    shared_signals: Dict[str, str],
    topology: str,
    rng: random.Random,
) -> None:
    """Add the edges that connect ``members`` according to ``topology``.

    Every edge carries ``signal_type`` and ``signal_value`` attributes taken
    from ``shared_signals``.

    * ``clique``: one edge per unordered pair of members; the signal for each
      edge is picked with ``rng.choice``.
    * ``chain``: one edge between each member and the next one in the list;
      signals are assigned round-robin in ``shared_signals`` key order, so no
      randomness is consumed.
    * ``hub_spoke``: the first member is the hub and gets one edge to every
      other member; the signal for each edge is picked with ``rng.choice``.

    Args:
        G: Graph to add the edges to (modified in place).
        members: Ad IDs of the ring, in ring order.
        shared_signals: Mapping of signal type to the value the ring shares.
        topology: One of ``"clique"``, ``"chain"`` or ``"hub_spoke"``.
        rng: Random source used to pick per-edge signals.

    Raises:
        ValueError: If ``topology`` is not a supported name, or if edges have
            to be created but ``shared_signals`` is empty.
    """
    if topology not in ("clique", "chain", "hub_spoke"):
        # Fail loudly: silently producing an edgeless ring hides typos.
        raise ValueError(f"Unknown ring topology: {topology!r}")

    if len(members) < 2:
        # Nothing to connect. Returning here also keeps hub_spoke from
        # indexing into an empty member list.
        return

    signal_types = list(shared_signals)
    if not signal_types:
        raise ValueError("shared_signals must contain at least one signal")

    def connect(a: str, b: str, signal: str) -> None:
        G.add_edge(a, b, signal_type=signal, signal_value=shared_signals[signal])

    if topology == "clique":
        for i, a in enumerate(members):
            for b in members[i + 1:]:
                connect(a, b, rng.choice(signal_types))
    elif topology == "chain":
        for idx, (a, b) in enumerate(zip(members, members[1:])):
            connect(a, b, signal_types[idx % len(signal_types)])
    else:  # hub_spoke
        hub, *spokes = members
        for spoke in spokes:
            connect(hub, spoke, rng.choice(signal_types))
def _add_bridge_ads(
    G: nx.Graph,
    rings: List[FraudRing],
    remaining: List[str],
    ad_to_rings: Dict[str, List[str]],
    rng: random.Random,
) -> None:
    """Join the first two rings through one ad taken from ``remaining``.

    The first ad in ``remaining`` is removed from that list and becomes the
    bridge. It gets one edge to a randomly chosen member of ``rings[0]`` and
    one edge to a randomly chosen member of ``rings[1]``; each edge carries a
    randomly chosen signal of the ring it attaches to. Does nothing when
    ``remaining`` is empty or fewer than two rings are given.

    Args:
        G: Graph receiving the two bridge edges (modified in place).
        rings: Rings built so far; only the first two are used.
        remaining: Unassigned ad IDs; the first one is popped off.
        ad_to_rings: Membership mapping, updated with the bridge ad.
        rng: Random source for the attachment points and signals.
    """
    if not remaining or len(rings) < 2:
        return

    first_ring, second_ring = rings[0], rings[1]
    bridge = remaining.pop(0)

    # Choose both attachment points before the bridge is appended to the
    # first ring, so the bridge cannot be picked as its own neighbour.
    anchor_in_first = rng.choice(first_ring.member_ad_ids)
    anchor_in_second = rng.choice(second_ring.member_ad_ids)

    # NOTE(review): the bridge is appended to the first ring's member list
    # only, while ad_to_rings records it under both ring IDs. Confirm this
    # asymmetry is intended.
    first_ring.member_ad_ids.append(bridge)
    bridge_rings = ad_to_rings.setdefault(bridge, [])
    bridge_rings.append(first_ring.ring_id)
    bridge_rings.append(second_ring.ring_id)

    attachments = (
        (first_ring, anchor_in_first),
        (second_ring, anchor_in_second),
    )
    for ring, anchor in attachments:
        signal = rng.choice(list(ring.shared_signals))
        G.add_edge(
            bridge,
            anchor,
            signal_type=signal,
            signal_value=ring.shared_signals[signal],
        )
def get_ring_shared_signal_text(ring: FraudRing) -> str:
    """Render a multi-line text summary of a ring (for grader/debug use).

    The first line names the ring, its size and its topology, prefixed by the
    case name when one is set. An optional provenance line follows, then the
    member list and one line per shared signal.

    Args:
        ring: The ring to describe.

    Returns:
        The summary as a single newline-joined string.
    """
    if ring.case_name:
        descriptor = f"{ring.case_name} {ring.topology}"
    else:
        descriptor = f"topology={ring.topology}"

    report = [f"Fraud Ring {ring.ring_id} ({ring.size} members, {descriptor}):"]
    if ring.provenance:
        report.append(f" Modelled after: {ring.provenance}")
    report.append(f" Members: {', '.join(ring.member_ad_ids)}")
    report.append(" Shared signals:")
    report.extend(
        f" - {name}: {shared_value}"
        for name, shared_value in ring.shared_signals.items()
    )
    return "\n".join(report)
def build_ground_truth_graph(rings: List[FraudRing]) -> nx.Graph:
    """Rebuild the ground-truth network graph from a list of rings.

    Used by graders to compute the expected set of edges.

    Every member of a ring becomes a node tagged with that ring's
    ``ring_id``, and every pair of members of the same ring is joined by an
    edge tagged the same way. Rings are treated as fully connected here
    regardless of their ``topology``. When an ad or a pair appears in more
    than one ring, the ring processed last determines the stored ``ring_id``.

    Args:
        rings: Rings whose members should be linked.

    Returns:
        A new undirected graph containing all members and pairwise edges.
    """
    truth = nx.Graph()
    for ring in rings:
        rid = ring.ring_id
        ad_ids = ring.member_ad_ids
        truth.add_nodes_from(ad_ids, ring_id=rid)
        member_pairs = (
            (left, right)
            for pos, left in enumerate(ad_ids)
            for right in ad_ids[pos + 1:]
        )
        truth.add_edges_from(member_pairs, ring_id=rid)
    return truth