"""Local song generation and latent-space audio editing behind one handler.

``APIHandler`` downloads three model repositories from the Hugging Face Hub
when it is constructed and exposes two request-style methods that return
plain ``dict`` results (``{"status": "success" | "error", ...}``).
"""

import functools
import importlib.util
import io
import json
import logging
import os
import sys
from types import ModuleType

import librosa
import numpy as np
import soundfile as sf
import torch
from huggingface_hub import snapshot_download, hf_hub_download
from safetensors.torch import load_file

logger = logging.getLogger(__name__)


def _prepend_sys_path(directory: str) -> None:
    """Put *directory* first on ``sys.path`` without creating duplicates."""
    if directory in sys.path:
        sys.path.remove(directory)
    sys.path.insert(0, directory)


def _load_module_from_file(module_name: str, file_path: str) -> ModuleType:
    """Import the Python file at *file_path* under the unique name *module_name*.

    Several model repos ship a file literally called ``modeling.py``. A plain
    ``from modeling import ...`` is served from ``sys.modules["modeling"]``
    after the first import, so the second repo would silently get the first
    repo's module. Loading by path under distinct names avoids that clash.

    The file's directory is still placed on ``sys.path`` so that any sibling
    imports inside the loaded file keep resolving.

    Raises:
        ImportError: if no import spec/loader can be built for *file_path*.
    """
    _prepend_sys_path(os.path.dirname(file_path))
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module {module_name!r} from {file_path!r}")
    module = importlib.util.module_from_spec(spec)
    # Register before executing, as the importlib recipe recommends, so code
    # in the module that looks itself up in sys.modules works.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        sys.modules.pop(module_name, None)
        raise
    return module


def _read_json(path: str) -> dict:
    """Parse the JSON file at *path*, closing the file handle afterwards."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


class APIHandler:
    """Facade over a song-generation pipeline and an encode/edit/decode chain.

    Attributes set by ``__init__``:
        device: torch device string used for every model.
        songgen_path: local snapshot directory of ``tencent/SongGeneration``.
        song_pipeline: ``LeVoPipeline`` loaded from that snapshot.
        dream_repo / dream_vae: DreamVAE repo id and its decoder module.
        ace_repo / ace_encoder: ACE-STEP repo id and its encoder module.
    """

    SONG_MODEL_ID = "tencent/SongGeneration"
    EDIT_MODEL_ID = "daydreamlive/DreamVAE"
    # Both outputs are written to fixed paths in the current working
    # directory, so every call overwrites the previous result.
    SONG_OUTPUT_PATH = "generated_song.wav"
    EDIT_OUTPUT_PATH = "edited_audio.wav"
    EDIT_SAMPLE_RATE = 48000

    def __init__(self, device: str | None = None):
        """Download all model assets and build the models on *device*.

        Args:
            device: torch device string; when falsy, ``"cuda"`` is used if
                available, otherwise ``"cpu"``.
        """
        # ----------------------------------------------------
        # DEVICE
        # ----------------------------------------------------
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")

        # ----------------------------------------------------
        # SONGGENERATION (LeVo 2 pipeline)
        # ----------------------------------------------------
        self.songgen_path = snapshot_download(self.SONG_MODEL_ID)
        # The pipeline code lives inside the downloaded snapshot, so the
        # snapshot directory has to be importable before `generate` is.
        _prepend_sys_path(self.songgen_path)
        from generate import LeVoPipeline  # from Tencent's repo

        self.song_pipeline = LeVoPipeline.from_pretrained(
            self.songgen_path,
            model_name="SongGeneration-v2-large",  # adjust if you use another variant
            device=self.device,
        )

        # ----------------------------------------------------
        # DREAMVAE (FastOobleckDecoder)
        # ----------------------------------------------------
        self.dream_repo = self.EDIT_MODEL_ID
        dream_weights = hf_hub_download(self.dream_repo, "model.safetensors")
        dream_config = hf_hub_download(self.dream_repo, "config.json")
        dream_modeling = hf_hub_download(self.dream_repo, "modeling.py")

        dream_module = _load_module_from_file("_dreamvae_modeling", dream_modeling)
        cfg = _read_json(dream_config)
        self.dream_vae = dream_module.FastOobleckDecoder(
            channels=cfg["channels"],
            input_channels=cfg["input_channels"],
            audio_channels=cfg["audio_channels"],
            upsampling_ratios=cfg["upsampling_ratios"],
            channel_multiples=cfg["channel_multiples"],
        ).eval().to(self.device)
        self.dream_vae.load_state_dict(load_file(dream_weights))

        # ----------------------------------------------------
        # ACE-STEP ENCODER (for real DreamVAE editing)
        # ----------------------------------------------------
        # This assumes an ACE-Step encoder repo exists; adjust names/paths
        # to match your model.
        self.ace_repo = "daydreamlive/ACE-STEP"
        ace_weights = hf_hub_download(self.ace_repo, "model.safetensors")
        ace_config = hf_hub_download(self.ace_repo, "config.json")
        ace_modeling = hf_hub_download(self.ace_repo, "modeling.py")

        ace_module = _load_module_from_file("_acestep_modeling", ace_modeling)
        ace_cfg = _read_json(ace_config)
        # adjust to actual class name in ACE-STEP repo
        self.ace_encoder = ace_module.ACEEncoder(
            channels=ace_cfg["channels"],
            input_channels=ace_cfg["input_channels"],
            audio_channels=ace_cfg["audio_channels"],
            downsampling_ratios=ace_cfg["downsampling_ratios"],
            channel_multiples=ace_cfg["channel_multiples"],
        ).eval().to(self.device)
        self.ace_encoder.load_state_dict(load_file(ace_weights))

    # --------------------------------------------------------
    # SONG GENERATION (LOCAL LEVO 2)
    # --------------------------------------------------------
    def call_song_gen(self, model, prompt, lyrics, voice, duration):
        """Generate a song and write it to ``SONG_OUTPUT_PATH``.

        Results are deliberately not memoised: the audio always lands in the
        same file, so a cached result dict could point at audio that a later
        call with different arguments has already overwritten.

        Args:
            model: must equal ``"tencent/SongGeneration"``.
            prompt: text description; also used as the verse text when
                *lyrics* is empty.
            lyrics: song lyrics.
            voice: forwarded to the pipeline unchanged.
            duration: forwarded to the pipeline unchanged.

        Returns:
            ``{"status": "success", "audio_path": ..., "meta": {...}}`` or
            ``{"status": "error", "message": ...}``. Never raises for
            pipeline failures; they are logged and reported in the dict.
        """
        if model != self.SONG_MODEL_ID:
            return {"status": "error", "message": "Invalid model"}

        try:
            request = {
                "lyrics": lyrics,
                "sections": [{"type": "verse", "text": lyrics or prompt}],
                "description": prompt,
                "voice": voice,
                "duration": duration,
            }
            audio, sr = self.song_pipeline.generate(request)
            audio = np.asarray(audio, dtype=np.float32)
            out_path = self.SONG_OUTPUT_PATH
            sf.write(out_path, audio, sr)
        except Exception as e:
            # API boundary: report the failure to the caller, but keep the
            # traceback in the log instead of discarding it.
            logger.exception("Song generation failed")
            return {"status": "error", "message": str(e)}

        return {
            "status": "success",
            "audio_path": out_path,
            "meta": {
                "duration": duration,
                "voice": voice,
                "sample_rate": sr,
            },
        }

    # --------------------------------------------------------
    # DREAMVAE AUDIO EDITING (REAL LATENT PIPELINE)
    # --------------------------------------------------------
    def call_audio_edit(self, model, audio_bytes, action, param=None):
        """Encode audio, edit the latents, decode, and write the result.

        Args:
            model: must equal ``"daydreamlive/DreamVAE"``.
            audio_bytes: encoded audio file contents; must be non-empty.
            action: edit name, see ``_apply_edit_action``.
            param: optional dict of options for *action*.

        Returns:
            ``{"status": "success", "edited_audio_path": ..., "meta": {...}}``
            or ``{"status": "error", "message": ...}``. Failures, including
            invalid edit parameters, are logged and reported in the dict.
        """
        if model != self.EDIT_MODEL_ID:
            return {"status": "error", "message": "Invalid model"}
        if not audio_bytes:
            return {"status": "error", "message": "No audio provided"}

        sample_rate = self.EDIT_SAMPLE_RATE
        try:
            # 1. Load audio from bytes, resampled to the fixed working rate.
            waveform, _ = librosa.load(
                io.BytesIO(audio_bytes), sr=sample_rate, mono=False
            )
            if waveform.ndim == 1:
                # Mono input: duplicate the channel to get two channels.
                waveform = np.stack([waveform, waveform], axis=0)
            audio_tensor = (
                torch.from_numpy(np.ascontiguousarray(waveform, dtype=np.float32))
                .unsqueeze(0)  # [1, channels, T]
                .to(self.device)
            )

            with torch.no_grad():
                # 2. Encode to latents with ACE-STEP
                #    (expected [B, 64, T_latent] — TODO confirm against repo).
                latents = self.ace_encoder(audio_tensor)
                # 3. Apply editing in latent space.
                latents = self._apply_edit_action(latents, action, param or {})
                # 4. Decode back to audio with DreamVAE
                #    (expected [B, 2, 1920*T_latent] — TODO confirm).
                decoded = self.dream_vae(latents)

            decoded = decoded.squeeze(0).cpu().numpy()  # [channels, samples]
            out_path = self.EDIT_OUTPUT_PATH
            # soundfile expects (frames, channels), hence the transpose.
            sf.write(out_path, decoded.T, sample_rate)
        except Exception as e:
            logger.exception("Audio edit %r failed", action)
            return {"status": "error", "message": str(e)}

        return {
            "status": "success",
            "edited_audio_path": out_path,
            "meta": {
                "action": action,
                "param": param,
                "sample_rate": sample_rate,
            },
        }

    # --------------------------------------------------------
    # LATENT EDITING ACTIONS
    # --------------------------------------------------------
    def _apply_edit_action(self, latents: torch.Tensor, action: str, param: dict) -> torch.Tensor:
        """Return *latents* transformed according to *action*.

        The last dimension is treated as time and dimension 1 as channels.

        Supported actions and their ``param`` keys:
            ``denoise``: multiply by ``strength`` (default 0.9).
            ``boost_highs``: multiply by ``gain`` (default 1.1).
            ``cover``: add Gaussian noise scaled by ``noise`` (default 0.2).
            ``extend``: tile along time ``repeat`` times (default 2).
            ``swap``: reverse the channel order.
            ``crop``: keep ``start:end`` along time (default: everything).

        Any other action returns *latents* unchanged (a warning is logged).

        Raises:
            ValueError: if ``repeat`` is below 1 or the crop range is empty,
                either of which would yield zero-length latents.
        """
        param = param or {}

        if action == "denoise":
            strength = float(param.get("strength", 0.9))
            return latents * strength

        if action == "boost_highs":
            gain = float(param.get("gain", 1.1))
            return latents * gain

        if action == "cover":
            # Toy example: perturb every latent value with Gaussian noise.
            noise_level = float(param.get("noise", 0.2))
            return latents + torch.randn_like(latents) * noise_level

        if action == "extend":
            repeat = int(param.get("repeat", 2))
            if repeat < 1:
                raise ValueError(f"'repeat' must be >= 1, got {repeat}")
            return latents.repeat(1, 1, repeat)

        if action == "swap":
            # Channel reversal in latent space (toy example).
            return latents.flip(1)

        if action == "crop":
            total = latents.shape[-1]
            start_raw = param.get("start")
            end_raw = param.get("end")
            start = 0 if start_raw is None else int(start_raw)
            end = total if end_raw is None else int(end_raw)
            cropped = latents[:, :, start:end]
            if cropped.shape[-1] == 0:
                raise ValueError(
                    f"crop range {start}:{end} is empty for {total} latent frames"
                )
            return cropped

        logger.warning("Unknown edit action %r; latents left unchanged", action)
        return latents