import os
import re
import json
import time
import sys
import asyncio
import socket
import random
import logging
import warnings
import unicodedata
import email
from email.policy import default
from typing import List, Dict, Optional, Any
from urllib.parse import urlparse

# Third-party imports
import httpx
import uvicorn
import joblib
import torch
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from groq import AsyncGroq, RateLimitError, APIError
from dotenv import load_dotenv
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
from playwright.async_api import async_playwright

# Local imports
import config
from models import get_ml_models, get_dl_models, FinetunedBERT
from feature_extraction import process_row

load_dotenv()
sys.path.append(os.path.join(config.BASE_DIR, 'Message_model'))

# Attempt to import the local semantic model
try:
    from predict import PhishingPredictor
except ImportError:
    PhishingPredictor = None


# ==================================================================================
# 🚀 ULTRA-AESTHETIC LOGGING SETUP (VISUAL OVERHAUL)
# ==================================================================================
class UltraColorFormatter(logging.Formatter):
    # ANSI Color Codes
    GREY = "\x1b[38;5;240m"
    CYAN = "\x1b[36m"
    NEON_BLUE = "\x1b[38;5;39m"
    NEON_GREEN = "\x1b[38;5;82m"
    NEON_PURPLE = "\x1b[38;5;129m"
    YELLOW = "\x1b[33m"
    ORANGE = "\x1b[38;5;208m"
    RED = "\x1b[31m"
    BOLD_RED = "\x1b[31;1m"
    WHITE_BOLD = "\x1b[37;1m"
    RESET = "\x1b[0m"

    # Custom Formats based on Level
    FORMATS = {
        logging.DEBUG: GREY + " 🔍 [DEBUG] %(message)s" + RESET,
        logging.INFO: "%(message)s" + RESET,  # Info handles its own coloring in code
        logging.WARNING: ORANGE + " ⚠️  [WARNING] %(message)s" + RESET,
        logging.ERROR: RED + " ❌ [ERROR] %(message)s" + RESET,
        logging.CRITICAL: BOLD_RED + "\n🚨 [CRITICAL] %(message)s\n" + RESET,
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)


logger = logging.getLogger("PhishingAPI")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(UltraColorFormatter())
if logger.hasHandlers():
    logger.handlers.clear()
logger.addHandler(ch)


# --- VISUAL HELPER FUNCTIONS ---
def log_section(title):
    logger.info(f"\n{UltraColorFormatter.NEON_PURPLE}╔{'═'*70}╗")
    logger.info(f"{UltraColorFormatter.NEON_PURPLE}║ {UltraColorFormatter.WHITE_BOLD}{title.center(68)}{UltraColorFormatter.NEON_PURPLE} ║")
    logger.info(f"{UltraColorFormatter.NEON_PURPLE}╚{'═'*70}╝{UltraColorFormatter.RESET}")


def log_step(icon, text):
    logger.info(f"{UltraColorFormatter.CYAN} {icon} {text}{UltraColorFormatter.RESET}")


def log_substep(text, value=""):
    val_str = f": {UltraColorFormatter.NEON_GREEN}{value}{UltraColorFormatter.RESET}" if value else ""
    logger.info(f"{UltraColorFormatter.GREY}   └─ {text}{val_str}")


def log_success(text):
    logger.info(f"{UltraColorFormatter.NEON_GREEN} ✅ {text}{UltraColorFormatter.RESET}")


def log_metric(label, value, warning=False):
    color = UltraColorFormatter.ORANGE if warning else UltraColorFormatter.NEON_BLUE
    logger.info(f" {color}📊 {label}: {UltraColorFormatter.WHITE_BOLD}{value}{UltraColorFormatter.RESET}")
# ==================================================================================

# --- CONFIGURATION ---
MAX_INPUT_CHARS = 4000
MAX_CONCURRENT_REQUESTS = 5
MAX_URLS_TO_ANALYZE = 15
LLM_MAX_RETRIES = 3
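# Illustrative only: load_dotenv() above pulls the Groq credentials into the
# process environment. The SmartAPIKeyRotator defined further down reads either
# a comma-separated GROQ_API_KEYS list or a single GROQ_API_KEY, so a matching
# .env file (placeholder values, not real keys) might look like:
#
#   GROQ_API_KEYS=gsk_key_one,gsk_key_two,gsk_key_three
#   GROQ_API_KEY=gsk_fallback_key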
app = FastAPI(
    title="Phishing Detection API (Robust Ensemble)",
    description="Multilingual phishing detection using Weighted Ensemble (ML/DL) + LLM Semantic Analysis + Live Scraping",
    version="2.6.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

request_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)


# --- DATA MODELS ---
class MessageInput(BaseModel):
    sender: Optional[str] = ""
    subject: Optional[str] = ""
    text: Optional[str] = ""
    metadata: Optional[Dict] = {}


class PredictionResponse(BaseModel):
    confidence: float
    reasoning: str
    highlighted_text: str
    final_decision: str
    suggestion: str


# --- UTILITIES ---
class SmartAPIKeyRotator:
    def __init__(self):
        keys_str = os.environ.get('GROQ_API_KEYS', '')
        self.keys = [k.strip() for k in keys_str.split(',') if k.strip()]
        if not self.keys:
            single_key = os.environ.get('GROQ_API_KEY')
            if single_key:
                self.keys = [single_key]
        if not self.keys:
            logger.critical("CRITICAL: No GROQ_API_KEYS found in environment variables!")
        else:
            log_substep("API Key Rotator", f"Initialized with {len(self.keys)} keys")
        self.clients = [AsyncGroq(api_key=k) for k in self.keys]
        self.num_keys = len(self.clients)
        self.current_index = 0

    def get_client_and_rotate(self):
        if not self.clients:
            return None
        client = self.clients[self.current_index]
        self.current_index = (self.current_index + 1) % self.num_keys
        return client


# Global Model Placeholders
ml_models = {}
dl_models = {}
bert_model = None
semantic_model = None
key_rotator: Optional[SmartAPIKeyRotator] = None
ip_cache = {}


def clean_and_parse_json(text: str) -> Dict:
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Strip Markdown code fences (```json ... ```) that LLMs often wrap around JSON
    text = re.sub(r"```json\s*", "", text, flags=re.IGNORECASE)
    text = re.sub(r"```", "", text)
    try:
        start = text.find('{')
        end = text.rfind('}')
        if start != -1 and end != -1:
            json_str = text[start:end+1]
            return json.loads(json_str)
    except Exception:
        pass
    logger.error(f"Failed to parse JSON from LLM response: {text[:50]}...")
    return {}
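# Illustrative only (not part of the pipeline): a typical fenced LLM reply such as
#
#   ```json
#   {"confidence": 97.0, "final_decision": "phishing"}
#   ```
#
# fails the initial json.loads(), has its fence markers stripped by the two
# re.sub() calls above, and is then re-parsed from the outermost '{'..'}' slice,
# returning {"confidence": 97.0, "final_decision": "phishing"}.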
class EnsembleScorer:
    WEIGHTS = {'ml': 0.30, 'dl': 0.20, 'bert': 0.20, 'semantic': 0.10, 'network': 0.20}

    @staticmethod
    def calculate_technical_score(predictions: Dict, network_data: List[Dict], urls: List[str]) -> Dict:
        score_accum = 0.0
        weight_accum = 0.0
        details = []
        log_step("🧮", "Calculating Ensemble Weights")

        # ML Scores
        ml_scores = [p['raw_score'] for k, p in predictions.items() if k in ['logistic', 'svm', 'xgboost']]
        if ml_scores:
            avg_ml = np.mean(ml_scores)
            score_accum += avg_ml * EnsembleScorer.WEIGHTS['ml'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['ml']
            details.append(f"ML Consensus: {avg_ml:.2f}")
            log_substep("ML Models Consensus", f"{avg_ml:.4f} (Weight: {EnsembleScorer.WEIGHTS['ml']})")

        # DL Scores
        dl_scores = [p['raw_score'] for k, p in predictions.items() if k in ['attention_blstm', 'rcnn']]
        if dl_scores:
            avg_dl = np.mean(dl_scores)
            score_accum += avg_dl * EnsembleScorer.WEIGHTS['dl'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['dl']
            details.append(f"DL Consensus: {avg_dl:.2f}")
            log_substep("Deep Learning Consensus", f"{avg_dl:.4f} (Weight: {EnsembleScorer.WEIGHTS['dl']})")

        # BERT
        if 'bert' in predictions:
            bert_s = predictions['bert']['raw_score']
            score_accum += bert_s * EnsembleScorer.WEIGHTS['bert'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['bert']
            details.append(f"BERT Score: {bert_s:.2f}")
            log_substep("BERT Finetuned", f"{bert_s:.4f} (Weight: {EnsembleScorer.WEIGHTS['bert']})")

        # Semantic
        if 'semantic' in predictions:
            sem_s = predictions['semantic']['raw_score']
            score_accum += sem_s * EnsembleScorer.WEIGHTS['semantic'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['semantic']
            log_substep("Semantic Analysis", f"{sem_s:.4f} (Weight: {EnsembleScorer.WEIGHTS['semantic']})")

        # Network
        net_risk = 0.0
        net_reasons = []
        for net_info in network_data:
            if net_info.get('proxy') or net_info.get('hosting'):
                net_risk += 40
                net_reasons.append("Hosted/Proxy IP")
            org = str(net_info.get('org', '')).lower()
            isp = str(net_info.get('isp', '')).lower()
            suspicious_hosts = ['hostinger', 'namecheap', 'digitalocean', 'hetzner', 'ovh', 'flokinet']
            if any(x in org or x in isp for x in suspicious_hosts):
                net_risk += 20
                net_reasons.append(f"Cheap Cloud Provider ({org[:15]}...)")
        net_risk = min(net_risk, 100)
        score_accum += net_risk * EnsembleScorer.WEIGHTS['network']
        weight_accum += EnsembleScorer.WEIGHTS['network']
        log_substep("Network Risk Calculated", f"{net_risk:.2f} (Weight: {EnsembleScorer.WEIGHTS['network']})")
        if net_reasons:
            details.append(f"Network Penalties: {', '.join(list(set(net_reasons)))}")

        if weight_accum == 0:
            final_score = 50.0
        else:
            final_score = score_accum / weight_accum

        return {
            "score": min(max(final_score, 0), 100),
            "details": "; ".join(details),
            "network_risk": net_risk
        }
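# Worked example of the weighting above (illustrative numbers, not from a real run):
# with avg_ml=0.80, avg_dl=0.70, bert=0.90, semantic=0.60 and net_risk=60,
#   score_accum  = 0.80*0.30*100 + 0.70*0.20*100 + 0.90*0.20*100
#                + 0.60*0.10*100 + 60*0.20
#                = 24 + 14 + 18 + 6 + 12 = 74
#   weight_accum = 0.30 + 0.20 + 0.20 + 0.10 + 0.20 = 1.0
# so the technical score is 74 / 1.0 = 74.0.  If a component is unavailable, its
# weight is simply left out of weight_accum, which renormalises the remaining
# contributions instead of silently dragging the score down.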
def load_models():
    global ml_models, dl_models, bert_model, semantic_model, key_rotator
    log_section("SYSTEM STARTUP: LOADING ASSETS")
    models_dir = config.MODELS_DIR

    # Load ML
    for model_name in ['logistic', 'svm', 'xgboost']:
        try:
            path = os.path.join(models_dir, f'{model_name}.joblib')
            if os.path.exists(path):
                ml_models[model_name] = joblib.load(path)
                log_substep("ML Model Loaded", model_name)
        except Exception:
            pass

    # Load DL
    for model_name in ['attention_blstm', 'rcnn']:
        try:
            path = os.path.join(models_dir, f'{model_name}.pt')
            if os.path.exists(path):
                template = get_dl_models(input_dim=len(config.NUMERICAL_FEATURES))
                model = template[model_name]
                model.load_state_dict(torch.load(path, map_location='cpu'))
                model.eval()
                dl_models[model_name] = model
                log_substep("DL Model Loaded", model_name)
        except Exception:
            pass

    # Load BERT
    bert_path = os.path.join(config.BASE_DIR, 'finetuned_bert')
    if os.path.exists(bert_path):
        try:
            bert_model = FinetunedBERT(bert_path)
            log_substep("BERT Model", "Loaded Successfully")
        except Exception:
            pass

    # Load Semantic
    sem_path = os.path.join(config.BASE_DIR, 'Message_model', 'final_semantic_model')
    if os.path.exists(sem_path) and PhishingPredictor:
        try:
            semantic_model = PhishingPredictor(model_path=sem_path)
            log_substep("Semantic Model", "Loaded Successfully")
        except Exception:
            pass

    key_rotator = SmartAPIKeyRotator()


# --- UPDATED PARSING LOGIC ---
def extract_visible_text_and_links(raw_email: str) -> tuple:
    """
    Parse a full raw email using Python's email library and extract:
      - extracted_text (merged plain text + HTML text + metadata)
      - links (list of all URLs found anywhere)
    """
    log_step("📨", "Parsing Email MIME Structure")
    if not raw_email:
        logger.warning("Parsing received empty email input")
        return "", []

    extracted_text_parts = []
    links = set()

    # Attempt 1: Try parsing as a standard MIME Email message
    try:
        msg = email.message_from_string(raw_email, policy=default)

        # Extract basic metadata if available
        metadata = {
            "from": msg.get("From", ""),
            "to": msg.get("To", ""),
            "subject": msg.get("Subject", "")
        }
        for k, v in metadata.items():
            if v:
                extracted_text_parts.append(f"{k.capitalize()}: {v}")
                log_substep(f"Metadata [{k}]", v[:50] + "..." if len(v) > 50 else v)

        part_count = 0
        for part in msg.walk():
            part_count += 1
            content_type = part.get_content_type()
            content_disposition = str(part.get("Content-Disposition") or "")
            try:
                if content_type == "text/plain":
                    text_data = part.get_payload(decode=True)
                    if text_data:
                        text_str = text_data.decode(part.get_content_charset() or "utf-8", errors="ignore")
                        extracted_text_parts.append(text_str)
                        links.update(re.findall(r'https?://\S+', text_str))
                elif content_type == "text/html":
                    html_data = part.get_payload(decode=True)
                    if html_data:
                        html_str = html_data.decode(part.get_content_charset() or "utf-8", errors="ignore")
                        soup = BeautifulSoup(html_str, "html.parser")
                        extracted_text_parts.append(soup.get_text(separator="\n"))
                        for a in soup.find_all("a", href=True):
                            links.add(a["href"])
                        for img in soup.find_all("img", src=True):
                            links.add(img["src"])
                elif "attachment" in content_disposition.lower() or "inline" in content_disposition.lower():
                    filename = part.get_filename()
                    if filename:
                        extracted_text_parts.append(f"[Attachment found: {filename}]")
                        log_substep("Attachment", filename)
            except Exception as e:
                logger.warning(f"Error parsing email part: {e}")
    except Exception as e:
        logger.error(f"Email Parsing Failed: {e}")

    # Combine extracted parts
    extracted_text = "\n".join(extracted_text_parts).strip()

    # --- CRITICAL FIX FOR RAW HTML PAYLOADS ---
    # If MIME parsing failed to extract text (extracted_text is empty),
    # but the input looks like HTML, force a BeautifulSoup clean.
    if not extracted_text:
        if "<html" in raw_email.lower() or "<div" in raw_email.lower():
            soup = BeautifulSoup(raw_email, "html.parser")
            extracted_text = soup.get_text(separator="\n").strip()
            for a in soup.find_all("a", href=True):
                links.add(a["href"])

    # Normalise and keep only well-formed http(s) URLs
    cleaned_links = []
    for link in links:
        link = link.strip().replace("'", "").replace('"', "")
        if link.startswith("http://") or link.startswith("https://"):
            cleaned_links.append(link)

    log_success(f"Parsed Content. Extracted {len(cleaned_links)} unique URLs.")
    return extracted_text, cleaned_links
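# Illustrative only: for a minimal raw message such as
#
#   From: alerts@example.com
#   Subject: Reset your password
#   Content-Type: text/plain
#
#   Click https://example.com/reset to continue.
#
# extract_visible_text_and_links() would return roughly
#   ("From: alerts@example.com\nSubject: Reset your password\nClick https://example.com/reset to continue.",
#    ["https://example.com/reset"])
# i.e. header metadata merged into the visible text, plus the de-duplicated http(s) links.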
async def extract_url_features(urls: List[str]) -> pd.DataFrame:
    if not urls:
        return pd.DataFrame()
    log_step("🧬", f"Extracting Features for {len(urls)} URLs")
    df = pd.DataFrame({'url': urls})
    whois_cache, ssl_cache = {}, {}
    tasks = [asyncio.to_thread(process_row, row, whois_cache, ssl_cache) for _, row in df.iterrows()]
    feature_list_raw = await asyncio.gather(*tasks, return_exceptions=True)
    feature_list = []
    for i, f in enumerate(feature_list_raw):
        if isinstance(f, Exception):
            logger.error(f"Feature extraction error on {urls[i]}: {f}")
            feature_list.append({})
        else:
            feature_list.append(f)
    log_substep("Feature Extraction", "Complete")
    return pd.concat([df, pd.DataFrame(feature_list)], axis=1)


def get_model_predictions(features_df: pd.DataFrame, message_text: str) -> Dict:
    predictions = {}
    num_feats = config.NUMERICAL_FEATURES
    cat_feats = config.CATEGORICAL_FEATURES

    if not features_df.empty:
        try:
            log_step("🤖", "Running Machine Learning Inference")
            X = features_df[num_feats + cat_feats].copy()
            X[num_feats] = X[num_feats].fillna(-1)
            X[cat_feats] = X[cat_feats].fillna('N/A')

            # ML Models
            for name, model in ml_models.items():
                try:
                    probas = model.predict_proba(X)[:, 1]
                    raw_score = float(np.max(probas))
                    predictions[name] = {'raw_score': raw_score}
                    log_substep(f"ML: {name.ljust(10)}", f"{raw_score:.4f}")
                except Exception:
                    predictions[name] = {'raw_score': 0.5}

            # DL Models
            if dl_models:
                X_num = torch.tensor(X[num_feats].values.astype(np.float32))
                with torch.no_grad():
                    for name, model in dl_models.items():
                        try:
                            out = model(X_num)
                            raw_score = float(torch.max(out).item())
                            predictions[name] = {'raw_score': raw_score}
                            log_substep(f"DL: {name.ljust(10)}", f"{raw_score:.4f}")
                        except Exception:
                            predictions[name] = {'raw_score': 0.5}

            # BERT
            if bert_model:
                try:
                    scores = bert_model.predict_proba(features_df['url'].tolist())
                    avg_score = float(np.mean([s[1] for s in scores]))
                    predictions['bert'] = {'raw_score': avg_score}
                    log_substep("BERT Inference", f"{avg_score:.4f}")
                except Exception:
                    pass
        except Exception as e:
            logger.error(f"Feature Pipeline Error: {e}")

    if semantic_model and message_text:
        try:
            log_step("🧠", "Running Semantic Text Analysis")
            res = semantic_model.predict(message_text)
            predictions['semantic'] = {'raw_score': float(res['phishing_probability'])}
            log_substep("Semantic Prob", f"{res['phishing_probability']:.4f}")
        except Exception:
            pass

    return predictions


async def get_network_data_raw(urls: List[str]) -> List[Dict]:
    data = []
    unique_hosts = set()
    for url_str in urls:
        try:
            parsed = urlparse(url_str if url_str.startswith(('http', 'https')) else f"http://{url_str}")
            if parsed.hostname:
                unique_hosts.add(parsed.hostname)
        except Exception:
            pass

    target_hosts = list(unique_hosts)[:5]
    log_step("🌍", f"Geo-Locating Hosts: {target_hosts}")

    async with httpx.AsyncClient(timeout=3.0) as client:
        for host in target_hosts:
            if host in ip_cache:
                data.append(ip_cache[host])
                log_substep("Cache Hit", host)
                continue
            try:
                ip = await asyncio.to_thread(socket.gethostbyname, host)
                resp = await client.get(
                    f"http://ip-api.com/json/{ip}?fields=status,message,country,isp,org,as,proxy,hosting"
                )
                if resp.status_code == 200:
                    geo = resp.json()
                    if geo.get('status') == 'success':
                        geo['ip'] = ip
                        geo['host'] = host
                        data.append(geo)
                        ip_cache[host] = geo
                        log_substep(f"Resolved {host}", f"{geo.get('org', 'Unknown')} [{geo.get('country', 'UNK')}]")
            except Exception:
                log_substep("Failed to resolve", host)
            await asyncio.sleep(0.2)
    return data


async def scrape_landing_page(urls: list[str]) -> dict:
    # Cap URLs to 10
    urls = urls[:10]
    results = {}

    async def scrape_single(url: str):
        nonlocal results
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
                )
                page = await context.new_page()
                try:
                    target_url = url if url.startswith(("http", "https")) else f"http://{url}"
                    await page.goto(target_url, timeout=10000, wait_until="domcontentloaded")
                    content = await page.content()
                    soup = BeautifulSoup(content, "html.parser")
                    for tag in soup(["script", "style", "nav", "footer", "svg", "noscript"]):
                        tag.decompose()
                    text = soup.get_text(separator=" ", strip=True)
                    text = unicodedata.normalize("NFKC", text)
                    results[url] = text[:300]
                except Exception as e:
                    results[url] = f"Error accessing page: {str(e)}"
                finally:
                    await browser.close()
        except Exception as e:
            results[url] = f"Scraping failed: {str(e)}"

    # Run all tasks concurrently
    tasks = [scrape_single(u) for u in urls]
    await asyncio.gather(*tasks)
    return results
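# Illustrative only: the two collectors above feed the evidence dossier sent to the LLM.
# For a single hypothetical URL their outputs look roughly like
#   get_network_data_raw(["http://login-example.com"]) ->
#       [{"status": "success", "country": "...", "isp": "...", "org": "...", "as": "...",
#         "proxy": False, "hosting": True, "ip": "203.0.113.7", "host": "login-example.com"}]
#   scrape_landing_page(["http://login-example.com"]) ->
#       {"http://login-example.com": "<first 300 chars of visible page text, or an"
#                                    " 'Error accessing page: ...' string on failure>"}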
# --- SYSTEM PROMPT ---
SYSTEM_PROMPT = """You are the 'Maverick', an elite, autonomous Cybersecurity Judge. Your sole purpose is to analyze the provided Evidence Dossier and return a JSON object.

**Core Rules:**
1. **The "One Bad Link" Rule:** If the email contains **ANY** suspicious or malicious URL, the Final Decision MUST be "phishing" (100% Confidence), even if other links are legitimate.
2. **Prioritize Ground Truth:** You must prioritize **Scraped Content** (e.g., a page asking for credentials) and **Network Data** (e.g., a Bank hosted on DigitalOcean) over the Technical Score.
3. **Override Authority:** Even if the 'Technical Ensemble Score' is low (e.g., 20/100), if you find a Critical Threat in the Scraped Data or Forensic Scan, you MUST override with a High Score (90-100).
4. **Suspicious Indicators:**
   - **Scraped Data:** Login forms on non-official domains, "Verify Identity" text, urgency.
   - **Network:** Mismatch between Sender Domain and Hosting (e.g., Microsoft email hosted on Namecheap).
   - **Forensics:** Hidden H1 tags, Typosquatting (paypa1.com), Mismatched hrefs.
5. **Confidence Score:** Give a confidence score between 0 and 100 based on all the evidence and the decision being made. A score above 50 means the email looks like phishing; a score below 50 means it looks legitimate.
6. **Final Decision:** Based on the evidence and the confidence score, give the final decision: "phishing" if the evidence points to phishing, "legitimate" otherwise.

**8 ROBUST FEW-SHOT EXAMPLES:**

**Example 1: Phishing (Credential Harvesting - Scraped Data Override)**
**Input:**
Sender: security-alert@microsoft-online-verify.com
Subject: Action Required: Unusual Sign-in Activity Detected
Technical Score: 35 / 100
Network Intelligence: Host: 162.241.2.1 | Org: Unified Layer (Cheap Hosting) | ISP: Bluehost | Proxy: False
Scraped Content: "Microsoft 365. Sign in to your account. Email, phone, or Skype. No account? Create one. Can't access your account? Sign-in options. Terms of Use Privacy & Cookies. © Microsoft 2025. NOTE: This page is for authorized users only."
Forensic Scan: Link: http://microsoft-online-verify.com/login.php
Message: "Microsoft Security Alert. We detected a sign-in attempt from a new device or location. **Account:** user@example.com **Date:** Fri, Nov 28, 2025 10:23 AM GMT **Location:** Moscow, Russia **IP Address:** 103.22.14.2 **Browser:** Firefox on Windows 10. If this wasn't you, your account may have been compromised. Please **verify your identity immediately** to secure your account and avoid permanent suspension. [Secure My Account] Thanks, The Microsoft Account Team"
**Correct Decision:**
{{
  "confidence": 99.0,
  "reasoning": "CRITICAL OVERRIDE. The Scraped Data mimics a Microsoft 365 Login portal ('Sign in to your account'), but the Network Data confirms the site is hosted on 'Unified Layer/Bluehost', NOT Microsoft's official Azure infrastructure. This is a classic credential harvesting attack using a fake security alert.",
  "highlighted_text": "Please @@verify your identity immediately@@ to secure your account and avoid permanent suspension. @@[Secure My Account]@@",
  "final_decision": "phishing",
  "suggestion": "Do not enter credentials. This is a fake login page hosted on non-Microsoft servers."
}}

**Example 2: Phishing (Hidden Malicious URL - Forensic Override)**
**Input:**
Sender: hr-updates@wipro.com
Subject: MANDATORY: Updated Employee Handbook & Compliance Policy 2025
Technical Score: 45 / 100
Network Intelligence: Host: docs.google.com (Google LLC)
Scraped Content: "Google Docs. Sign in. Employee Handbook 2025.pdf. You need permission. Request access. Switch accounts. Google Workspace."
Forensic Scan: CRITICAL: Found hidden URL in H1 tag: 'http://bit.ly/malware-redirect-payload'
Message: "Dear Team, As part of our annual compliance audit (ISO 27001), all employees are required to review and sign the updated Employee Handbook for the fiscal year 2025. Please access the document via the secure Google Docs link below: [docs.google.com/handbook-2025](Link) Failure to acknowledge this document by Friday may result in a temporary suspension of network access.
Regards,
HR Compliance Team
Wipro Limited"
**Correct Decision:**
{{
  "confidence": 98.0,
  "reasoning": "Phishing. While the visible body text points to a legitimate Google Docs URL, the email contains a hidden malicious URL ('bit.ly/malware-redirect-payload') embedded in the HTML header tags. This is a sophisticated evasion tactic designed to bypass filters while tricking the user.",
  "highlighted_text": "Please access the document via the secure Google Docs link below: [docs.google.com/handbook-2025] @@(Hidden Header URL Detected)@@",
  "final_decision": "phishing",
  "suggestion": "Do not click. A hidden malicious payload was detected in the email structure."
}}

**Example 3: Phishing (Typosquatting & Urgency)**
**Input:**
Sender: support@paypa1-resolution.com
Subject: URGENT: Wallet Suspended - Case ID #99283-AX
Technical Score: 88 / 100
Network Intelligence: Host: paypa1-resolution.com | Org: Namecheap Inc | ISP: Namecheap
Scraped Content: "PayPal. Security Challenge. Enter your credit card number to verify ownership. Expiration Date. CVV. Billing Address. Submit. Copyright 1999-2025 PayPal. All rights reserved."
Forensic Scan: Link: https://paypa1-resolution.com/verify-identity
Message: "Hello Customer, Your PayPal wallet has been temporarily suspended due to suspicious transactions totaling $400.00 USD to 'Global-Tech-Solutions Ltd'. To restore full access to your funds, you must **verify your payment method immediately**. Failure to do so within 24 hours will result in the permanent closure of your account and forfeiture of remaining balance. [Resolve Issue Now] Thank you for being a valued customer."
**Correct Decision:**
{{
  "confidence": 99.0,
  "reasoning": "Phishing. Typosquatting detected ('paypa1' instead of 'paypal'). The Scraped Data confirms the landing page asks for credit card details (CVV/Expiry), and the domain is registered via Namecheap, not PayPal's official infrastructure. High urgency and the threat of 'forfeiture' are clear indicators.",
  "highlighted_text": "Your PayPal wallet has been temporarily @@suspended@@. To restore full access, you must @@verify your payment method immediately@@. Failure to do so within 24 hours will result in @@permanent closure@@.",
  "final_decision": "phishing",
  "suggestion": "Delete immediately. This is an impersonation attack stealing financial data."
}}

**Example 4: Legitimate (Internal Corporate Email)**
**Input:**
Sender: admin@internal.daiict.ac.in
Subject: Scheduled Maintenance - Server Room B - Sunday Nov 30
Technical Score: 15 / 100
Network Intelligence: Host: internal.daiict.ac.in | Org: DA-IICT | ISP: NKN (National Knowledge Network) | Proxy: False
Scraped Content: "DA-IICT Intranet Login. Authorized Personnel Only. User ID. Password. Forgot Password? IT Helpdesk Extension: 4444."
Forensic Scan: Link: https://internal.daiict.ac.in/wiki/maintenance-schedule
Message: "Dear Faculty and Staff, The central server room (Room B) will be undergoing scheduled maintenance on Sunday, Nov 30, from 2:00 AM to 4:00 AM IST. **Services affected:** 1. Moodle (LMS) 2. Faculty Webmail 3. Student Portal. Please ensure all work is saved before this time. For the full maintenance schedule and alternative access points, please check the wiki: https://internal.daiict.ac.in/wiki/maintenance-schedule Regards, IT Services"
**Correct Decision:**
{{
  "confidence": 5.0,
  "reasoning": "Legitimate. The sender domain, link domain, and network infrastructure (DA-IICT/NKN) all align perfectly. The scraped content matches an internal login page. The content is informational with specific details and lacks suspicious urgency or external hosting.",
  "highlighted_text": "The central server room (Room B) will be undergoing scheduled maintenance on Sunday. Please check the wiki: https://internal.daiict.ac.in/wiki/maintenance-schedule",
  "final_decision": "legitimate",
  "suggestion": "Safe internal communication regarding maintenance."
}}

**Example 5: Phishing (Legitimate Cloud Abuse - Google Forms)**
**Input:**
Sender: recruitment-officer.john.doe@gmail.com
Subject: Job Offer: Senior Python Developer - Document Upload
Technical Score: 60 / 100
Network Intelligence: Host: forms.gle (Google LLC)
Scraped Content: "Untitled Form. Job Application - Senior Dev. Upload your Passport Front and Back. Upload SSN Card. Upload Drivers License. Submit. Never submit passwords through Google Forms."
Forensic Scan: Link: https://forms.gle/xyz123abc
Message: "Dear Candidate, Congratulations! We are pleased to offer you the Senior Python Developer position at TechSolutions Inc. To proceed with your background check and contract generation, we need you to upload the following documents immediately: 1. Scanned Copy of Passport 2. Social Security Number (SSN) Card 3. Driver's License. Please upload them to our secure portal here: [Link to Google Form] We need this by EOD today."
**Correct Decision:**
{{
  "confidence": 92.0,
  "reasoning": "Phishing. Although the domain is legitimate (Google Forms), the Scraped Content reveals it is soliciting highly sensitive PII (Passport/SSN). Legitimate companies do not collect SSNs via public Google Forms. The use of a generic Gmail address for a 'Recruitment Officer' is also a red flag.",
  "highlighted_text": "To proceed with your background check... please @@upload the following documents immediately@@: 1. Scanned Copy of Passport 2. Social Security Number (SSN) Card. Please upload them to our secure portal here: @@[Link to Google Form]@@",
  "final_decision": "phishing",
  "suggestion": "Do not upload sensitive ID documents to public forms. This is likely identity theft."
}}

**Example 6: Legitimate (Transactional Alert)**
**Input:**
Sender: alerts@hdfcbank.net
Subject: Transaction Alert: INR 5,000.00 Debited
Technical Score: 8 / 100
Network Intelligence: Host: hdfcbank.net | Org: HDFC Bank Ltd | ISP: Sify Technologies
Scraped Content: N/A (No Links)
Forensic Scan: No URLs found.
Message: "Dear Customer, INR 5,000.00 was debited from your A/c XX1234 on 28-Nov-2025. **Info:** UPI-12345-AmazonPay **Available Bal:** INR 42,000.00. If this transaction was not done by you, please call our 24x7 PhoneBanking number 1800-202-6161 immediately to block your card/account. Warm Regards, HDFC Bank"
**Correct Decision:**
{{
  "confidence": 2.0,
  "reasoning": "Legitimate. This is a standard text-only transactional alert. The sender domain matches HDFC Bank's official domain, and the network data confirms it. The phone number is a standard support line. There are no suspicious links.",
  "highlighted_text": "INR 5,000.00 was debited from your A/c XX1234 on 28-Nov-2025. Info: UPI-12345-AmazonPay.",
  "final_decision": "legitimate",
  "suggestion": "Safe transactional alert. No action needed unless the transaction is unrecognized."
}}

**Example 7: Phishing (CEO Fraud / BEC - No Links)**
**Input:**
Sender: ceo.work.private@gmail.com (Spoofed Name: "Elon Musk")
Subject: Urgent Task - Confidential - DO NOT IGNORE
Technical Score: 75 / 100
Network Intelligence: Host: mail.google.com (Google LLC)
Scraped Content: N/A
Forensic Scan: No URLs found.
Message: "Akshat, I am currently in a closed-door meeting with the board of investors and cannot talk on the phone. I need a favor. I need you to purchase 5 Apple Gift Cards ($100 each) for a client gift. It is urgent and needs to be done in the next 30 minutes. I will reimburse you personally by this evening. Do not mention this to anyone else yet. Reply with the codes here as soon as you have them. Elon." **Correct Decision:** {{ "confidence": 90.0, "reasoning": "Phishing (BEC). Classic Business Email Compromise. The Sender is using a generic Gmail address to impersonate a C-level executive. The request involves financial urgency (Gift Cards), secrecy ('closed-door meeting', 'do not mention'), and bypasses standard procurement channels.", "highlighted_text": "I need you to @@purchase 5 Apple Gift Cards@@ ($100 each) for a client gift. It is urgent... @@Reply with the codes here@@ as soon as you have them.", "final_decision": "phishing", "suggestion": "Do not reply. Verify this request with the CEO via a different, verified channel (Slack/Phone/Corporate Email)." }} **Example 8: Legitimate (Marketing with Trackers)** **Input:** Sender: newsletter@coursera.org Subject: Recommended for you: Python for Everybody Specialization Technical Score: 20 / 100 Network Intelligence: Host: links.coursera.org | Org: Coursera Inc | ISP: Amazon.com Scraped Content: "Coursera. Master Python. Enroll for Free. Starts Nov 29. Financial Aid available. Top Instructors. University of Michigan. 4.8 Stars (120k ratings)." Forensic Scan: Link: https://links.coursera.org/track/click?id=12345&user=akshat Message: "Hi Student, Based on your interest in Data Science, we found a course you might like: **Python for Everybody Specialization** Offered by University of Michigan. Start learning today and build job-ready skills. [Enroll Now] See you in class, The Coursera Team 381 E. Evelyn Ave, Mountain View, CA 94041" **Correct Decision:** {{ "confidence": 10.0, "reasoning": "Legitimate. Standard marketing email from a known education platform. Network data confirms the link tracking domain belongs to Coursera (hosted on AWS). Scraped content is consistent with the offer. Address matches public records.", "highlighted_text": "Based on your interest in Data Science, we found a course you might like: Python for Everybody Specialization. [Enroll Now]", "final_decision": "legitimate", "suggestion": "Safe marketing email." }}""" async def get_groq_decision(ensemble_result: Dict, network_data: List[Dict], landing_page_text: str, cleaned_text: str, original_raw_html: str, readable_display_text: str, sender: str, subject: str): net_str = "No Network Data" if network_data: net_str = "\n".join([ f"- Host: {d.get('host')} | IP: {d.get('ip')} | Org: {d.get('org')} | ISP: {d.get('isp')} | Hosting/Proxy: {d.get('hosting') or d.get('proxy')}" for d in network_data ]) log_step("š", "Starting Forensic HTML Scan") forensic_report = [] try: soup = BeautifulSoup(original_raw_html, 'html.parser') # A. Scan Forms for form in soup.find_all('form'): action = form.get('action') if action: forensic_report.append(f"CRITICAL: Found URL in