# Source: crowncode-backend/app/services/youtube_analysis.py
# Last commit c5dd8f6 (Rthur2003): "fix: improve error handling and logging in YouTubeAnalysisService"
"""
YouTube analysis orchestration for CrownCode with enhanced logging.
"""
from __future__ import annotations
import asyncio
import os
from pathlib import Path
import tempfile
import time
import uuid
from typing import List
from .external_clients import ClientResponse, MusicAIDetectorClient, SesAnaliziClient
from .preview_model import create_preview_result
from .url_parser import parse_youtube_url
from .youtube_downloader import YouTubeDownloadError, YouTubeDownloader
from .logging_config import get_logger
from ..schemas import AnalysisSummary, ServiceResult, YouTubeAnalyzeResponse, YouTubeSource
# Module-level logger obtained from the project's logging configuration helper.
logger = get_logger(__name__)
def _unique_strings(values: List[str]) -> List[str]:
return list(dict.fromkeys(values))
def _download_error_codes(error_code: str) -> List[str]:
if error_code == "youtube_authentication_required":
return ["youtube_authentication_required", "youtube_analysis_failed"]
return ["youtube_analysis_failed"]
def _preview_summary(video_id: str, warnings: List[str]) -> AnalysisSummary:
    """Build an ``AnalysisSummary`` from the preview model's result mapping.

    ``create_preview_result`` is called with the video id and warnings, and
    the five summary fields are copied from the mapping it returns.
    """
    preview = create_preview_result(video_id, warnings)
    summary_fields = (
        "is_ai_generated",
        "confidence",
        "decision_source",
        "model_version",
        "indicators",
    )
    return AnalysisSummary(**{field: preview[field] for field in summary_fields})
class YouTubeAnalysisService:
    """Download a YouTube video's audio and classify it with external detectors.

    ``analyze`` parses the URL, downloads the audio into a temporary
    directory, sends the file to the Music-AI Detector and Ses-Analizi
    clients, and folds their answers into one ``YouTubeAnalyzeResponse``.
    A failed download, or a client that reports an error, produces a
    ``"partial"`` response instead of an exception.
    """

    # File extensions for which each client is invoked; any other extension is
    # reported as "<client>_unsupported_format" without calling the client.
    _MUSIC_AI_EXTENSIONS = frozenset({".mp3", ".wav", ".flac", ".ogg", ".m4a"})
    _SES_ANALIZI_EXTENSIONS = frozenset(
        {".mp3", ".wav", ".flac", ".ogg", ".m4a", ".webm", ".opus"}
    )

    def __init__(self) -> None:
        """Create both detector clients from environment configuration.

        Reads ``CROWNCODE_API_TIMEOUT_SEC`` (default ``"30"``) as the timeout
        passed to both clients and ``SES_ANALIZI_THRESHOLD`` (default
        ``"0.5"``) as the authenticity threshold used by ``_build_summary``.

        Raises:
            ValueError: If either variable is set to a non-numeric value.
        """
        timeout_sec = float(os.getenv("CROWNCODE_API_TIMEOUT_SEC", "30"))
        self.music_ai = MusicAIDetectorClient(timeout_sec=timeout_sec)
        self.ses_analizi = SesAnaliziClient(timeout_sec=timeout_sec)
        self.auth_threshold = float(os.getenv("SES_ANALIZI_THRESHOLD", "0.5"))

    async def analyze(self, url: str, include_raw: bool = False) -> YouTubeAnalyzeResponse:
        """Run the full download-and-detect pipeline for one YouTube URL.

        Args:
            url: The YouTube URL as supplied by the caller.
            include_raw: When true, the raw client responses are attached to
                the per-service results; otherwise those fields are ``None``.

        Returns:
            A response whose ``status`` is ``"ok"`` when no errors were
            collected and ``"partial"`` otherwise (including every download
            failure).

        Raises:
            ValueError: Re-raised from ``parse_youtube_url`` for a URL it
                rejects.
        """
        request_id = uuid.uuid4().hex
        logger.info(f"Starting analysis for request {request_id}")

        warnings: List[str] = []
        errors: List[str] = []
        timings = {"download_sec": 0.0, "analysis_sec": 0.0, "total_sec": 0.0}
        start_total = time.monotonic()

        try:
            parsed = parse_youtube_url(url)
            logger.debug(f"Parsed URL - video_id: {parsed.video_id}")
        except ValueError as exc:
            logger.warning(f"URL parsing failed: {exc}")
            raise

        # The downloaded file lives in the temporary directory, so everything
        # that touches it must finish before the ``with`` block exits.
        with tempfile.TemporaryDirectory() as tmp_dir:
            downloader = YouTubeDownloader(output_dir=Path(tmp_dir))
            start_download = time.monotonic()
            try:
                # ``download`` is a synchronous call; running it in the default
                # executor keeps the event loop free for other coroutines.
                # NOTE(review): if this coroutine is cancelled while the worker
                # thread is still downloading, the temporary directory is
                # removed underneath it - confirm that is acceptable.
                download_result = await asyncio.get_running_loop().run_in_executor(
                    None, downloader.download, parsed.normalized_url, parsed.video_id
                )
                logger.info(f"Download completed in {time.monotonic() - start_download:.2f}s")
            except YouTubeDownloadError as exc:
                logger.error(f"Download failed ({exc.error_code}): {exc}")
                warnings.extend(exc.warnings)
                errors.extend(_download_error_codes(exc.error_code))
                return self._download_failure_response(
                    request_id=request_id,
                    url=url,
                    parsed=parsed,
                    warnings=warnings,
                    errors=errors,
                    timings=timings,
                    start_download=start_download,
                    start_total=start_total,
                    service_error=exc.error_code,
                )
            except Exception as exc:
                # Deliberate best-effort boundary: any unexpected downloader
                # failure becomes a partial response; the traceback is logged.
                logger.exception(f"Download failed: {exc}")
                errors.append("youtube_analysis_failed")
                return self._download_failure_response(
                    request_id=request_id,
                    url=url,
                    parsed=parsed,
                    warnings=warnings,
                    errors=errors,
                    timings=timings,
                    start_download=start_download,
                    start_total=start_total,
                    service_error="download_failed",
                )

            timings["download_sec"] = round(time.monotonic() - start_download, 4)
            warnings.extend(download_result.warnings)

            start_analysis = time.monotonic()
            music_ai_result, ses_result = await self._run_detectors(download_result.file_path)
            timings["analysis_sec"] = round(time.monotonic() - start_analysis, 4)
            logger.info(f"Analysis completed in {timings['analysis_sec']}s")

            warnings.extend(self._client_warnings("music_ai", music_ai_result))
            warnings.extend(self._client_warnings("ses_analizi", ses_result))
            warnings = _unique_strings(warnings)

            summary = self._build_summary(music_ai_result, ses_result, parsed.video_id, warnings)
            timings["total_sec"] = round(time.monotonic() - start_total, 4)
            logger.info(f"Request {request_id} completed in {timings['total_sec']}s")

            # "Not configured" and "unsupported format" are surfaced as
            # warnings only; every other client error downgrades the status.
            if music_ai_result.error and music_ai_result.error not in {
                "music_ai_not_configured",
                "music_ai_unsupported_format",
            }:
                errors.append(music_ai_result.error)
            if ses_result.error and ses_result.error not in {
                "ses_analizi_not_configured",
                "ses_analizi_unsupported_format",
            }:
                errors.append(ses_result.error)
            errors = _unique_strings(errors)
            status = "ok" if not errors else "partial"

            source = YouTubeSource(
                url=url,
                normalized_url=parsed.normalized_url,
                video_id=parsed.video_id,
                start_time_sec=parsed.start_time_sec,
                title=download_result.title,
                duration_sec=download_result.duration_sec,
                audio_format=download_result.audio_format,
            )
            music_payload = music_ai_result.response if include_raw else None
            ses_payload = ses_result.response if include_raw else None
            return YouTubeAnalyzeResponse(
                request_id=request_id,
                status=status,
                source=source,
                summary=summary,
                music_ai=ServiceResult(
                    available=music_ai_result.available,
                    response=music_payload,
                    error=music_ai_result.error,
                ),
                ses_analizi=ServiceResult(
                    available=ses_result.available,
                    response=ses_payload,
                    error=ses_result.error,
                ),
                warnings=warnings,
                errors=errors,
                timings=timings,
            )

    def _download_failure_response(
        self,
        *,
        request_id: str,
        url: str,
        parsed,
        warnings: List[str],
        errors: List[str],
        timings: dict,
        start_download: float,
        start_total: float,
        service_error: str,
    ) -> YouTubeAnalyzeResponse:
        """Build the ``"partial"`` response returned when the download fails.

        Both services are marked unavailable with ``service_error`` and the
        summary comes from the preview model. ``timings`` is updated in place
        with the time spent in the failed download and the total so far.
        """
        now = time.monotonic()
        timings["download_sec"] = round(now - start_download, 4)
        timings["total_sec"] = round(now - start_total, 4)
        summary = _preview_summary(parsed.video_id, _unique_strings(warnings))
        source = YouTubeSource(
            url=url,
            normalized_url=parsed.normalized_url,
            video_id=parsed.video_id,
            start_time_sec=parsed.start_time_sec,
        )
        return YouTubeAnalyzeResponse(
            request_id=request_id,
            status="partial",
            source=source,
            summary=summary,
            music_ai=ServiceResult(available=False, response=None, error=service_error),
            ses_analizi=ServiceResult(available=False, response=None, error=service_error),
            warnings=_unique_strings(warnings),
            errors=_unique_strings(errors),
            timings=timings,
        )

    async def _run_detectors(self, file_path):
        """Call each client that supports the file's extension, concurrently.

        Returns a ``(music_ai_result, ses_result)`` pair. A client skipped for
        an unsupported extension gets an ``*_unsupported_format`` result, and
        a client that returns ``None`` gets an ``*_unavailable`` result.
        """
        audio_ext = file_path.suffix.lower()
        music_supported = audio_ext in self._MUSIC_AI_EXTENSIONS
        ses_supported = audio_ext in self._SES_ANALIZI_EXTENSIONS
        logger.debug(f"Audio format: {audio_ext}, music_ai: {music_supported}, ses_analizi: {ses_supported}")

        music_ai_result = (
            None
            if music_supported
            else ClientResponse(available=False, response=None, error="music_ai_unsupported_format")
        )
        ses_result = (
            None
            if ses_supported
            else ClientResponse(available=False, response=None, error="ses_analizi_unsupported_format")
        )

        music_task = asyncio.create_task(self.music_ai.predict(file_path)) if music_supported else None
        ses_task = asyncio.create_task(self.ses_analizi.analyze(file_path)) if ses_supported else None
        if music_task is not None and ses_task is not None:
            music_ai_result, ses_result = await asyncio.gather(music_task, ses_task)
        elif music_task is not None:
            music_ai_result = await music_task
        elif ses_task is not None:
            ses_result = await ses_task

        if music_ai_result is None:
            music_ai_result = ClientResponse(available=False, response=None, error="music_ai_unavailable")
        if ses_result is None:
            ses_result = ClientResponse(available=False, response=None, error="ses_analizi_unavailable")
        return music_ai_result, ses_result

    @staticmethod
    def _client_warnings(prefix: str, result: ClientResponse) -> List[str]:
        """Return the warning (zero or one) implied by one client's result.

        ``prefix`` is the client's code prefix (``"music_ai"`` or
        ``"ses_analizi"``). An unavailable client yields
        ``<prefix>_unsupported_format`` or ``<prefix>_unavailable``; an
        available client that still reports an error yields
        ``<prefix>_failed``.
        """
        if not result.available:
            unsupported = f"{prefix}_unsupported_format"
            if result.error == unsupported:
                return [unsupported]
            return [f"{prefix}_unavailable"]
        if result.error:
            return [f"{prefix}_failed"]
        return []

    def _build_summary(
        self,
        music_ai: ClientResponse,
        ses_result: ClientResponse,
        video_id: str,
        warnings: List[str],
    ) -> AnalysisSummary:
        """Pick the summary source: Music-AI, then Ses-Analizi, then preview.

        The Music-AI response is used when it is a dict whose ``prediction``
        is ``"AI"`` or ``"Human"`` and whose ``confidence`` is numeric.
        Otherwise the Ses-Analizi response is used when it is a dict with a
        numeric ``authenticity_score``. If neither qualifies, the preview
        summary for ``video_id`` is returned.
        """
        if music_ai.response and isinstance(music_ai.response, dict):
            prediction = music_ai.response.get("prediction")
            confidence = music_ai.response.get("confidence")
            if prediction in {"AI", "Human"} and isinstance(confidence, (int, float)):
                indicators = [
                    "Decision based on Music-AI Detector response.",
                    f"Prediction: {prediction}",
                ]
                return AnalysisSummary(
                    is_ai_generated=prediction == "AI",
                    confidence=float(confidence),
                    decision_source="music_ai",
                    model_version="music-ai-detector",
                    indicators=indicators,
                )

        if ses_result.response and isinstance(ses_result.response, dict):
            authenticity = ses_result.response.get("authenticity_score")
            if isinstance(authenticity, (int, float)):
                # NOTE(review): a score at or above the threshold is treated as
                # AI-generated - confirm this matches the meaning of
                # ``authenticity_score`` in the Ses-Analizi API.
                is_ai = float(authenticity) >= self.auth_threshold
                indicators = [
                    "Decision based on Ses-Analizi authenticity score.",
                    f"Authenticity score: {float(authenticity):.3f}",
                ]
                return AnalysisSummary(
                    is_ai_generated=is_ai,
                    confidence=float(authenticity),
                    decision_source="ses_analizi",
                    model_version="ses-analizi-authenticity",
                    indicators=indicators,
                )

        return _preview_summary(video_id, warnings)