""" Fraud network (ring) generation for Task 3 using networkx. Generates complex fraud ring topologies, each named after a published Meta Adversarial Threat Report (Coordinated Inauthentic Behaviour / CIB) case study: - Clique - Ghana DigitSol-style: small troll-farm where every account amplifies every other (Meta Q3 2020 Adversarial Threat Report). - Chain - Benin Digited-style: relay pattern where A promotes B, B promotes C, but A never directly touches C (Meta Q1 2021 Adversarial Threat Report). - Hub-spoke - China-Russia-style: one master account funds and controls many satellite accounts (Meta Q3 2022 Adversarial Threat Report). Individual ads in a ring may look borderline; the signal is in the connections. Each edge in the graph carries the signal type that connects the two ads. """ from __future__ import annotations import random from dataclasses import dataclass, field from typing import Dict, List, Set, Tuple import networkx as nx @dataclass class FraudRing: ring_id: str member_ad_ids: List[str] shared_signals: Dict[str, str] # signal_type -> shared_value topology: str = "clique" # clique, chain, hub_spoke case_name: str = "" # e.g. "Ghana DigitSol-style" provenance: str = "" # e.g. "Meta Q3 2020 Adversarial Threat Report" @property def size(self) -> int: return len(self.member_ad_ids) RING_CASE_STUDIES: List[Dict[str, str]] = [ { "topology": "clique", "case_name": "Ghana DigitSol-style", "provenance": "Meta Q3 2020 Adversarial Threat Report", "summary": ( "Troll-farm ring where every account amplifies every other; " "all members share payment / creative / targeting fingerprints." ), }, { "topology": "chain", "case_name": "Benin Digited-style", "provenance": "Meta Q1 2021 Adversarial Threat Report", "summary": ( "Relay ring where A promotes B, B promotes C, but A never directly " "touches C. Transitive reasoning is required to surface the full " "network." ), }, { "topology": "hub_spoke", "case_name": "China-Russia-style hub", "provenance": "Meta Q3 2022 Adversarial Threat Report", "summary": ( "Hub-and-spoke ring: one master advertiser funds and controls many " "satellite accounts that share the master's payment and registrar." ), }, ] _RING_TOPOLOGIES = [cs["topology"] for cs in RING_CASE_STUDIES] _SIGNAL_POOL_KEYS = ["payment_method", "domain_registrar", "creative_template", "targeting_overlap"] _REGISTRAR_CHOICES = ["Njalla (privacy)", "Epik", "NameSilo", "Tucows (privacy proxy)"] _TARGETING_CHOICES = [ "Men 25-45, crypto+investing, US+UK+AU", "Adults 18-35, tech+gaming, worldwide", "Women 25-55, health+beauty, US+CA", "Adults 30-60, finance+real-estate, US+UK", "Adults 20-40, e-commerce+dropshipping, US+EU", ] def _make_signal_pool(rng: random.Random, ring_index: int) -> Dict[str, str]: """Generate a pool of shared signal values for one ring.""" return { "payment_method": f"pmt_ring_{rng.randint(10000, 99999)}", "domain_registrar": rng.choice(_REGISTRAR_CHOICES), "creative_template": f"tmpl_{rng.randint(1000, 9999)}", "targeting_overlap": rng.choice(_TARGETING_CHOICES), } def generate_fraud_networks( rng: random.Random, n_rings: int, available_fraud_ad_ids: List[str], ) -> Tuple[List[FraudRing], Dict[str, List[str]]]: """ Generate fraud ring structures with complex topologies. Returns: rings: list of FraudRing objects ad_to_rings: mapping from ad_id to list of ring_ids it belongs to """ G = nx.Graph() rings: List[FraudRing] = [] ad_to_rings: Dict[str, List[str]] = {} remaining = list(available_fraud_ad_ids) rng.shuffle(remaining) for i in range(n_rings): if len(remaining) < 3: break # Reserve 3 ads per still-to-come ring so we always fit n_rings rings, # which is what makes the "all three CIB topologies every episode" # storytelling claim true at task_3. remaining_rings = n_rings - i - 1 reserved = 3 * remaining_rings budget = max(3, len(remaining) - reserved) ring_size = rng.randint(3, min(5, budget, len(remaining))) members = remaining[:ring_size] remaining = remaining[ring_size:] # Rotate through the Meta CIB case studies deterministically so that # every task_3 episode showcases at least one clique, one chain, and # one hub-spoke pattern when n_rings >= 3. case_study = RING_CASE_STUDIES[i % len(RING_CASE_STUDIES)] topology = case_study["topology"] signal_pool = _make_signal_pool(rng, i) signal_keys = list(_SIGNAL_POOL_KEYS) rng.shuffle(signal_keys) n_shared = rng.randint(2, len(signal_keys)) shared_signals = {k: signal_pool[k] for k in signal_keys[:n_shared]} _add_edges_for_topology(G, members, shared_signals, topology, rng) ring_id = f"ring_{i}" ring = FraudRing( ring_id=ring_id, member_ad_ids=members, shared_signals=shared_signals, topology=topology, case_name=case_study["case_name"], provenance=case_study["provenance"], ) rings.append(ring) for ad_id in members: ad_to_rings.setdefault(ad_id, []).append(ring_id) G.add_node(ad_id, ring_id=ring_id) # Optionally create bridge nodes between rings for extra complexity if len(rings) >= 2 and remaining: _add_bridge_ads(G, rings, remaining, ad_to_rings, rng) return rings, ad_to_rings def _add_edges_for_topology( G: nx.Graph, members: List[str], shared_signals: Dict[str, str], topology: str, rng: random.Random, ) -> None: """Add edges to the graph based on the ring topology.""" signal_types = list(shared_signals.keys()) if topology == "clique": for i, a in enumerate(members): for b in members[i + 1:]: signal = rng.choice(signal_types) G.add_edge(a, b, signal_type=signal, signal_value=shared_signals[signal]) elif topology == "chain": for idx in range(len(members) - 1): signal = signal_types[idx % len(signal_types)] G.add_edge( members[idx], members[idx + 1], signal_type=signal, signal_value=shared_signals[signal], ) elif topology == "hub_spoke": hub = members[0] for spoke in members[1:]: signal = rng.choice(signal_types) G.add_edge(hub, spoke, signal_type=signal, signal_value=shared_signals[signal]) def _add_bridge_ads( G: nx.Graph, rings: List[FraudRing], remaining: List[str], ad_to_rings: Dict[str, List[str]], rng: random.Random, ) -> None: """Optionally link two rings via a shared bridge ad from the remaining pool.""" if len(remaining) < 1 or len(rings) < 2: return bridge_ad = remaining.pop(0) r1, r2 = rings[0], rings[1] bridge_to_r1 = rng.choice(r1.member_ad_ids) bridge_to_r2 = rng.choice(r2.member_ad_ids) r1.member_ad_ids.append(bridge_ad) ad_to_rings.setdefault(bridge_ad, []).extend([r1.ring_id, r2.ring_id]) sig_key = rng.choice(list(r1.shared_signals.keys())) G.add_edge(bridge_ad, bridge_to_r1, signal_type=sig_key, signal_value=r1.shared_signals[sig_key]) sig_key2 = rng.choice(list(r2.shared_signals.keys())) G.add_edge(bridge_ad, bridge_to_r2, signal_type=sig_key2, signal_value=r2.shared_signals[sig_key2]) def get_ring_shared_signal_text(ring: FraudRing) -> str: """Describe the shared signals in a ring (for grader/debug use).""" header_tail = f"topology={ring.topology}" if ring.case_name: header_tail = f"{ring.case_name} {ring.topology}" lines = [ f"Fraud Ring {ring.ring_id} ({ring.size} members, {header_tail}):" ] if ring.provenance: lines.append(f" Modelled after: {ring.provenance}") lines.append(f" Members: {', '.join(ring.member_ad_ids)}") lines.append(" Shared signals:") for signal_type, value in ring.shared_signals.items(): lines.append(f" - {signal_type}: {value}") return "\n".join(lines) def build_ground_truth_graph(rings: List[FraudRing]) -> nx.Graph: """Reconstruct the full ground truth network graph from rings. Used by graders to compute the expected set of edges. """ G = nx.Graph() for ring in rings: for i, a in enumerate(ring.member_ad_ids): G.add_node(a, ring_id=ring.ring_id) for b in ring.member_ad_ids[i + 1:]: G.add_edge(a, b, ring_id=ring.ring_id) return G