""" Routes for data processing and manipulation (Audio/Image). """ import time from collections import defaultdict from fastapi import APIRouter, File, UploadFile, Form, HTTPException, Request, Depends, status from fastapi.responses import StreamingResponse from pydantic import ValidationError import json import logging from app.schemas import AudioAugmentationOptions from app.services.audio_processor import process_audio router = APIRouter(prefix="/api/process", tags=["Data Processing"]) logger = logging.getLogger(__name__) # Rate limiter for heavy processing endpoints _process_rate_store: dict[str, list[float]] = defaultdict(list) _PROCESS_RATE_WINDOW = 60 _PROCESS_RATE_MAX = 10 _PROCESS_MAX_IPS = 10_000 async def _process_rate_limit(request: Request) -> None: client_ip = request.client.host if request.client else "unknown" now = time.time() cutoff = now - _PROCESS_RATE_WINDOW if len(_process_rate_store) > _PROCESS_MAX_IPS: stale = [ip for ip, ts in _process_rate_store.items() if all(t <= cutoff for t in ts)] for ip in stale: del _process_rate_store[ip] hits = [t for t in _process_rate_store[client_ip] if t > cutoff] if len(hits) >= _PROCESS_RATE_MAX: raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail={"code": "rate_limit_exceeded", "message": "Too many processing requests. Please wait."} ) hits.append(now) _process_rate_store[client_ip] = hits @router.post("/audio", dependencies=[Depends(_process_rate_limit)]) async def process_audio_endpoint( file: UploadFile = File(...), options: str = Form(default="{}") ): """ Process an audio file with the given augmentation options. Returns the processed WAV file. options is a JSON string; if missing or empty, defaults to all-off. """ MAX_PAYLOAD_BYTES = 30 * 1024 * 1024 # 30 MB logger.info(f"Received audio processing request for file: {file.filename}") # Parse options JSON string into validated Pydantic model raw_options = options.strip() if options else "{}" if not raw_options: raw_options = "{}" try: parsed_options = AudioAugmentationOptions.model_validate_json(raw_options) except (ValidationError, json.JSONDecodeError) as e: raise HTTPException( status_code=422, detail={"code": "invalid_options", "message": f"Invalid options format: {e}"} ) if not file.content_type or not file.content_type.startswith("audio/"): raise HTTPException(status_code=400, detail={"code": "invalid_file_type", "message": "Invalid file type. Must be audio."}) try: # Read file in chunks to enforce size limit before full allocation chunks = [] total_read = 0 chunk_size = 1024 * 1024 # 1 MB chunks while True: chunk = await file.read(chunk_size) if not chunk: break total_read += len(chunk) if total_read > MAX_PAYLOAD_BYTES: raise HTTPException(status_code=413, detail={"code": "file_too_large", "message": f"File too large. Maximum size is {MAX_PAYLOAD_BYTES // (1024*1024)} MB."}) chunks.append(chunk) content = b"".join(chunks) # Process audio processed_audio = process_audio(content, parsed_options) # Return as downloadable file filename = f"processed_{file.filename}.wav" return StreamingResponse( processed_audio, media_type="audio/wav", headers={"Content-Disposition": f"attachment; filename={filename}"} ) except ValueError as e: raise HTTPException(status_code=400, detail={"code": "validation_error", "message": str(e)}) except Exception as e: logger.error(f"Unexpected error in audio processing: {e}", exc_info=True) raise HTTPException(status_code=500, detail={"code": "internal_error", "message": "Internal server error during audio processing"})