# tools/youtube_video_tool.py
import base64
import os
import re
import requests
import subprocess
import tempfile
from io import BytesIO
from tools.base_tool import BaseTool
import av
import yt_dlp
from tools.speech_recognition_tool import SpeechRecognitionTool
class YouTubeVideoTool(BaseTool):
    """Answer a question about a YouTube video from its transcript.

    The transcript comes from the video's English subtitles (manual first,
    then automatic captions). When no VTT track is available and a speech
    tool was supplied, the audio is downloaded with ffmpeg and transcribed
    instead. The transcript is split into small chunks and each chunk is
    sent, together with the running answer, to an LLM hook (``_mock_llm``).
    """

    name = 'youtube_video'
    description = 'Process a YouTube video and answer questions based on content.'

    def __init__(
        self,
        speech_tool: SpeechRecognitionTool = None,
        quality: int = 360,
        frame_interval: float = 2.0,
        chunk_duration: float = 2.0,
        debug: bool = False,
    ):
        """Store the configuration.

        Args:
            speech_tool: Optional transcription tool used when the video has
                no usable English VTT captions.
            quality: Maximum video height used in the yt-dlp format selector.
            frame_interval: Stored on the instance; not read by any method of
                this class.
            chunk_duration: Stored on the instance; not read by any method of
                this class.
            debug: Stored on the instance; not read by any method of this
                class.
        """
        # NOTE(review): BaseTool.__init__ is not invoked here — confirm that
        # the base class does not need its own initialisation.
        self.speech_tool = speech_tool
        self.quality = quality
        self.frame_interval = frame_interval
        self.chunk_duration = chunk_duration
        self.debug = debug

    def forward(self, url: str, query: str) -> str:
        """Return the answer to *query* for the video at *url*.

        Each transcript chunk is turned into a prompt that also carries the
        answer produced for the previous chunk; the last response wins.
        Returns an empty string when no transcript could be obtained.
        """
        video = self._download_video_info(url)
        captions = self._get_captions(video)
        # Missing or null metadata becomes an empty string so the prompt never
        # contains the literal text "None" and no KeyError is raised.
        title = video.get('title') or ''
        description = video.get('description') or ''

        answer = ""
        for chunk in self._split_captions(captions):
            prompt = self._build_prompt(title, description, chunk, query, answer)
            response = self._mock_llm(prompt)  # replace with real call to your LLM
            answer = response.strip()
        return answer

    def _download_video_info(self, url: str):
        """Fetch the yt-dlp info dict for *url* without downloading media."""
        opts = {
            'quiet': True,
            'skip_download': True,
            'format': f'bestvideo[height<={self.quality}]+bestaudio/best',
        }
        with yt_dlp.YoutubeDL(opts) as ydl:
            return ydl.extract_info(url, download=False)

    def _get_captions(self, info: dict):
        """Return transcript segments as a list of ``{"text": ...}`` dicts.

        Tries English subtitles, then English automatic captions, using the
        first track whose ``ext`` is ``vtt``. Otherwise falls back to the
        speech tool when one is configured. Returns ``[]`` when neither
        source is available.

        Raises:
            requests.RequestException: If the caption download fails or
                returns an error status.
            subprocess.CalledProcessError: If ffmpeg fails during the
                transcription fallback.
            IndexError: If the fallback finds no usable audio format.
        """
        lang = 'en'
        # `or {}` guards against keys that are present but set to None.
        subs = (
            (info.get('subtitles') or {}).get(lang)
            or (info.get('automatic_captions') or {}).get(lang)
        )
        if subs:
            sub = next((s for s in subs if s.get('ext') == 'vtt'), None)
            if sub:
                # A timeout keeps a stalled server from hanging the tool, and
                # the status check stops an error page being parsed as VTT.
                response = requests.get(sub['url'], timeout=30)
                response.raise_for_status()
                # WebVTT is UTF-8 by specification; decode explicitly rather
                # than trusting a charset guessed from the response headers.
                text = response.content.decode('utf-8', errors='replace')
                return self._parse_vtt(text)

        # fallback to Whisper-based transcription
        if self.speech_tool:
            audio_url = self._select_audio_format(info.get('formats') or [])
            audio = self._download_audio(audio_url)
            tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
            try:
                # Close the handle before passing the path on, so the file is
                # fully written and can be reopened by name on every platform.
                with tmp:
                    tmp.write(audio.read())
                transcription = self.speech_tool.forward(
                    audio=tmp.name, with_time_markers=True
                )
            finally:
                # delete=False means nobody else removes the file for us.
                os.unlink(tmp.name)
            return self._parse_whisper_transcription(transcription)
        return []

    def _select_audio_format(self, formats):
        """Return the URL of the audio-only format with the highest bitrate.

        Args:
            formats: Iterable of yt-dlp format dicts (may be empty or None).

        Raises:
            IndexError: If no audio-only format with a URL exists. The
                exception type matches what indexing an empty list raised
                before, now with an explanatory message.
        """
        candidates = [
            f for f in (formats or [])
            if f.get('vcodec') == 'none'
            # Skip entries that carry neither video nor audio
            # (presumably storyboard/image tracks — verify against yt-dlp).
            and f.get('acodec') != 'none'
            and f.get('url')
        ]
        if not candidates:
            raise IndexError("no audio-only format with a URL found in video info")
        # 'abr' may be present but None; treat that as 0 so the comparison
        # never mixes None with numbers.
        best = max(candidates, key=lambda f: f.get('abr') or 0)
        return best['url']

    def _download_audio(self, audio_url: str) -> BytesIO:
        """Convert the stream at *audio_url* to mono 16 kHz WAV via ffmpeg.

        Returns:
            An in-memory buffer holding ffmpeg's standard output.

        Raises:
            subprocess.CalledProcessError: If ffmpeg exits with a non-zero
                status (previously this silently produced an empty buffer).
        """
        cmd = ["ffmpeg", "-i", audio_url, "-f", "wav", "-ac", "1", "-ar", "16000", "-"]
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        return BytesIO(proc.stdout)

    def _parse_vtt(self, vtt_data: str):
        """Extract cue texts from WebVTT data.

        Blocks are separated by empty lines. In each block that contains a
        timing line (one with ``-->``), everything after that line is the cue
        text. Cue settings on the timing line, markup tags and redundant
        whitespace are dropped; cues with no remaining text are skipped.

        Returns:
            A list of ``{"text": ...}`` dicts in file order.
        """
        segments = []
        normalized = vtt_data.replace("\r\n", "\n").replace("\r", "\n")
        # Only a truly empty line ends a cue; a line holding just a space is
        # still part of the cue text, so split on consecutive newlines only.
        for block in re.split(r'\n{2,}', normalized):
            lines = block.split("\n")
            timing_index = next(
                (i for i, line in enumerate(lines) if '-->' in line), None
            )
            if timing_index is None:
                continue  # header, NOTE or STYLE block — no cue text here
            payload = " ".join(lines[timing_index + 1:])
            without_tags = re.sub(r'<.*?>', '', payload)
            clean_text = " ".join(without_tags.split())
            if clean_text:
                segments.append({"text": clean_text})
        return segments

    def _parse_whisper_transcription(self, text: str):
        """Extract segment texts from a time-marked transcription.

        Matches ``[start]``, a newline, one line of text, a newline and
        ``[end]``, where both markers are decimal numbers.

        NOTE(review): matches do not overlap, so this assumes every segment
        has its own start and end marker — confirm against the speech tool's
        output format.
        """
        pattern = re.compile(r'\[(\d+\.\d+)]\n(.+?)\n\[(\d+\.\d+)]')
        return [{"text": match[1]} for match in pattern.findall(text)]

    def _split_captions(self, captions, chunk_size: int = 3):
        """Group consecutive captions into chunks of *chunk_size* segments.

        Args:
            captions: List of ``{"text": ...}`` dicts.
            chunk_size: Number of segments per chunk (default 3, the
                previously hard-coded value).

        Returns:
            A list of ``{"text": ...}`` dicts, each holding the space-joined
            text of up to *chunk_size* segments. The last chunk may be short.

        Raises:
            ValueError: If *chunk_size* is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")
        # Simple fixed-length chunking
        return [
            {"text": " ".join(c["text"] for c in captions[i:i + chunk_size])}
            for i in range(0, len(captions), chunk_size)
        ]

    def _build_prompt(self, title, desc, chunk, query, prev):
        """Assemble the prompt for one transcript chunk.

        The prompt lists the title, description and chunk text, then the
        previous answer (only when *prev* is truthy, preceded by a blank
        line) and finally the question. Leading and trailing whitespace of
        the whole prompt is stripped.
        """
        parts = [
            f"Video Title: {title}",
            f"Video Description: {desc}",
            "Transcript:",
            chunk['text'],
        ]
        if prev:
            parts.append("")
            parts.append(f"Previous answer: {prev}")
        parts.append(f"Question: {query}")
        return "\n".join(parts).strip()

    def _mock_llm(self, prompt: str):
        """Placeholder LLM hook that ignores *prompt* and returns a fixed reply."""
        # Replace this with call to your real LLM
        return "I need to keep watching."