| | import aiohttp |
| | import asyncio |
| | import os |
| | import uuid |
| | import tempfile |
| | from typing import List, Dict, Any |
| |
|
| | from pydantic import BaseModel |
| |
|
| |
|
class AlignmentData(BaseModel):
    """A single aligned word with its start and end values.

    Attributes are validated by pydantic on construction, e.g.
    ``AlignmentData(word="hi", start=0.0, end=0.4)``.
    """

    # The word text as provided by the alignment source.
    word: str
    # Start of the word's span (unit not established here; presumably
    # seconds -- confirm against the aligner's response).
    start: float
    # End of the word's span, in the same unit as ``start``.
    end: float

    def to_dict(self) -> dict:
        """Return the alignment as a plain dict using camelCase keys.

        ``alignedWord`` mirrors ``word`` and ``hasFailedAlignment`` is always
        ``False``; ``startTime``/``endTime`` carry ``start``/``end`` unchanged.
        """
        return dict(
            word=self.word,
            alignedWord=self.word,
            startTime=self.start,
            endTime=self.end,
            hasFailedAlignment=False,
        )
| |
|
| |
|
class CharacterAITTS:
    """Async HTTP client for a text-to-speech endpoint and word aligners.

    Each request opens its own ``aiohttp.ClientSession``. Generated audio is
    downloaded into a temporary directory created when the instance is built.
    """

    def __init__(self):
        """Configure the service base URLs and create the download directory."""
        self.api_url = "https://yakova-embedding.hf.space"
        # mkdtemp() already returns the new directory's path as a str.
        self.dir = tempfile.mkdtemp()
        self.descript = "https://yakova-embedding.hf.space"
        # NOTE(review): none of the methods in this class pass these headers
        # to aiohttp; kept because other code may read the attribute.
        self.headers = {"Connection": "keep-alive", "Content-Type": "application/json"}

    async def _make_transcript(self, links, text):
        """Return word-level alignment data for ``text`` against the audio.

        The ``descript_transcript`` endpoint is tried first and its decoded
        JSON is returned unchanged when it is non-empty. Otherwise the
        aligner service is called with the first audio URL and its
        ``"alignment"`` list is converted with :meth:`process_alignments`.

        Args:
            links: A sequence of audio URLs, or a single URL string.
            text: The text that the audio is expected to contain.

        Returns:
            The primary endpoint's JSON, or a list of alignment dicts
            produced from the fallback aligner.

        Raises:
            IndexError: If the fallback is needed and ``links`` is an empty
                sequence.
            KeyError: If the fallback response has no ``"alignment"`` key.
        """
        # NOTE(review): the "file_extenstion" spelling is part of the request
        # payload and is kept as-is; presumably the server expects this key.
        data = {"audio_url": links, "text": text, "file_extenstion": ".mp3"}
        response_data = await self._make_request(
            "post", "descript_transcript", json=data, external=self.descript
        )
        if response_data:
            return response_data

        # The fallback aligner takes one URL. Indexing a plain string would
        # yield its first character, so a single URL is passed through as-is.
        data["audio_url"] = links if isinstance(links, str) else links[0]
        print(data)
        aligned = await self.aligner(
            "post",
            "align/url",
            json=data,
        )
        print(aligned)
        return self.process_alignments(data=aligned["alignment"], offset=0)

    def process_alignments(
        self, data: List[Dict[str, Any]], offset: float = 0
    ) -> List[Dict[str, Any]]:
        """Validate raw alignment items and convert them to output dicts.

        Args:
            data: Items with the fields ``AlignmentData`` requires
                (``word``, ``start``, ``end``).
            offset: Accepted for interface compatibility; currently unused,
                so the returned times are not shifted.

        Returns:
            One ``AlignmentData.to_dict()`` result per input item, in order.
        """
        return [AlignmentData(**item).to_dict() for item in data]

    async def aligner(
        self,
        method,
        endpoint,
        json=None,
        external="https://yakova-aligner.hf.space/align/url",
    ):
        """Send a JSON request to the aligner and return the decoded JSON.

        Args:
            method: Name of the ``aiohttp.ClientSession`` method to call,
                e.g. ``"post"``.
            endpoint: Path appended to ``self.api_url``; only used when
                ``external`` is falsy.
            json: JSON-serialisable request body.
            external: Complete URL to call. When truthy it is used verbatim
                and ``endpoint`` is ignored.
        """
        url = f"{external}" if external else f"{self.api_url}/{endpoint}"
        return await self._request_json(method, url, json)

    async def _make_request(self, method, endpoint, json=None, external=None):
        """Send a JSON request to ``<base>/<endpoint>`` and return the JSON.

        Args:
            method: Name of the ``aiohttp.ClientSession`` method to call.
            endpoint: Path appended to the base URL.
            json: JSON-serialisable request body.
            external: Base URL to use instead of ``self.api_url`` when truthy.
        """
        base = external if external else self.api_url
        return await self._request_json(method, f"{base}/{endpoint}", json)

    async def _request_json(self, method, url, json=None):
        """Open a one-off session, issue ``method`` on ``url``, decode JSON."""
        async with aiohttp.ClientSession() as session:
            async with getattr(session, method)(url=url, json=json) as response:
                return await response.json()

    async def say(self, text, speaker=None):
        """Synthesize ``text`` and download the resulting audio.

        Posts ``{"text": text, "voice": speaker}`` to the ``cai_tts`` endpoint
        and downloads the URL found under the response's ``"audio"`` key.

        Returns:
            A ``(audio_url, local_path)`` tuple.

        Raises:
            KeyError: If the response has no ``"audio"`` key.
        """
        data = {"text": text, "voice": speaker}
        response_data = await self._make_request("post", "cai_tts", json=data)
        audio_url = response_data["audio"]
        temp = await self.download_file(audio_url)
        return audio_url, temp

    async def download_file(self, url):
        """Download ``url`` into ``self.dir`` and return the saved file path.

        The file gets a random UUID name with an ``.mp3`` suffix.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status. Previously such a response was ignored and a
                path to a file that had never been written was returned.
        """
        filename = str(uuid.uuid4()) + ".mp3"
        # Recreate the directory in case it was removed since __init__.
        os.makedirs(self.dir, exist_ok=True)
        save_path = os.path.join(self.dir, filename)
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # Fail loudly instead of handing back a non-existent file.
                response.raise_for_status()
                with open(save_path, "wb") as file:
                    async for chunk in response.content.iter_chunked(64 * 1024):
                        file.write(chunk)
        return save_path
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
| | |
| | |
| |
|