Spaces:
Sleeping
Sleeping
| """Tests for /api/analyze endpoint.""" | |
| from __future__ import annotations | |
| import io | |
| import struct | |
| import math | |
| from fastapi.testclient import TestClient | |
def test_analyze_missing_source_type(
    client: TestClient,
) -> None:
    """A POST carrying no form fields is answered with HTTP 422."""
    resp = client.post("/api/analyze")
    status = resp.status_code
    assert status == 422
def test_analyze_invalid_source_type(
    client: TestClient,
) -> None:
    """An unrecognised sourceType gives HTTP 200, an error, no result."""
    form = {"sourceType": "unknown"}
    resp = client.post("/api/analyze", data=form)
    assert resp.status_code == 200
    payload = resp.json()
    # The error code is reported in the body, not via the HTTP status.
    assert "invalid_source_type" in payload["errors"]
    assert payload["result"] is None
def test_analyze_unsupported_source(
    client: TestClient,
) -> None:
    """
    Return unsupported_source for spotify/apple.

    Each source is posted separately. The source name is used as
    the assertion message so a failure identifies which of the two
    values triggered it.
    """
    for source in ("spotify", "apple"):
        response = client.post(
            "/api/analyze",
            data={"sourceType": source},
        )
        assert response.status_code == 200, source
        data = response.json()
        assert "unsupported_source" in data["errors"], source
def test_analyze_youtube_missing_url(
    client: TestClient,
) -> None:
    """A youtube request that carries no url reports missing_url."""
    form = {"sourceType": "youtube"}
    resp = client.post("/api/analyze", data=form)
    assert resp.status_code == 200
    payload = resp.json()
    assert "missing_url" in payload["errors"]
def test_analyze_file_missing_file(
    client: TestClient,
) -> None:
    """A file request with nothing uploaded reports missing_file."""
    form = {"sourceType": "file"}
    resp = client.post("/api/analyze", data=form)
    assert resp.status_code == 200
    payload = resp.json()
    assert "missing_file" in payload["errors"]
def test_analyze_file_invalid_type(
    client: TestClient,
) -> None:
    """A text/plain upload is flagged as invalid_file_type."""
    text_upload = io.BytesIO(b"not audio content " * 100)
    # (filename, file object, content type) triple for the upload.
    upload = {"file": ("test.txt", text_upload, "text/plain")}
    resp = client.post(
        "/api/analyze",
        data={"sourceType": "file"},
        files=upload,
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert "invalid_file_type" in payload["errors"]
def test_analyze_file_too_small(
    client: TestClient,
) -> None:
    """A 100-byte audio/wav upload is rejected as file_too_small."""
    undersized = io.BytesIO(b"\x00" * 100)
    upload = {"file": ("tiny.wav", undersized, "audio/wav")}
    resp = client.post(
        "/api/analyze",
        data={"sourceType": "file"},
        files=upload,
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert "file_too_small" in payload["errors"]
def test_analyze_file_fallback_on_bad_audio(
    client: TestClient,
) -> None:
    """Undecodable audio bytes are answered by the preview model."""
    # 2048 zero bytes labelled as WAV: no RIFF header in the payload.
    zeros = io.BytesIO(b"\x00" * 2048)
    upload = {"file": ("test.wav", zeros, "audio/wav")}
    resp = client.post(
        "/api/analyze",
        data={"sourceType": "file"},
        files=upload,
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["result"] is not None
    verdict = payload["result"]
    for required_key in ("isAIGenerated", "confidence"):
        assert required_key in verdict
    # Both mode fields must name the preview path.
    assert verdict["analysisMode"] == "preview"
    assert verdict["decisionSource"] == "preview"
def _make_wav(
    duration_sec: float = 3.0,
    freq_hz: float = 440.0,
    sample_rate: int = 22050,
) -> bytes:
    """
    Generate a minimal valid WAV file with a sine tone.

    The output is a 44-byte RIFF/WAVE header describing 16-bit
    mono PCM, followed by the little-endian samples.

    Args:
        duration_sec: Tone length; the number of samples written
            is ``int(sample_rate * duration_sec)``.
        freq_hz: Frequency of the sine wave.
        sample_rate: Samples per second, stored in the header.

    Returns:
        Raw bytes of a valid WAV.
    """
    n_samples = int(sample_rate * duration_sec)
    angular = 2 * math.pi * freq_hz
    # Full-scale sine; int() truncates toward zero, so every value
    # stays within the signed 16-bit range.
    samples = [
        int(math.sin(angular * (i / sample_rate)) * 32767)
        for i in range(n_samples)
    ]

    data_size = n_samples * 2  # two bytes per 16-bit mono sample
    # "<" selects little-endian with no padding, so the fields
    # below pack to exactly 44 bytes.
    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF",
        36 + data_size,   # RIFF size: rest of header + data
        b"WAVE",
        b"fmt ",
        16,               # fmt chunk size
        1,                # PCM
        1,                # mono
        sample_rate,
        sample_rate * 2,  # byte rate
        2,                # block align
        16,               # bits per sample
        b"data",
        data_size,
    )
    # One pack call for all samples instead of one per sample.
    payload = struct.pack(f"<{len(samples)}h", *samples)
    return header + payload
def test_analyze_file_real_audio(
    client: TestClient,
) -> None:
    """
    A genuine WAV upload goes through the full analysis path.

    The response must report production mode rather than the
    preview fallback, with bounded confidence and feature scores,
    audio info matching the uploaded tone, and at least one
    artificial indicator.
    """
    tone = io.BytesIO(_make_wav(duration_sec=3.0, freq_hz=440.0))
    upload = {"file": ("tone.wav", tone, "audio/wav")}
    resp = client.post(
        "/api/analyze",
        data={"sourceType": "file"},
        files=upload,
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["result"] is not None
    verdict = payload["result"]

    # Production path, not the preview fallback.
    assert verdict["analysisMode"] == "production"
    assert verdict["decisionSource"].startswith("auris_")
    assert verdict["modelVersion"] == "auris-v1-fusion"

    # Confidence must fall inside the accepted band.
    assert 0.51 <= verdict["confidence"] <= 0.97

    # Each feature score is bounded to [0.0, 0.99].
    features = verdict["features"]
    for score_key in (
        "spectralRegularity",
        "temporalPatterns",
        "harmonicStructure",
    ):
        assert 0.0 <= features[score_key] <= 0.99

    # Audio info mirrors the 3 s tone at _make_wav's default rate.
    audio_info = verdict["audioInfo"]
    assert audio_info["duration"] > 2.0
    assert audio_info["sampleRate"] == 22050

    # At least one indicator must be reported.
    assert len(features["artificialIndicators"]) > 0
def test_analyze_file_real_audio_has_processing_time(
    client: TestClient,
) -> None:
    """
    Processing time should be realistic.

    Uploads a 2-second 220 Hz tone and checks that the reported
    processingTime is strictly positive and below 30.
    """
    wav_bytes = _make_wav(duration_sec=2.0, freq_hz=220.0)
    audio_file = io.BytesIO(wav_bytes)
    response = client.post(
        "/api/analyze",
        data={"sourceType": "file"},
        files={
            "file": (
                "test2.wav", audio_file, "audio/wav",
            ),
        },
    )
    # Check the status before decoding the body, so an HTTP error
    # is reported as such and not as a JSON or KeyError failure.
    assert response.status_code == 200
    data = response.json()
    assert data["result"] is not None
    # Should complete in reasonable time
    processing_time = data["result"]["processingTime"]
    assert 0.0 < processing_time < 30.0