# AI_Text_Authenticator / text_auth_app.py
# satyaki-mitra's picture
# Updated UI
# 69256da
# DEPENDENCIES
import os
import time
import json
import uvicorn
import numpy as np
from typing import Any
from typing import List
from typing import Dict
from typing import Union
from pathlib import Path
from fastapi import File
from fastapi import Form
from loguru import logger
from pydantic import Field
from typing import Optional
from fastapi import FastAPI
from fastapi import Request
from datetime import datetime
from fastapi import UploadFile
from pydantic import BaseModel
from fastapi import HTTPException
from fastapi import BackgroundTasks
from config.settings import settings
from utils.logger import central_logger
from utils.logger import log_api_request
from detector.attribution import AIModel
from config.threshold_config import Domain
from fastapi.responses import JSONResponse
from fastapi.responses import HTMLResponse
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from utils.logger import log_detection_event
from detector.attribution import ModelAttributor
from detector.highlighter import TextHighlighter
from processors.language_detector import Language
from detector.orchestrator import DetectionResult
from detector.attribution import AttributionResult
from fastapi.middleware.cors import CORSMiddleware
from processors.text_processor import TextProcessor
from reporter.report_generator import ReportGenerator
from detector.orchestrator import DetectionOrchestrator
from processors.domain_classifier import DomainClassifier
from processors.language_detector import LanguageDetector
from processors.document_extractor import DocumentExtractor
from reporter.reasoning_generator import ReasoningGenerator
# ==================== CUSTOM SERIALIZATION ====================
class NumpyJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that knows how to serialize NumPy values and a few
    duck-typed project objects
    """

    def default(self, obj: Any) -> Any:
        """
        Fallback hook invoked by json for values it cannot encode natively

        Checks are applied in a fixed order; the first one that matches wins.
        Anything unmatched is delegated to the base class, which raises TypeError.
        """
        # Explicit NumPy scalar / array handling
        if isinstance(obj, (np.float32, np.float64)):
            return float(obj)

        if isinstance(obj, (np.int32, np.int64, np.int8, np.uint8)):
            return int(obj)

        if isinstance(obj, np.ndarray):
            return obj.tolist()

        if isinstance(obj, np.bool_):
            return bool(obj)

        # Duck-typed conversions, probed in priority order:
        #   item()    -> remaining numpy scalar types
        #   to_dict() -> custom objects
        #   dict()    -> Pydantic models
        for method_name in ("item", "to_dict", "dict"):
            if hasattr(obj, method_name):
                return getattr(obj, method_name)()

        # Remaining plain containers json does not accept
        if isinstance(obj, (set, tuple)):
            return list(obj)

        return super().default(obj)
class NumpyJSONResponse(JSONResponse):
    """
    JSONResponse variant whose body is encoded with NumpyJSONEncoder
    """

    def render(self, content: Any) -> bytes:
        """
        Serialize `content` to compact UTF-8 JSON bytes

        NaN / Infinity are rejected (allow_nan = False) and non-ASCII
        characters are emitted verbatim (ensure_ascii = False).
        """
        encoded_text = json.dumps(
            content,
            ensure_ascii=False,
            allow_nan=False,
            indent=None,
            separators=(",", ":"),
            cls=NumpyJSONEncoder,
        )
        return encoded_text.encode("utf-8")
def convert_numpy_types(obj: Any) -> Any:
    """
    Recursively convert numpy types to Python native types

    Arguments:
    ----------
        obj : Any Python object that may contain NumPy types

    Returns:
    --------
        Object with all NumPy types converted to native Python types.
        Dictionaries are rebuilt (keys that are NumPy scalars are converted too),
        lists / tuples / sets all come back as lists, and objects exposing
        `item()`, `to_dict()` or `dict()` are unwrapped through that method.
    """
    if obj is None:
        return None

    # Dictionaries: convert values recursively AND normalize NumPy scalar keys.
    # json.dumps rejects keys such as np.int64, so leaving them untouched would
    # make the "safe" structure fail at serialization time.
    if isinstance(obj, dict):
        return {
            (key.item() if isinstance(key, np.generic) else key): convert_numpy_types(value)
            for key, value in obj.items()
        }

    # Lists, tuples and sets are all flattened to lists
    if isinstance(obj, (list, tuple, set)):
        return [convert_numpy_types(item) for item in obj]

    # NumPy scalars (abstract base classes cover every width, e.g. float16 / int16 / uint64)
    if isinstance(obj, np.floating):
        return float(obj)

    if isinstance(obj, np.integer):
        return int(obj)

    if isinstance(obj, np.ndarray):
        return obj.tolist()

    if isinstance(obj, np.bool_):
        return bool(obj)

    # Any other scalar-like object exposing item()
    if hasattr(obj, 'item'):
        return obj.item()

    # Custom objects with to_dict method
    if hasattr(obj, 'to_dict'):
        return convert_numpy_types(obj.to_dict())

    # Pydantic models
    if hasattr(obj, 'dict'):
        return convert_numpy_types(obj.dict())

    # Everything else (str, int, float, bool, ...) is returned unchanged
    return obj
def safe_serialize_response(data: Any) -> Any:
    """
    Produce a JSON-compatible copy of response data

    Arguments:
    ----------
        data : Response data to serialize

    Returns:
    --------
        The result of running `data` through convert_numpy_types
    """
    serializable = convert_numpy_types(data)
    return serializable
# ==================== PYDANTIC DATACLASS MODELS ====================
class SerializableBaseModel(BaseModel):
    """
    Base model with enhanced serialization for NumPy types
    """

    def dict(self, *args, **kwargs) -> Dict[str, Any]:
        """
        Export the model as a dict with every NumPy value converted to a
        native Python type

        Arguments are forwarded unchanged to BaseModel.dict().
        """
        data = super().dict(*args, **kwargs)
        return convert_numpy_types(data)

    def json(self, *args, **kwargs) -> str:
        """
        Export the model as a JSON string with NumPy-aware encoding

        Keyword arguments that select / filter fields are routed to dict();
        every remaining keyword argument is passed to json.dumps(). Previously
        the same kwargs were sent to BOTH calls, so any keyword argument made
        one of them raise TypeError.

        A caller-supplied `cls` overrides the default NumpyJSONEncoder.
        """
        # Options understood by the model export step rather than by json.dumps
        dict_option_names = (
            "include",
            "exclude",
            "by_alias",
            "skip_defaults",
            "exclude_unset",
            "exclude_defaults",
            "exclude_none",
        )
        dict_kwargs = {name: kwargs.pop(name) for name in dict_option_names if name in kwargs}

        # Keep the NumPy-aware encoder as default without clashing with an explicit cls
        kwargs.setdefault("cls", NumpyJSONEncoder)

        data = self.dict(*args, **dict_kwargs)
        return json.dumps(data, **kwargs)
class TextAnalysisRequest(SerializableBaseModel):
    """
    Payload accepted by the single-text analysis endpoint
    """

    # Text under analysis; length is validated to 50..50000 characters
    text: str = Field(
        ...,
        min_length=50,
        max_length=50000,
        description="Text to analyze",
    )

    # Optional domain name / alias supplied by the caller
    domain: Optional[str] = Field(
        None,
        description="Override automatic domain detection",
    )

    # Feature toggles
    enable_attribution: bool = Field(
        True,
        description="Enable AI model attribution",
    )
    enable_highlighting: bool = Field(
        True,
        description="Generate sentence highlighting",
    )
    skip_expensive_metrics: bool = Field(
        False,
        description="Skip computationally expensive metrics",
    )
    use_sentence_level: bool = Field(
        True,
        description="Use sentence-level analysis for highlighting",
    )
    include_metrics_summary: bool = Field(
        True,
        description="Include metrics summary in highlights",
    )
    generate_report: bool = Field(
        False,
        description="Generate detailed PDF/JSON report",
    )
class TextAnalysisResponse(SerializableBaseModel):
    """
    Body returned by the single-text analysis endpoint
    """

    # Always present
    status: str
    analysis_id: str
    detection_result: Dict[str, Any]

    # Present only when the corresponding stage ran and succeeded
    attribution: Optional[Dict[str, Any]] = None
    highlighted_html: Optional[str] = None
    reasoning: Optional[Dict[str, Any]] = None
    report_files: Optional[Dict[str, str]] = None

    # Bookkeeping
    processing_time: float
    timestamp: str
class BatchAnalysisRequest(SerializableBaseModel):
    """
    Payload accepted by the batch analysis endpoint
    """

    # Between 1 and 100 texts per request
    texts: List[str] = Field(
        ...,
        min_items=1,
        max_items=100,
    )

    # Optional domain name / alias applied to every text
    domain: Optional[str] = None

    # Feature toggles (attribution and reports are off by default for batches)
    enable_attribution: bool = False
    skip_expensive_metrics: bool = True
    generate_reports: bool = False
class BatchAnalysisResult(SerializableBaseModel):
    """
    Outcome for one text of a batch request
    """

    # Position of the text in the request and its outcome label
    index: int
    status: str

    # Populated on success
    detection: Optional[Dict[str, Any]] = None
    attribution: Optional[Dict[str, Any]] = None
    reasoning: Optional[Dict[str, Any]] = None
    report_files: Optional[Dict[str, str]] = None

    # Populated on failure
    error: Optional[str] = None
class BatchAnalysisResponse(SerializableBaseModel):
    """
    Body returned by the batch analysis endpoint
    """

    status: str
    batch_id: str

    # Counters over the submitted texts
    total: int
    successful: int
    failed: int

    # One entry per submitted text
    results: List[BatchAnalysisResult]

    # Bookkeeping
    processing_time: float
    timestamp: str
class FileAnalysisResponse(SerializableBaseModel):
    """
    Body returned by the file analysis endpoint
    """

    # Always present
    status: str
    analysis_id: str
    file_info: Dict[str, Any]
    detection_result: Dict[str, Any]

    # Present only when the corresponding stage ran and succeeded
    attribution: Optional[Dict[str, Any]] = None
    highlighted_html: Optional[str] = None
    reasoning: Optional[Dict[str, Any]] = None
    report_files: Optional[Dict[str, str]] = None

    # Bookkeeping
    processing_time: float
    timestamp: str
class HealthCheckResponse(SerializableBaseModel):
    """
    Body returned by the health check endpoint
    """

    status: str
    version: str

    # Seconds, as computed by the endpoint from time.time() differences
    uptime: float

    # Component name -> whether it was initialized
    models_loaded: Dict[str, bool]
class ReportGenerationResponse(SerializableBaseModel):
    """
    Body returned by the report generation endpoint
    """

    status: str
    analysis_id: str

    # Report format -> generated file name
    reports: Dict[str, str]

    timestamp: str
class ErrorResponse(SerializableBaseModel):
    """
    Uniform error body emitted by the exception handlers
    """

    status: str
    error: str
    timestamp: str
# ==================== FASTAPI APPLICATION ====================
# Application instance. Responses default to NumpyJSONResponse so NumPy values
# returned by handlers are encoded without extra work in each endpoint.
# NOTE(review): version here is "1.0.0" while /health reports "2.0.0" and the
# fallback landing page says "v2.0" — confirm which one is current.
app = FastAPI(title = "TEXT-AUTH AI Detection API",
              description = "API for detecting AI-generated text",
              version = "1.0.0",
              docs_url = "/api/docs",
              redoc_url = "/api/redoc",
              default_response_class = NumpyJSONResponse,
              )
# CORS Configuration: origins come from settings; all methods and headers are allowed
app.add_middleware(CORSMiddleware,
                   allow_origins = settings.CORS_ORIGINS,
                   allow_credentials = True,
                   allow_methods = ["*"],
                   allow_headers = ["*"],
                   )
# Mount static files (only when the ui/static directory next to this file exists)
ui_static_path = Path(__file__).parent / "ui" / "static"
if ui_static_path.exists():
    app.mount("/static", StaticFiles(directory = str(ui_static_path)), name = "static")
# Global instances: all start as None and are assigned by startup_event()
orchestrator : Optional[DetectionOrchestrator] = None
attributor : Optional[ModelAttributor] = None
highlighter : Optional[TextHighlighter] = None
reporter : Optional[ReportGenerator] = None
reasoning_generator: Optional[ReasoningGenerator] = None
document_extractor : Optional[DocumentExtractor] = None
# App state: start timestamp (used for uptime) and per-component readiness
# flags, flipped to True by startup_event() and reported by /health
app_start_time = time.time()
initialization_status = {"orchestrator" : False,
                         "attributor" : False,
                         "highlighter" : False,
                         "reporter" : False,
                         "reasoning_generator" : False,
                         "document_extractor" : False,
                         }
# ==================== APPLICATION LIFECYCLE ====================
@app.on_event("startup")
async def startup_event():
    """
    Build every service component when the application starts

    Logging is initialized first (a failure there raises RuntimeError).
    Each component is then created in turn and its readiness flag is set in
    `initialization_status`. Any exception during component setup is logged
    and re-raised.
    """
    global orchestrator, attributor, highlighter, reporter, reasoning_generator, document_extractor
    global initialization_status

    # Centralized logging must be available before anything else is logged
    if not central_logger.initialize():
        raise RuntimeError("Failed to initialize logging system")

    banner = "=" * 80
    logger.info(banner)
    logger.info("TEXT-AUTH API Starting Up...")
    logger.info(banner)

    try:
        # --- Detection Orchestrator -------------------------------------
        logger.info("Initializing Detection Orchestrator...")
        orchestrator = DetectionOrchestrator(
            enable_language_detection=True,
            parallel_execution=False,
            skip_expensive_metrics=False,
        )
        orchestrator_ready = orchestrator.initialize()
        if not orchestrator_ready:
            logger.warning("⚠ Detection Orchestrator initialization incomplete")
        else:
            initialization_status["orchestrator"] = True
            logger.success("✓ Detection Orchestrator initialized")

        # --- Model Attributor -------------------------------------------
        logger.info("Initializing Model Attributor...")
        attributor = ModelAttributor()
        attributor_ready = attributor.initialize()
        if not attributor_ready:
            logger.warning("⚠ Model Attributor initialization incomplete")
        else:
            initialization_status["attributor"] = True
            logger.success("✓ Model Attributor initialized")

        # --- Text Highlighter (constructed only, no initialize step) ----
        logger.info("Initializing Text Highlighter...")
        highlighter = TextHighlighter()
        initialization_status["highlighter"] = True
        logger.success("✓ Text Highlighter initialized")

        # --- Report Generator -------------------------------------------
        logger.info("Initializing Report Generator...")
        reporter = ReportGenerator()
        initialization_status["reporter"] = True
        logger.success("✓ Report Generator initialized")

        # --- Reasoning Generator ----------------------------------------
        logger.info("Initializing Reasoning Generator...")
        reasoning_generator = ReasoningGenerator()
        initialization_status["reasoning_generator"] = True
        logger.success("✓ Reasoning Generator initialized")

        # --- Document Extractor -----------------------------------------
        logger.info("Initializing Document Extractor...")
        document_extractor = DocumentExtractor()
        initialization_status["document_extractor"] = True
        logger.success("✓ Document Extractor initialized")

        # Startup summary
        logger.info(banner)
        logger.success("TEXT-AUTH API Ready!")
        logger.info(f"Server: {settings.HOST}:{settings.PORT}")
        logger.info(f"Environment: {settings.ENVIRONMENT}")
        logger.info(f"Device: {settings.DEVICE}")
        logger.info(banner)

    except Exception as exc:
        logger.error(f"Startup failed: {exc}")
        raise
# Cleanup in shutdown
@app.on_event("shutdown")
async def shutdown_event():
    """
    Cleanup on shutdown

    Releases the centralized logger and then emits a final log line.
    """
    central_logger.cleanup()
    # NOTE(review): this message is logged AFTER central_logger.cleanup();
    # whether it still reaches the configured sinks depends on what cleanup()
    # removes — confirm, and swap the two statements if the line gets lost.
    logger.info("Shutdown complete")
# ==================== UTILITY FUNCTIONS ====================
def _get_domain_description(domain: Domain) -> str:
    """
    Look up the human-readable description of a domain

    Returns an empty string when the domain has no entry in the table.
    """
    description_by_domain = {
        Domain.GENERAL: "General content without specific domain",
        Domain.ACADEMIC: "Academic papers, essays, research",
        Domain.CREATIVE: "Creative writing, fiction, poetry",
        Domain.AI_ML: "AI/ML research papers, technical content",
        Domain.SOFTWARE_DEV: "Software development, code, documentation",
        Domain.TECHNICAL_DOC: "Technical documentation, manuals, specs",
        Domain.ENGINEERING: "Engineering documents, technical reports",
        Domain.SCIENCE: "Scientific papers, research articles",
        Domain.BUSINESS: "Business documents, reports, proposals",
        Domain.LEGAL: "Legal documents, contracts, court filings",
        Domain.MEDICAL: "Medical documents, clinical notes, research",
        Domain.JOURNALISM: "News articles, journalistic content",
        Domain.MARKETING: "Marketing copy, advertisements, campaigns",
        Domain.SOCIAL_MEDIA: "Social media posts, blogs, casual writing",
        Domain.BLOG_PERSONAL: "Personal blogs, diary entries",
        Domain.TUTORIAL: "Tutorials, how-to guides, educational content",
    }

    if domain in description_by_domain:
        return description_by_domain[domain]

    return ""
def _parse_domain(domain_str: Optional[str]) -> Optional[Domain]:
    """
    Parse domain string to Domain enum with comprehensive alias support

    Resolution order:
        1. exact Domain value (case-insensitive, surrounding whitespace ignored)
        2. alias table lookup on the normalized string
        3. alias table lookup with spaces / hyphens turned into underscores
        4. alias table lookup with spaces / hyphens / underscores removed
           (reaches compact aliases such as 'machinelearning' or 'howto')
        5. loose substring match against the alias table (first hit wins)

    Arguments:
    ----------
        domain_str : Domain name or alias supplied by the client (may be None)

    Returns:
    --------
        The matching Domain, or None when nothing was supplied or nothing matched
    """
    if not domain_str:
        return None

    normalized_domain = domain_str.lower().strip()

    # Whitespace-only input carries no information. It must be rejected here:
    # an empty string is a substring of every alias, so the loose match below
    # would otherwise resolve it to the first table entry.
    if not normalized_domain:
        return None

    # First try exact match against the enum values
    try:
        return Domain(normalized_domain)
    except ValueError:
        pass

    # Comprehensive domain mapping with aliases for all 16 domains
    domain_mapping = {
        # General
        'general': Domain.GENERAL,
        'default': Domain.GENERAL,
        'generic': Domain.GENERAL,
        # Academic
        'academic': Domain.ACADEMIC,
        'education': Domain.ACADEMIC,
        'research': Domain.ACADEMIC,
        'university': Domain.ACADEMIC,
        'scholarly': Domain.ACADEMIC,
        # Creative
        'creative': Domain.CREATIVE,
        'fiction': Domain.CREATIVE,
        'literature': Domain.CREATIVE,
        'story': Domain.CREATIVE,
        'narrative': Domain.CREATIVE,
        # AI / ML
        'ai_ml': Domain.AI_ML,
        'ai': Domain.AI_ML,
        'machinelearning': Domain.AI_ML,
        'ml': Domain.AI_ML,
        'artificialintelligence': Domain.AI_ML,
        'neural': Domain.AI_ML,
        # Software development
        'software_dev': Domain.SOFTWARE_DEV,
        'software': Domain.SOFTWARE_DEV,
        'code': Domain.SOFTWARE_DEV,
        'programming': Domain.SOFTWARE_DEV,
        'development': Domain.SOFTWARE_DEV,
        'dev': Domain.SOFTWARE_DEV,
        # Technical documentation
        'technical_doc': Domain.TECHNICAL_DOC,
        'technical': Domain.TECHNICAL_DOC,
        'tech': Domain.TECHNICAL_DOC,
        'documentation': Domain.TECHNICAL_DOC,
        'docs': Domain.TECHNICAL_DOC,
        'manual': Domain.TECHNICAL_DOC,
        # Engineering
        'engineering': Domain.ENGINEERING,
        'engineer': Domain.ENGINEERING,
        'technical_engineering': Domain.ENGINEERING,
        # Science
        'science': Domain.SCIENCE,
        'scientific': Domain.SCIENCE,
        'research_science': Domain.SCIENCE,
        # Business
        'business': Domain.BUSINESS,
        'corporate': Domain.BUSINESS,
        'commercial': Domain.BUSINESS,
        'enterprise': Domain.BUSINESS,
        # Legal
        'legal': Domain.LEGAL,
        'law': Domain.LEGAL,
        'contract': Domain.LEGAL,
        'court': Domain.LEGAL,
        'juridical': Domain.LEGAL,
        # Medical
        'medical': Domain.MEDICAL,
        'healthcare': Domain.MEDICAL,
        'clinical': Domain.MEDICAL,
        'medicine': Domain.MEDICAL,
        'health': Domain.MEDICAL,
        # Journalism
        'journalism': Domain.JOURNALISM,
        'news': Domain.JOURNALISM,
        'reporting': Domain.JOURNALISM,
        'media': Domain.JOURNALISM,
        'press': Domain.JOURNALISM,
        # Marketing
        'marketing': Domain.MARKETING,
        'advertising': Domain.MARKETING,
        'promotional': Domain.MARKETING,
        'brand': Domain.MARKETING,
        'sales': Domain.MARKETING,
        # Social media
        'social_media': Domain.SOCIAL_MEDIA,
        'social': Domain.SOCIAL_MEDIA,
        'casual': Domain.SOCIAL_MEDIA,
        'informal': Domain.SOCIAL_MEDIA,
        'posts': Domain.SOCIAL_MEDIA,
        # Personal blog
        'blog_personal': Domain.BLOG_PERSONAL,
        'blog': Domain.BLOG_PERSONAL,
        'personal': Domain.BLOG_PERSONAL,
        'diary': Domain.BLOG_PERSONAL,
        'lifestyle': Domain.BLOG_PERSONAL,
        # Tutorial
        'tutorial': Domain.TUTORIAL,
        'guide': Domain.TUTORIAL,
        'howto': Domain.TUTORIAL,
        'instructional': Domain.TUTORIAL,
        'educational': Domain.TUTORIAL,
        'walkthrough': Domain.TUTORIAL,
    }

    # Separator variations: "social media" / "social-media" -> "social_media",
    # "machine learning" / "how-to" -> "machinelearning" / "howto"
    with_underscores = normalized_domain.replace(' ', '_').replace('-', '_')
    compact = with_underscores.replace('_', '')

    for candidate in (normalized_domain, with_underscores, compact):
        if candidate in domain_mapping:
            return domain_mapping[candidate]

    # Last resort: loose substring matching in table order. This is deliberately
    # permissive (very short inputs match many aliases), so it only runs after
    # every exact form has been tried.
    for alias, domain_enum in domain_mapping.items():
        if normalized_domain in alias or alias in normalized_domain:
            return domain_enum

    return None
def _validate_file_extension(filename: str) -> str:
"""
Validate file extension and return normalized extension
"""
file_extension = Path(filename).suffix.lower()
allowed_extensions = ['.txt',
'.pdf',
'.docx',
'.doc',
'.md',
]
if file_extension not in allowed_extensions:
raise HTTPException(status_code = 400,
detail = f"Unsupported file type. Allowed: {', '.join(allowed_extensions)}",
)
return file_extension
def _generate_reasoning(detection_result: DetectionResult, attribution_result: Optional[AttributionResult] = None) -> Dict[str, Any]:
    """
    Build the serialized reasoning section for a detection result

    Best-effort: returns an empty dict when the reasoning generator is not
    available or when generation fails (the failure is logged as a warning).
    """
    if reasoning_generator:
        try:
            generated = reasoning_generator.generate(
                ensemble_result=detection_result.ensemble_result,
                metric_results=detection_result.metric_results,
                domain=detection_result.domain_prediction.primary_domain,
                attribution_result=attribution_result,
                text_length=detection_result.processed_text.word_count,
            )
            return safe_serialize_response(generated.to_dict())

        except Exception as exc:
            logger.warning(f"Reasoning generation failed: {exc}")

    return {}
def _generate_reports(detection_result: DetectionResult, attribution_result: Optional[AttributionResult] = None,
                      highlighted_sentences: Optional[List] = None, analysis_id: Optional[str] = None) -> Dict[str, str]:
    """
    Generate reports for detection results

    Arguments:
    ----------
        detection_result      : Detection outcome to report on
        attribution_result    : Optional attribution outcome to include
        highlighted_sentences : Optional highlight data to include
        analysis_id           : Filename prefix; when omitted a millisecond
                                timestamp based prefix is used

    Returns:
    --------
        Mapping of report format -> report location as a plain string.
        Best-effort: an empty dict is returned when the reporter is not
        available or when generation fails (the failure is logged).
    """
    if not reporter:
        return {}

    try:
        report_files = reporter.generate_complete_report(detection_result = detection_result,
                                                         attribution_result = attribution_result,
                                                         highlighted_sentences = highlighted_sentences,
                                                         formats = ["json", "pdf"],
                                                         filename_prefix = analysis_id or f"report_{int(time.time() * 1000)}",
                                                         )

        # Normalize values to str so the result honours the declared
        # Dict[str, str] even if the reporter hands back path objects
        return {fmt: str(location) for fmt, location in report_files.items()}

    except Exception as e:
        logger.warning(f"Report generation failed: {e}")
        return {}
# ==================== ROOT & HEALTH ENDPOINTS ====================
@app.get("/", response_class = HTMLResponse)
async def root():
    """
    Serve the web interface landing page

    The first existing candidate is returned: index.html next to this file,
    then ui/static/index.html. If neither exists a minimal built-in page with
    links to the API docs and health check is served.
    """
    base_dir = Path(__file__).parent
    candidate_pages = (
        base_dir / "index.html",
        base_dir / "ui" / "static" / "index.html",
    )

    for index_path in candidate_pages:
        if index_path.exists():
            with open(index_path, 'r', encoding='utf-8') as page_file:
                return HTMLResponse(content=page_file.read())

    # Built-in fallback page
    return HTMLResponse(content = """
<html>
<head><title>TEXT-AUTH API</title></head>
<body style="font-family: sans-serif; padding: 50px; text-align: center;">
<h1>🔍 TEXT-AUTH API</h1>
<p>AI Text Detection Platform v2.0</p>
<p><a href="/api/docs">API Documentation</a></p>
<p><a href="/health">Health Check</a></p>
</body>
</html>
"""
    )
@app.get("/health", response_model = HealthCheckResponse)
async def health_check():
    """
    Report service health

    Status is "healthy" only when every component flag in
    `initialization_status` is set, otherwise "degraded".
    """
    every_component_ready = all(initialization_status.values())
    status_label = "healthy" if every_component_ready else "degraded"
    seconds_running = time.time() - app_start_time

    return HealthCheckResponse(
        status=status_label,
        version="2.0.0",
        uptime=seconds_running,
        models_loaded=initialization_status,
    )
# ==================== ANALYSIS ENDPOINTS ====================
@app.post("/api/analyze", response_model = TextAnalysisResponse)
async def analyze_text(request: TextAnalysisRequest):
    """
    Analyze text for AI generation

    Pipeline: domain parsing -> detection -> optional attribution -> optional
    highlighting -> reasoning -> optional report generation. Attribution,
    highlighting and reports are best-effort: a failure is logged and the
    corresponding response field stays empty.

    Raises:
    -------
        HTTPException 503 : orchestrator not initialized
        HTTPException 400 : a domain was supplied but could not be parsed
        HTTPException 500 : any other failure during analysis
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Service not initialized")

    start_time = time.time()
    analysis_id = f"analysis_{int(time.time() * 1000)}"

    try:
        # Parse domain if provided
        domain = _parse_domain(request.domain)

        if (request.domain and not domain):
            raise HTTPException(status_code = 400,
                                detail = f"Invalid domain. Valid options: {[d.value for d in Domain]}",
                                )

        # Run detection analysis
        logger.info(f"[{analysis_id}] Analyzing text ({len(request.text)} chars)")

        detection_result = orchestrator.analyze(text = request.text,
                                                domain = domain,
                                                skip_expensive = request.skip_expensive_metrics,
                                                )

        # Convert detection result to ensure serializability
        detection_dict = safe_serialize_response(detection_result.to_dict())

        # Attribution (if enabled)
        attribution_result = None
        attribution_dict = None

        if (request.enable_attribution and attributor):
            try:
                logger.info(f"[{analysis_id}] Running attribution...")
                attribution_result = attributor.attribute(text = request.text,
                                                          processed_text = detection_result.processed_text,
                                                          metric_results = detection_result.metric_results,
                                                          domain = detection_result.domain_prediction.primary_domain,
                                                          )
                attribution_dict = safe_serialize_response(attribution_result.to_dict())

            except Exception as e:
                logger.warning(f"Attribution failed: {e}")

        # Highlighting (if enabled)
        highlighted_sentences = None
        highlighted_html = None

        if request.enable_highlighting and highlighter:
            try:
                logger.info(f"[{analysis_id}] Generating highlights...")
                highlighted_sentences = highlighter.generate_highlights(text = request.text,
                                                                        metric_results = detection_result.metric_results,
                                                                        ensemble_result = detection_result.ensemble_result,
                                                                        use_sentence_level = request.use_sentence_level,
                                                                        )

                # Set include_legend=False to prevent duplicate legends
                highlighted_html = highlighter.generate_html(highlighted_sentences = highlighted_sentences,
                                                             include_legend = False, # UI already has its own legend
                                                             include_metrics = request.include_metrics_summary,
                                                             )

            except Exception as e:
                logger.warning(f"Highlighting failed: {e}")

        # Generate reasoning
        reasoning_dict = _generate_reasoning(detection_result, attribution_result)

        # Generate reports (if requested)
        report_files = {}

        if request.generate_report:
            try:
                logger.info(f"[{analysis_id}] Generating reports...")
                report_files = _generate_reports(detection_result = detection_result,
                                                 attribution_result = attribution_result,
                                                 highlighted_sentences = highlighted_sentences,
                                                 analysis_id = analysis_id,
                                                 )

            except Exception as e:
                logger.warning(f"Report generation failed: {e}")

        processing_time = time.time() - start_time

        # Log the detection event
        log_detection_event(analysis_id = analysis_id,
                            text_length = len(request.text),
                            verdict = detection_result.ensemble_result.final_verdict,
                            confidence = detection_result.ensemble_result.overall_confidence,
                            domain = detection_result.domain_prediction.primary_domain.value,
                            processing_time = processing_time,
                            enable_attribution = request.enable_attribution,
                            enable_highlighting = request.enable_highlighting,
                            )

        return TextAnalysisResponse(status = "success",
                                    analysis_id = analysis_id,
                                    detection_result = detection_dict,
                                    attribution = attribution_dict,
                                    highlighted_html = highlighted_html,
                                    reasoning = reasoning_dict,
                                    report_files = report_files,
                                    processing_time = processing_time,
                                    timestamp = datetime.now().isoformat(),
                                    )

    except HTTPException as e:
        # The exception must be bound here: the handler passes it to the
        # central logger, and an unbound name would raise NameError and turn
        # the intended HTTP error into an unhandled 500.
        central_logger.log_error("TextAnalysisError",
                                 "Analysis failed for request",
                                 {"text_length": len(request.text)},
                                 e,
                                 )
        raise

    except Exception as e:
        logger.error(f"[{analysis_id}] Analysis failed: {e}")
        raise HTTPException(status_code = 500,
                            detail = str(e),
                            )
@app.post("/api/analyze/file", response_model = FileAnalysisResponse)
async def analyze_file(file: UploadFile = File(...), domain: Optional[str] = Form(None), enable_attribution: bool = Form(True), skip_expensive_metrics: bool = Form(False),
                       use_sentence_level: bool = Form(True), include_metrics_summary: bool = Form(True), generate_report: bool = Form(False)):
    """
    Analyze an uploaded document

    The file extension is validated, text is extracted from the uploaded
    bytes, and the extracted text goes through detection, optional
    attribution, highlighting, reasoning and optional report generation.
    Attribution, highlighting and reports are best-effort (failures are
    logged and the matching response field stays empty).

    Raises:
    -------
        HTTPException 503 : document extractor or orchestrator not initialized
        HTTPException 400 : unsupported file type or failed / empty extraction
        HTTPException 500 : any other failure
    """
    if not (document_extractor and orchestrator):
        raise HTTPException(status_code=503, detail="Service not initialized")

    started_at = time.time()
    analysis_id = f"file_{int(time.time() * 1000)}"

    try:
        # --- Validation and text extraction ------------------------------
        extension = _validate_file_extension(file.filename)

        logger.info(f"[{analysis_id}] Extracting text from {file.filename}")
        raw_bytes = await file.read()
        extraction = document_extractor.extract_from_bytes(
            file_bytes=raw_bytes,
            filename=file.filename,
        )

        if not (extraction.is_success and extraction.text):
            raise HTTPException(
                status_code=400,
                detail=f"Text extraction failed: {extraction.error_message}",
            )

        logger.info(f"[{analysis_id}] Extracted {len(extraction.text)} characters")

        # --- Detection ---------------------------------------------------
        parsed_domain = _parse_domain(domain)
        detection = orchestrator.analyze(
            text=extraction.text,
            domain=parsed_domain,
            skip_expensive=skip_expensive_metrics,
        )
        detection_payload = safe_serialize_response(detection.to_dict())

        # --- Attribution (optional, best-effort) -------------------------
        attribution = None
        attribution_payload = None

        if enable_attribution and attributor:
            try:
                attribution = attributor.attribute(
                    text=extraction.text,
                    processed_text=detection.processed_text,
                    metric_results=detection.metric_results,
                    domain=detection.domain_prediction.primary_domain,
                )
                attribution_payload = safe_serialize_response(attribution.to_dict())

            except Exception as exc:
                logger.warning(f"Attribution failed: {exc}")

        # --- Highlighting (best-effort) ----------------------------------
        sentence_highlights = None
        html_markup = None

        if highlighter:
            try:
                sentence_highlights = highlighter.generate_highlights(
                    text=extraction.text,
                    metric_results=detection.metric_results,
                    ensemble_result=detection.ensemble_result,
                    use_sentence_level=use_sentence_level,
                )

                # The legend is left out because the UI renders its own
                html_markup = highlighter.generate_html(
                    highlighted_sentences=sentence_highlights,
                    include_legend=False,
                    include_metrics=include_metrics_summary,
                )

            except Exception as exc:
                logger.warning(f"Highlighting failed: {exc}")

        # --- Reasoning ---------------------------------------------------
        reasoning_payload = _generate_reasoning(detection, attribution)

        # --- Reports (optional, best-effort) -----------------------------
        generated_reports = dict()

        if generate_report:
            try:
                logger.info(f"[{analysis_id}] Generating reports...")
                generated_reports = _generate_reports(
                    detection_result=detection,
                    attribution_result=attribution,
                    highlighted_sentences=sentence_highlights,
                    analysis_id=analysis_id,
                )

            except Exception as exc:
                logger.warning(f"Report generation failed: {exc}")

        elapsed = time.time() - started_at

        file_details = {
            "filename": file.filename,
            "file_type": extension,
            "pages": extraction.page_count,
            "extraction_method": extraction.extraction_method,
            "highlighted_html": html_markup is not None,
        }

        return FileAnalysisResponse(
            status="success",
            analysis_id=analysis_id,
            file_info=file_details,
            detection_result=detection_payload,
            attribution=attribution_payload,
            highlighted_html=html_markup,
            reasoning=reasoning_payload,
            report_files=generated_reports,
            processing_time=elapsed,
            timestamp=datetime.now().isoformat(),
        )

    except HTTPException:
        # Deliberate HTTP errors pass through untouched
        raise

    except Exception as exc:
        logger.error(f"[{analysis_id}] File analysis failed: {exc}")
        raise HTTPException(
            status_code=500,
            detail=str(exc),
        )
@app.post("/api/analyze/batch", response_model = BatchAnalysisResponse)
async def batch_analyze(request: BatchAnalysisRequest):
    """
    Analyze multiple texts in batch

    Limits : 1-100 texts per request

    Each text is processed independently: a failure in one text is recorded
    as an "error" result for that index and does not abort the batch.
    Attribution and report generation are best-effort per text; failures
    are logged as warnings and the matching result field stays empty.

    Raises:
    -------
        HTTPException 503 : orchestrator not initialized
        HTTPException 400 : more than 100 texts submitted
        HTTPException 500 : failure outside the per-text processing
    """
    if not orchestrator:
        raise HTTPException(status_code = 503,
                            detail = "Service not initialized",
                            )

    if (len(request.texts) > 100):
        raise HTTPException(status_code = 400,
                            detail = "Maximum 100 texts per batch",
                            )

    start_time = time.time()
    batch_id = f"batch_{int(time.time() * 1000)}"

    try:
        # Parse domain
        domain = _parse_domain(request.domain)

        logger.info(f"[{batch_id}] Processing {len(request.texts)} texts")

        results = []

        for i, text in enumerate(request.texts):
            try:
                detection_result = orchestrator.analyze(text = text,
                                                        domain = domain,
                                                        skip_expensive = request.skip_expensive_metrics,
                                                        )

                # Convert to serializable dict
                detection_dict = safe_serialize_response(detection_result.to_dict())

                # Attribution if enabled
                attribution_result = None
                attribution_dict = None

                if request.enable_attribution and attributor:
                    try:
                        attribution_result = attributor.attribute(text = text,
                                                                  processed_text = detection_result.processed_text,
                                                                  metric_results = detection_result.metric_results,
                                                                  domain = detection_result.domain_prediction.primary_domain,
                                                                  )
                        attribution_dict = safe_serialize_response(attribution_result.to_dict())

                    except Exception as e:
                        # Still best-effort, but no longer silent
                        logger.warning(f"[{batch_id}] Attribution failed for text {i}: {e}")

                # Generate reasoning
                reasoning_dict = _generate_reasoning(detection_result, attribution_result)

                # Generate reports if requested
                report_files = {}

                if request.generate_reports:
                    try:
                        report_files = _generate_reports(detection_result = detection_result,
                                                         attribution_result = attribution_result,
                                                         analysis_id = f"{batch_id}_{i}"
                                                         )

                    except Exception as e:
                        # Still best-effort, but no longer silent
                        logger.warning(f"[{batch_id}] Report generation failed for text {i}: {e}")

                results.append(BatchAnalysisResult(index = i,
                                                   status = "success",
                                                   detection = detection_dict,
                                                   attribution = attribution_dict,
                                                   reasoning = reasoning_dict,
                                                   report_files = report_files,
                                                   )
                               )

            except Exception as e:
                logger.error(f"[{batch_id}] Text {i} failed: {e}")
                results.append(BatchAnalysisResult(index = i,
                                                   status = "error",
                                                   error = str(e),
                                                   )
                               )

        processing_time = time.time() - start_time
        success_count = sum(1 for r in results if r.status == "success")

        logger.success(f"[{batch_id}] Batch complete: {success_count}/{len(request.texts)} successful")

        return BatchAnalysisResponse(status = "success",
                                     batch_id = batch_id,
                                     total = len(request.texts),
                                     successful = success_count,
                                     failed = len(request.texts) - success_count,
                                     results = results,
                                     processing_time = processing_time,
                                     timestamp = datetime.now().isoformat(),
                                     )

    except Exception as e:
        logger.error(f"[{batch_id}] Batch analysis failed: {e}")
        raise HTTPException(status_code = 500,
                            detail = str(e),
                            )
# ==================== REPORT GENERATION ENDPOINTS ====================
@app.post("/api/report/generate", response_model = ReportGenerationResponse)
async def generate_report(background_tasks: BackgroundTasks, analysis_id: str = Form(...), text: str = Form(...), formats: str = Form("json,pdf"),
                          include_highlights: bool = Form(True)):
    """
    Generate detailed report for an analysis

    The text is re-analyzed, attribution and (for PDF, when requested)
    highlights are computed on a best-effort basis, and the reporter writes
    one file per requested format using `analysis_id` as filename prefix.

    Arguments:
    ----------
        background_tasks   : Injected by FastAPI; not used by this handler
        analysis_id        : Identifier used as the report filename prefix;
                             must not contain path components
        text               : Text to analyze
        formats            : Comma-separated list of formats ('json', 'pdf')
        include_highlights : Whether to compute highlights for PDF reports

    Raises:
    -------
        HTTPException 503 : orchestrator or reporter not initialized
        HTTPException 400 : invalid analysis_id or unsupported format
        HTTPException 500 : any other failure
    """
    if not orchestrator or not reporter:
        raise HTTPException(status_code=503, detail="Service not initialized")

    try:
        # analysis_id comes straight from the client and ends up in a file
        # name, so anything that is not a plain single path component
        # (separators, '.', '..') is rejected before it reaches the reporter.
        if (analysis_id == ".." or "\\" in analysis_id or Path(analysis_id).name != analysis_id):
            raise HTTPException(status_code = 400,
                                detail = "Invalid analysis_id: path components are not allowed",
                                )

        # Parse formats
        requested_formats = [f.strip() for f in formats.split(',')]
        valid_formats = ['json', 'pdf'] # Only JSON and PDF supported now

        for fmt in requested_formats:
            if fmt not in valid_formats:
                raise HTTPException(status_code = 400,
                                    detail = f"Invalid format '{fmt}'. Valid: {', '.join(valid_formats)}",
                                    )

        # Analyze text
        logger.info(f"Generating report for {analysis_id}")

        detection_result = orchestrator.analyze(text = text)

        # Attribution
        attribution_result = None

        if attributor:
            try:
                attribution_result = attributor.attribute(text = text,
                                                          processed_text = detection_result.processed_text,
                                                          metric_results = detection_result.metric_results,
                                                          domain = detection_result.domain_prediction.primary_domain,
                                                          )

            except Exception as e:
                logger.warning(f"Attribution failed: {e}")

        # Generate highlights for PDF reports if requested
        highlighted_sentences = None

        if (include_highlights and highlighter and 'pdf' in requested_formats):
            try:
                highlighted_sentences = highlighter.generate_highlights(text = text,
                                                                        metric_results = detection_result.metric_results,
                                                                        ensemble_result = detection_result.ensemble_result,
                                                                        )

            except Exception as e:
                logger.warning(f"Highlight generation for report failed: {e}")

        # Generate reports
        report_files = reporter.generate_complete_report(detection_result = detection_result,
                                                         attribution_result = attribution_result,
                                                         highlighted_sentences = highlighted_sentences,
                                                         formats = requested_formats,
                                                         filename_prefix = analysis_id,
                                                         )

        # Extract only the filename from the full path for the response
        report_filenames = {fmt: Path(full_path).name for fmt, full_path in report_files.items()}

        return ReportGenerationResponse(status = "success",
                                        analysis_id = analysis_id,
                                        reports = report_filenames,
                                        timestamp = datetime.now().isoformat(),
                                        )

    except HTTPException:
        raise

    except Exception as e:
        logger.error(f"Report generation failed: {e}")
        raise HTTPException(status_code = 500,
                            detail = str(e),
                            )
@app.get("/api/report/download/{filename}")
async def download_report(filename: str):
    """
    Download a generated report

    Arguments:
    ----------
        filename : Bare file name of a report inside the reporter's output directory

    Raises:
    -------
        HTTPException 503 : reporter not initialized
        HTTPException 400 : filename contains path components
        HTTPException 404 : no such report file
    """
    if not reporter:
        raise HTTPException(status_code = 503,
                            detail = "Service not initialized",
                            )

    # The name is client-controlled: only a plain single path component is
    # accepted so the lookup can never leave the report directory.
    if (filename in ("", ".", "..") or "\\" in filename or Path(filename).name != filename):
        raise HTTPException(status_code = 400,
                            detail = "Invalid filename",
                            )

    file_path = reporter.output_dir / filename

    # is_file() (rather than exists()) also rejects directories, which
    # cannot be streamed as a file response
    if not file_path.is_file():
        raise HTTPException(status_code = 404,
                            detail = "Report not found",
                            )

    return FileResponse(path = str(file_path),
                        filename = filename,
                        media_type = "application/octet-stream",
                        )
# ==================== UTILITY ENDPOINTS ====================
@app.get("/api/domains")
async def list_domains():
    """
    Enumerate every supported domain

    Each entry carries the enum value, a title-cased display name derived
    from the value, and the domain description.
    """
    return {
        "domains": [
            {
                "value": domain.value,
                "name": domain.value.replace('_', ' ').title(),
                "description": _get_domain_description(domain),
            }
            for domain in Domain
        ]
    }
@app.get("/api/models")
async def list_ai_models():
    """
    Enumerate the AI models available for attribution

    The HUMAN and UNKNOWN members of AIModel are left out of the listing.
    """
    model_entries = []

    for model in AIModel:
        if model in [AIModel.HUMAN, AIModel.UNKNOWN]:
            continue

        display_name = model.value.replace('-', ' ').replace('_', ' ').title()
        model_entries.append({"value": model.value, "name": display_name})

    return {"models": model_entries}
# ==================== ERROR HANDLERS ====================
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """
    Render an HTTPException as the uniform ErrorResponse body

    The response keeps the exception's status code and uses its detail as
    the error message.
    """
    error_body = ErrorResponse(
        status="error",
        error=exc.detail,
        timestamp=datetime.now().isoformat(),
    )

    return NumpyJSONResponse(
        status_code=exc.status_code,
        content=error_body.dict(),
    )
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """
    Catch-all handler for exceptions no other handler claimed

    The exception is logged and a generic 500 ErrorResponse is returned; the
    exception text is not exposed to the client.
    """
    logger.error(f"Unhandled exception: {exc}")

    error_body = ErrorResponse(
        status="error",
        error="Internal server error",
        timestamp=datetime.now().isoformat(),
    )

    return NumpyJSONResponse(
        status_code=500,
        content=error_body.dict(),
    )
# Add middleware for API request logging
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """
    HTTP middleware that records every completed request

    Measures the wall-clock time spent in the downstream handler and passes
    method, path, status code, duration and client IP (None when the client
    address is unavailable) to log_api_request.
    """
    started_at = time.time()
    response = await call_next(request)
    elapsed = time.time() - started_at

    client = request.client
    client_ip = client.host if client else None

    log_api_request(
        method=request.method,
        path=request.url.path,
        status_code=response.status_code,
        duration=elapsed,
        ip=client_ip,
    )

    return response
# ==================== MAIN ====================
if __name__ == "__main__":
    # uvicorn expects the log level in lower case
    log_level = settings.LOG_LEVEL.lower()

    logger.info("Starting TEXT-AUTH API Server...")

    # A single worker is used in debug mode; otherwise the configured count
    if settings.DEBUG:
        worker_count = 1
    else:
        worker_count = settings.WORKERS

    uvicorn.run(
        "text_auth_app:app",
        host=settings.HOST,
        port=settings.PORT,
        reload=settings.DEBUG,
        log_level=log_level,
        workers=worker_count,
    )