# Source: crowncode-backend/app/services/vocal_analyzer.py
# Last commit 4718070 (Rthur2003): fix: add ffmpeg decoding fallback for audio loading in analyze_vocals
"""
Vocal analysis for AI music detection.
Separates vocals from instruments and analyzes vocal characteristics
that distinguish AI-generated singing from real human vocals.
Key detection signals:
- Formant consistency (AI has unnaturally smooth or irregular formants)
- Pitch micro-variation (humans have 5-20 cent natural jitter)
- Breath patterns (AI either omits or over-regularizes breath sounds)
- Vibrato regularity (AI vibrato is mathematically perfect)
"""
from __future__ import annotations
import io
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, Union
import numpy as np
import librosa
from .logging_config import get_logger
logger = get_logger(__name__)
# ── Constants ────────────────────────────────────────────────────────────
# Sample rate (Hz) analyze_vocals falls back to when the caller passes no `sr`.
_TARGET_SR = 22050
# Hop size in samples shared by the STFT, pyin, RMS, roll-off and MFCC calls,
# so every frame-based feature in this module is on the same time grid.
_HOP_LENGTH = 512
# Passed as `duration` to librosa.load: only the leading part of the file
# (up to this many seconds) is loaded and analysed.
_DURATION_LIMIT = 60.0
# NOTE(review): not referenced anywhere in the visible code — analyze_vocals
# gates on a 5% vocal/total energy ratio instead. Confirm whether it is still needed.
_MIN_VOCAL_ENERGY = 1e-5 # Threshold for "vocals present"
@dataclass
class VocalFeatures:
    """Result container returned by ``analyze_vocals``.

    Field order is part of the interface (positional construction), so new
    fields must only ever be appended after the existing ones.
    """

    # ── Presence / headline numbers ──────────────────────────────────
    # Whether enough vocal-band energy was found to run the analysis.
    has_vocals: bool
    # Confidence (0.0-1.0) that vocals exist in the audio.
    vocal_confidence: float
    # Overall likelihood (0.0-1.0) that the vocal is AI-generated.
    vocal_ai_score: float

    # ── Sub-scores (each feeds the weighted overall score) ───────────
    # High value = unnaturally stable pitch = AI-like.
    pitch_stability_score: float
    vibrato_regularity_score: float
    formant_consistency_score: float
    breath_pattern_score: float
    vocal_texture_score: float

    # ── Raw measurements behind the sub-scores ───────────────────────
    pitch_mean_hz: float
    # Standard deviation of the voiced pitch, expressed in cents.
    pitch_std_cents: float
    vibrato_rate_hz: float
    vibrato_extent_cents: float
    vocal_harmonic_ratio: float
    # Vocal-estimate energy divided by total signal energy.
    vocal_energy_ratio: float

    # Human-readable findings; each instance gets its own fresh list.
    indicators: list[str] = field(default_factory=list)
def analyze_vocals(
    source: Union[Path, bytes, io.BytesIO],
    *,
    sr: Optional[int] = None,
) -> VocalFeatures:
    """
    Run the full vocal AI-likelihood pipeline on one audio source.

    The audio is loaded, split into a vocal-band estimate and the rest,
    and — if the vocal estimate carries more than 5% of the total energy —
    analysed for pitch, vibrato, formants, breath gaps and texture. The
    five sub-scores are blended with fixed weights into ``vocal_ai_score``.

    Args:
        source: Audio file path, raw bytes, or a BytesIO.
        sr: Sample rate to load at; a falsy value selects ``_TARGET_SR``.

    Returns:
        A populated VocalFeatures. When no significant vocal energy is
        found, every score and metric is 0.0 and ``has_vocals`` is False.
    """
    target_sr = sr if sr else _TARGET_SR
    audio, actual_sr = _load_audio(source, target_sr)

    duration_sec = len(audio) / actual_sr
    logger.info(f"Vocal analysis: {duration_sec:.1f}s audio @ {actual_sr}Hz")

    # Only the vocal estimate is analysed further; the accompaniment is dropped.
    vocal_track, _ = _separate_vocals(audio, actual_sr)

    # Share of the total signal energy that lives in the vocal estimate.
    energy_vocal = float(np.sum(vocal_track ** 2))
    energy_total = float(np.sum(audio ** 2))
    vocal_energy_ratio = energy_vocal / (energy_total + 1e-10)

    # Written as "not (x > 0.05)" so a NaN ratio also takes the no-vocals path.
    if not (vocal_energy_ratio > 0.05):
        logger.info("No significant vocals detected")
        zeroed = dict.fromkeys(
            (
                "vocal_ai_score",
                "pitch_stability_score",
                "vibrato_regularity_score",
                "formant_consistency_score",
                "breath_pattern_score",
                "vocal_texture_score",
                "pitch_mean_hz",
                "pitch_std_cents",
                "vibrato_rate_hz",
                "vibrato_extent_cents",
                "vocal_harmonic_ratio",
            ),
            0.0,
        )
        return VocalFeatures(
            has_vocals=False,
            vocal_confidence=vocal_energy_ratio,
            vocal_energy_ratio=vocal_energy_ratio,
            indicators=["No significant vocal content detected in audio."],
            **zeroed,
        )

    # Raw measurements, in the same order as the original pipeline.
    pitch_data = _analyze_pitch(vocal_track, actual_sr)
    vibrato_data = _analyze_vibrato(pitch_data["f0_hz"], actual_sr)
    formant_data = _analyze_formants(vocal_track, actual_sr)
    breath_data = _analyze_breath_patterns(vocal_track, actual_sr)
    texture_data = _analyze_vocal_texture(vocal_track, actual_sr)

    # Per-signal AI-likeness scores.
    pitch_score = _score_pitch_stability(pitch_data)
    vibrato_score = _score_vibrato_regularity(vibrato_data)
    formant_score = _score_formant_consistency(formant_data)
    breath_score = _score_breath_patterns(breath_data)
    texture_score = _score_vocal_texture(texture_data)

    # Fixed-weight blend (weights sum to 1.0), accumulated left to right.
    weighted_parts = (
        (pitch_score, 0.25),
        (vibrato_score, 0.20),
        (formant_score, 0.25),
        (breath_score, 0.15),
        (texture_score, 0.15),
    )
    blended = sum(score * weight for score, weight in weighted_parts)
    # Clamp into [0.0, 0.99] before rounding to three decimals.
    vocal_ai_score = round(max(0.0, min(0.99, blended)), 3)

    indicators = _build_vocal_indicators(
        vocal_ai_score,
        pitch_score,
        vibrato_score,
        formant_score,
        breath_score,
        pitch_data,
    )

    return VocalFeatures(
        has_vocals=True,
        vocal_confidence=min(1.0, vocal_energy_ratio * 5),
        vocal_ai_score=vocal_ai_score,
        pitch_stability_score=round(pitch_score, 3),
        vibrato_regularity_score=round(vibrato_score, 3),
        formant_consistency_score=round(formant_score, 3),
        breath_pattern_score=round(breath_score, 3),
        vocal_texture_score=round(texture_score, 3),
        pitch_mean_hz=pitch_data["f0_mean"],
        pitch_std_cents=pitch_data["f0_std_cents"],
        vibrato_rate_hz=vibrato_data["rate_hz"],
        vibrato_extent_cents=vibrato_data["extent_cents"],
        vocal_harmonic_ratio=texture_data["vocal_harmonic_ratio"],
        vocal_energy_ratio=vocal_energy_ratio,
        indicators=indicators,
    )
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Audio loading
# ═══════════════════════════════════════════════════════════════════════
def _ffmpeg_decode(data: bytes, sample_rate: Optional[int] = None) -> io.BytesIO:
    """
    Transcode encoded audio bytes to a mono WAV using the ``ffmpeg`` CLI.

    The bytes are fed to ffmpeg on stdin and the result is written to a
    temporary ``.wav`` file, which is read back into memory and removed.

    Args:
        data: Encoded audio in any container/codec ffmpeg can read.
        sample_rate: Output sample rate in Hz; ``None`` selects ``_TARGET_SR``.

    Returns:
        A BytesIO positioned at the start of the WAV data.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status. The message
            carries the tail of ffmpeg's error output.
        FileNotFoundError / subprocess.TimeoutExpired: propagated from
            ``subprocess.run`` when ffmpeg is missing or exceeds 30 s.
    """
    rate = _TARGET_SR if sample_rate is None else sample_rate

    # Only the path is needed: ffmpeg writes the file itself, so the handle
    # is closed before the subprocess starts.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp_path = Path(tmp.name)
    try:
        result = subprocess.run(
            [
                "ffmpeg",
                # Keep stderr limited to real errors so the message below is
                # the failure reason rather than the version banner.
                "-hide_banner", "-loglevel", "error",
                "-y",
                "-i", "pipe:0",
                "-ar", str(rate),
                "-ac", "1",
                "-f", "wav",
                str(tmp_path),
            ],
            input=data, capture_output=True, timeout=30,
        )
        if result.returncode != 0:
            # errors="replace": stderr may echo non-UTF-8 metadata, and a decode
            # failure here must not mask the actual ffmpeg error.
            detail = result.stderr.decode("utf-8", errors="replace").strip()
            raise RuntimeError(f"ffmpeg failed: {detail[-200:]}")
        return io.BytesIO(tmp_path.read_bytes())
    finally:
        tmp_path.unlink(missing_ok=True)


def _load_audio(
    source: Union[Path, bytes, io.BytesIO], target_sr: int
) -> tuple[np.ndarray, int]:
    """
    Load up to ``_DURATION_LIMIT`` seconds of mono audio at ``target_sr``.

    In-memory sources (bytes / BytesIO) are first tried with librosa
    directly; if that fails they are transcoded with ffmpeg and loaded
    again. Path sources have no fallback and re-raise the librosa error.

    Args:
        source: File path, raw bytes, or BytesIO (read from its current position).
        target_sr: Sample rate to resample to.

    Returns:
        ``(samples, sample_rate)`` as returned by ``librosa.load``.

    Raises:
        ValueError: The decoded audio is shorter than one second.
        Exception: Whatever librosa (for paths) or the ffmpeg fallback raises.
    """
    raw_bytes: Optional[bytes] = None
    if isinstance(source, bytes):
        source = io.BytesIO(source)
    if isinstance(source, io.BytesIO):
        # Keep a copy so the fallback can re-decode after librosa has
        # consumed (part of) the stream.
        raw_bytes = source.read()
        source = io.BytesIO(raw_bytes)

    try:
        y, sr = librosa.load(source, sr=target_sr, mono=True, duration=_DURATION_LIMIT)
    except Exception as exc:
        if raw_bytes is None:
            raise
        # Record why the primary decoder failed before trying ffmpeg.
        logger.warning(
            f"librosa could not decode in-memory audio ({exc!r}); retrying via ffmpeg"
        )
        # Decode straight to the requested rate to avoid a second resample.
        y, sr = librosa.load(
            _ffmpeg_decode(raw_bytes, target_sr),
            sr=target_sr, mono=True, duration=_DURATION_LIMIT,
        )

    if len(y) < sr:
        raise ValueError("Audio too short for vocal analysis (< 1s)")
    return y, sr
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Vocal separation
# ═══════════════════════════════════════════════════════════════════════
def _separate_vocals(y: np.ndarray, sr: int) -> tuple[np.ndarray, np.ndarray]:
    """
    Split a signal into a rough vocal estimate and everything else.

    The harmonic component (HPSS, margin 3.0) is transformed with an STFT
    and multiplied by a smoothed 80 Hz-4 kHz band mask; the complementary
    mask yields the accompaniment. This is a lightweight stand-in for
    Demucs/Spleeter that needs no GPU or model download — swap in Demucs
    for production-quality separation.

    Returns:
        ``(y_vocal, y_accomp)``, both the same length as ``y``.
    """
    from scipy.ndimage import gaussian_filter1d

    n_fft = 2048

    # Harmonic part = vocals + melodic instruments; percussive part is discarded.
    harmonic, _ = librosa.effects.hpss(y, margin=3.0)
    spectrum = librosa.stft(harmonic, n_fft=n_fft, hop_length=_HOP_LENGTH)
    bin_freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)

    # 1.0 inside the vocal band, 0.0 outside, then soften the band edges.
    hard_mask = np.where((bin_freqs >= 80) & (bin_freqs <= 4000), 1.0, 0.0)
    soft_mask = gaussian_filter1d(hard_mask, sigma=3)[:, np.newaxis]

    n_samples = len(y)
    y_vocal = librosa.istft(
        spectrum * soft_mask, hop_length=_HOP_LENGTH, length=n_samples
    )
    y_accomp = librosa.istft(
        spectrum * (1.0 - soft_mask), hop_length=_HOP_LENGTH, length=n_samples
    )
    return y_vocal, y_accomp
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Pitch analysis
# ═══════════════════════════════════════════════════════════════════════
def _analyze_pitch(y_vocal: np.ndarray, sr: int) -> dict:
    """
    Track f0 with pyin (C2-C7) and summarise the voiced pitch contour.

    Returns:
        dict with:
          - ``f0_hz``: the raw per-frame f0 array from pyin.
          - ``f0_mean`` / ``f0_std_hz``: mean and std of voiced f0 in Hz.
          - ``f0_std_cents``: std of voiced f0 in cents around the mean.
          - ``voiced_ratio``: voiced frames / all frames.
          - ``pitch_jitter``: mean absolute pitch step, in cents, between
            *adjacent* frames that are both voiced.
          - ``pitch_range_semitones``: max/min voiced f0 in semitones.
        With fewer than 10 voiced frames every statistic (including
        ``voiced_ratio``) is reported as 0.0, which downstream scoring
        treats as "not enough data".
    """
    f0, voiced_flag, _ = librosa.pyin(
        y_vocal,
        fmin=librosa.note_to_hz('C2'),  # ~65 Hz
        fmax=librosa.note_to_hz('C7'),  # ~2093 Hz
        sr=sr,
        hop_length=_HOP_LENGTH,
    )

    # Voiced frames only.
    voiced_f0 = f0[voiced_flag]

    if len(voiced_f0) < 10:
        return {
            "f0_hz": f0,
            "f0_mean": 0.0,
            "f0_std_hz": 0.0,
            "f0_std_cents": 0.0,
            "voiced_ratio": 0.0,
            "pitch_jitter": 0.0,
            "pitch_range_semitones": 0.0,
        }

    f0_mean = float(np.mean(voiced_f0))
    f0_std = float(np.std(voiced_f0))

    # Cents (1/100 semitone) relative to the mean, for perceptual scaling.
    cents = 1200 * np.log2(voiced_f0 / f0_mean)
    f0_std_cents = float(np.std(cents))

    # Jitter is a frame-to-frame measure, so only pairs of *neighbouring*
    # frames that are both voiced are compared. Differencing the compacted
    # voiced array instead would count the pitch jump across every unvoiced
    # gap (breaths, consonants, rests) as jitter.
    adjacent = voiced_flag[1:] & voiced_flag[:-1]
    if np.any(adjacent):
        steps = 1200 * np.abs(np.log2(f0[1:][adjacent] / f0[:-1][adjacent]))
        pitch_jitter = float(np.mean(steps))
    else:
        # Every voiced frame is isolated: no frame-to-frame step exists.
        pitch_jitter = 0.0

    pitch_range = float(12 * np.log2(np.max(voiced_f0) / np.min(voiced_f0)))
    voiced_ratio = float(np.sum(voiced_flag) / len(voiced_flag))

    return {
        "f0_hz": f0,
        "f0_mean": f0_mean,
        "f0_std_hz": f0_std,
        "f0_std_cents": f0_std_cents,
        "voiced_ratio": voiced_ratio,
        "pitch_jitter": pitch_jitter,
        "pitch_range_semitones": pitch_range,
    }
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Vibrato analysis
# ═══════════════════════════════════════════════════════════════════════
def _analyze_vibrato(f0_hz: np.ndarray, sr: int) -> dict:
    """
    Estimate vibrato rate, extent and regularity from a pitch contour.

    NaN (unvoiced) frames are dropped, the remaining contour is converted
    to cents around its mean and linearly detrended, and its magnitude
    spectrum is searched for a peak in the 3-10 Hz band.

    Args:
        f0_hz: Per-frame f0 in Hz with NaN for unvoiced frames.
        sr: Audio sample rate; with ``_HOP_LENGTH`` it gives the frame rate.

    Returns:
        dict with ``rate_hz`` (peak frequency), ``extent_cents`` (2x the std
        of the detrended contour, a rough peak-to-peak figure) and
        ``regularity`` (peak power / total power inside the band). All three
        are 0.0 when fewer than 20 voiced frames exist, the mean f0 is
        below 1 Hz, or no spectral bin falls inside the band.

    NOTE(review): dropping NaN frames concatenates separate voiced segments
    into one series, so the spectrum treats them as contiguous in time —
    confirm this approximation is acceptable.
    """
    from scipy.signal import detrend

    def _no_vibrato() -> dict:
        # Fresh dict per call so callers can never share/mutate a sentinel.
        return {"rate_hz": 0.0, "extent_cents": 0.0, "regularity": 0.0}

    voiced = f0_hz[~np.isnan(f0_hz)]
    if len(voiced) < 20:
        return _no_vibrato()

    mean_f0 = np.mean(voiced)
    if mean_f0 < 1:
        return _no_vibrato()

    # Cents around the mean, with the slow linear drift removed so only the
    # oscillation remains.
    cents_detrended = detrend(1200 * np.log2(voiced / mean_f0))

    # Spectrum of the contour, sampled at the analysis frame rate.
    hop_rate = sr / _HOP_LENGTH  # frames per second
    spectrum = np.abs(np.fft.rfft(cents_detrended))
    freqs = np.fft.rfftfreq(len(cents_detrended), d=1.0 / hop_rate)

    # Vibrato typically sits at 4-8 Hz; search a slightly wider band.
    vibrato_band = (freqs >= 3) & (freqs <= 10)
    if not np.any(vibrato_band):
        return _no_vibrato()

    # Strongest in-band bin (out-of-band bins are zeroed for the search).
    peak_idx = int(np.argmax(np.where(vibrato_band, spectrum, 0.0)))

    # Regularity: how much of the in-band power sits in the single peak bin.
    peak_power = float(spectrum[peak_idx] ** 2)
    band_power = float(np.sum(spectrum[vibrato_band] ** 2))

    return {
        "rate_hz": float(freqs[peak_idx]),
        "extent_cents": float(np.std(cents_detrended)) * 2,  # ~peak-to-peak
        "regularity": float(peak_power / (band_power + 1e-10)),
    }
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Formant analysis
# ═══════════════════════════════════════════════════════════════════════
def _analyze_formants(y_vocal: np.ndarray, sr: int) -> dict:
    """
    Measure how much the first two formants move over time.

    Up to the first 200 frames (2048 samples, hop ``_HOP_LENGTH``) are
    Hamming-windowed and fitted with an order-16 LPC model. The angles of
    the LPC polynomial roots are converted to Hz, and the three lowest
    candidates between 200 and 5000 Hz are taken as F1-F3. Frames that are
    near-silent, fail the LPC fit, or yield fewer than three candidates
    are skipped.

    Returns:
        dict with ``f1_std`` / ``f2_std`` (Hz) and ``formant_stability``,
        the mean coefficient of variation of F1 and F2 (lower = more
        stable = potentially more AI-like). All three are 0.0 when fewer
        than 5 frames are available or usable.
    """
    frame_length = 2048
    hop = _HOP_LENGTH

    def _unknown() -> dict:
        return {"f1_std": 0.0, "f2_std": 0.0, "formant_stability": 0.0}

    n_frames = (len(y_vocal) - frame_length) // hop
    if n_frames < 5:
        return _unknown()

    # Loop invariants: every frame has exactly frame_length samples, so the
    # window, LPC order and radian-to-Hz factor never change.
    window = np.hamming(frame_length)
    lpc_order = min(16, frame_length - 1)
    rad_to_hz = sr / (2 * np.pi)

    f1_track: list = []
    f2_track: list = []

    for i in range(min(n_frames, 200)):  # Limit to 200 frames
        start = i * hop
        frame = y_vocal[start: start + frame_length]
        if np.max(np.abs(frame)) < 1e-6:
            continue  # effectively silent

        try:
            coeffs = librosa.lpc(frame * window, order=lpc_order)
            roots = np.roots(coeffs)
        except Exception:
            # Deliberate best effort: an ill-conditioned frame is dropped
            # rather than aborting the whole analysis.
            continue

        # One root per conjugate pair (non-negative imaginary part).
        roots = roots[np.imag(roots) >= 0]
        freqs_hz = np.angle(roots) * rad_to_hz

        # Keep plausible formant frequencies, lowest first.
        formants = np.sort(freqs_hz[(freqs_hz > 200) & (freqs_hz < 5000)])
        if len(formants) >= 3:
            f1_track.append(formants[0])
            f2_track.append(formants[1])

    if len(f1_track) < 5:
        return _unknown()

    f1_std = float(np.std(f1_track))
    f2_std = float(np.std(f2_track))

    # Coefficient of variation per formant, averaged over F1 and F2.
    formant_stability = float(np.mean([
        np.std(f1_track) / (np.mean(f1_track) + 1e-10),
        np.std(f2_track) / (np.mean(f2_track) + 1e-10),
    ]))

    return {
        "f1_std": f1_std,
        "f2_std": f2_std,
        "formant_stability": formant_stability,
    }
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Breath pattern analysis
# ═══════════════════════════════════════════════════════════════════════
def _analyze_breath_patterns(y_vocal: np.ndarray, sr: int) -> dict:
    """
    Locate breath-length quiet gaps in the vocal and measure their spacing.

    A frame is "quiet" when its RMS is below 15% of the mean RMS. Quiet
    runs lasting 0.1-1.0 s are treated as breath candidates. Human
    singers breathe irregularly between phrases; AI vocals tend to omit
    breaths or space them too evenly.

    Returns:
        dict with ``breath_count``, ``breath_regularity`` (coefficient of
        variation of the gaps between candidate onsets) and
        ``breath_density`` (candidates per second). With fewer than two
        candidates, regularity and density are reported as 0.0.
    """
    envelope = librosa.feature.rms(y=y_vocal, hop_length=_HOP_LENGTH)[0]

    # Relative silence threshold.
    quiet_mask = envelope < np.mean(envelope) * 0.15

    seconds_per_frame = _HOP_LENGTH / sr
    candidates = []
    for run in _find_segments(quiet_mask):
        run_seconds = run["duration"] * seconds_per_frame
        if 0.1 <= run_seconds <= 1.0:
            candidates.append(run)

    breath_count = len(candidates)
    if breath_count < 2:
        return {
            "breath_count": breath_count,
            "breath_regularity": 0.0,
            "breath_density": 0.0,
        }

    # Time between consecutive breath onsets.
    onsets_sec = [run["start"] * seconds_per_frame for run in candidates]
    intervals = np.diff(onsets_sec)
    interval_cv = float(np.std(intervals) / (np.mean(intervals) + 1e-10))

    total_seconds = len(y_vocal) / sr
    return {
        "breath_count": breath_count,
        "breath_regularity": interval_cv,
        "breath_density": breath_count / total_seconds,
    }
def _find_segments(mask: np.ndarray) -> list[dict]:
    """
    Return every run of consecutive truthy entries in ``mask``.

    Each run is reported as ``{"start": first_index, "duration": length}``
    (both plain ints, in index units), ordered by position. A run that
    reaches the end of the mask is included.
    """
    flags = np.asarray(mask, dtype=bool)

    # Pad with False on both sides so runs touching either end still produce
    # a rising and a falling edge.
    padded = np.concatenate(([False], flags, [False]))
    edges = np.flatnonzero(padded[1:] != padded[:-1])

    # Edges alternate rise, fall, rise, fall, ...
    rises = edges[0::2]
    falls = edges[1::2]
    return [
        {"start": int(begin), "duration": int(end - begin)}
        for begin, end in zip(rises, falls)
    ]
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Vocal texture
# ═══════════════════════════════════════════════════════════════════════
def _analyze_vocal_texture(y_vocal: np.ndarray, sr: int) -> dict:
    """
    Summarise how "clean" the vocal sounds.

    Returns:
        dict with ``vocal_harmonic_ratio`` (HPSS harmonic energy / total
        vocal energy), ``rolloff_mean`` and ``rolloff_std`` (85% spectral
        roll-off statistics) and ``mfcc_var`` (per-coefficient variance
        over time of 13 MFCCs, averaged across coefficients).
    """
    # Harmonic share of the vocal energy; the percussive part is not needed.
    harmonic_part, _ = librosa.effects.hpss(y_vocal)
    harmonic_energy = float(np.sum(harmonic_part ** 2))
    signal_energy = float(np.sum(y_vocal ** 2))

    # Frequency below which 85% of each frame's energy lies.
    rolloff = librosa.feature.spectral_rolloff(
        y=y_vocal, sr=sr, hop_length=_HOP_LENGTH, roll_percent=0.85
    )[0]

    mfcc = librosa.feature.mfcc(
        y=y_vocal, sr=sr, n_mfcc=13, hop_length=_HOP_LENGTH
    )

    return {
        "vocal_harmonic_ratio": harmonic_energy / (signal_energy + 1e-10),
        "rolloff_std": float(np.std(rolloff)),
        "rolloff_mean": float(np.mean(rolloff)),
        "mfcc_var": float(np.mean(np.var(mfcc, axis=1))),
    }
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Scoring functions
# ═══════════════════════════════════════════════════════════════════════
def _sigmoid(x: float, mid: float, steep: float) -> float:
    """
    Logistic curve centred on ``mid`` with slope ``steep``.

    The exponent is clamped to [-20, 20] before exponentiation so extreme
    inputs saturate instead of overflowing.
    """
    exponent = steep * (x - mid)
    # Upper bound first, then lower bound (same order as a nested max(min())).
    capped = min(20.0, exponent)
    bounded = max(-20.0, capped)
    return 1.0 / (1.0 + np.exp(-bounded))
def _score_pitch_stability(pitch_data: dict) -> float:
    """
    Score how unnaturally steady the pitch is (higher = more AI-like).

    Heuristic reference ranges used for the curve midpoints:
      - human singers: jitter ~10-25 cents, std ~50-150 cents
      - AI singers:    jitter ~2-8 cents,  std ~10-40 cents

    Returns a neutral 0.5 when less than 10% of frames are voiced;
    otherwise 60% low-jitter score plus 40% low-spread score.
    """
    if pitch_data["voiced_ratio"] < 0.1:
        return 0.5

    # Both components are inverted sigmoids: small measurement -> high score.
    low_jitter = 1.0 - _sigmoid(pitch_data["pitch_jitter"], mid=12, steep=0.15)
    low_spread = 1.0 - _sigmoid(pitch_data["f0_std_cents"], mid=60, steep=0.03)
    return low_jitter * 0.6 + low_spread * 0.4
def _score_vibrato_regularity(vibrato_data: dict) -> float:
    """
    Score how machine-like the vibrato is (higher = more AI-like).

    Heuristic reference ranges: human regularity ~0.2-0.5, AI ~0.6-0.9.
    A detected rate below 1 Hz means no clear vibrato was found, which is
    scored as a neutral 0.5.
    """
    has_vibrato = not (vibrato_data["rate_hz"] < 1)
    if not has_vibrato:
        return 0.5

    regularity = vibrato_data["regularity"]
    return float(_sigmoid(regularity, mid=0.45, steep=6))
def _score_formant_consistency(formant_data: dict) -> float:
    """
    Score how frozen the formants are (higher = more AI-like).

    The input is the formant coefficient of variation. Heuristic reference
    ranges: human ~0.08-0.20, AI ~0.02-0.07. An exact 0 is the "no
    measurement" marker and yields a neutral 0.5.
    """
    variation = formant_data["formant_stability"]
    if variation == 0:
        return 0.5

    # Inverted sigmoid: little variation -> high score.
    return float(1.0 - _sigmoid(variation, mid=0.10, steep=15))
def _score_breath_patterns(breath_data: dict) -> float:
    """
    Score breath behaviour (higher = more AI-like).

    AI vocals tend to contain either no breaths or very evenly spaced
    ones. Zero breaths is mildly suspicious (0.7); a single breath gives
    no spacing information (0.5); otherwise a low spacing variation
    (coefficient of variation) maps to a high score.
    """
    # Counts that cannot support a regularity estimate get fixed scores.
    fixed_scores = {0: 0.7, 1: 0.5}
    count = breath_data["breath_count"]
    if count in fixed_scores:
        return fixed_scores[count]

    spacing_cv = breath_data["breath_regularity"]
    return float(1.0 - _sigmoid(spacing_cv, mid=0.3, steep=5))
def _score_vocal_texture(texture_data: dict) -> float:
    """
    Score how "too clean" the vocal texture is (higher = more AI-like).

    Equal-weight blend of a high harmonic ratio and a low MFCC variance.
    """
    harmonic_component = _sigmoid(
        texture_data["vocal_harmonic_ratio"], mid=0.65, steep=6
    )
    # Inverted: little timbral variation -> high score.
    flatness_component = 1.0 - _sigmoid(
        texture_data["mfcc_var"], mid=40, steep=0.04
    )
    return float(harmonic_component * 0.5 + flatness_component * 0.5)
# ═══════════════════════════════════════════════════════════════════════
# PRIVATE — Indicator text generation
# ═══════════════════════════════════════════════════════════════════════
def _build_vocal_indicators(
    overall: float,
    pitch: float,
    vibrato: float,
    formant: float,
    breath: float,
    pitch_data: dict,
) -> list[str]:
    """
    Turn the scores into human-readable findings.

    The first entry always summarises the overall score (> 0.7 strong,
    > 0.5 moderate, otherwise natural). Further entries are added for each
    of pitch / vibrato / formant scoring above 0.7, and one breath entry
    when the breath score is below 0.3 (natural) or above 0.6 (suspicious).
    """
    if overall > 0.7:
        headline = "Vocal patterns show strong synthetic characteristics."
    elif overall > 0.5:
        headline = "Vocal patterns show moderate synthetic indicators."
    else:
        headline = "Vocal patterns appear consistent with natural human singing."
    messages = [headline]

    if pitch > 0.7:
        messages.append(
            f"Pitch stability is unusually high "
            f"(jitter: {pitch_data['pitch_jitter']:.1f} cents). "
            f"Human singers typically show more micro-variation."
        )

    # Sub-scores that share the same threshold and need no formatting.
    flagged = (
        (vibrato, "Vibrato is mathematically regular, suggesting algorithmic generation."),
        (formant, "Formant transitions are unnaturally consistent across frames."),
    )
    for score, text in flagged:
        if score > 0.7:
            messages.append(text)

    if breath < 0.3:
        messages.append("Natural breath patterns detected between vocal phrases.")
    elif breath > 0.6:
        messages.append("Breath patterns are absent or overly regular.")

    return messages