File size: 4,267 Bytes
b894479
70f978a
 
b894479
 
 
 
 
cdfe973
70f978a
b894479
 
 
70f978a
cdfe973
b894479
 
 
70f978a
 
b894479
 
 
 
70f978a
 
 
b894479
 
 
70f978a
 
 
b894479
 
 
70f978a
b894479
 
 
 
 
 
 
70f978a
b894479
 
 
 
70f978a
 
b894479
70f978a
b894479
70f978a
 
b894479
70f978a
b894479
 
 
 
 
 
 
 
 
 
70f978a
b894479
 
70f978a
 
b894479
 
70f978a
 
 
b894479
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70f978a
b894479
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# tools/youtube_video_tool.py
import base64
import os
import re
import requests
import subprocess
import tempfile
from io import BytesIO
from tools.base_tool import BaseTool

import av
import yt_dlp
from tools.speech_recognition_tool import SpeechRecognitionTool

class YouTubeVideoTool(BaseTool):
    """Answer a question about a YouTube video from its captions.

    The tool fetches video metadata with ``yt_dlp``, obtains English captions
    (published WebVTT subtitles first, then automatic captions, then an
    optional speech-recognition fallback), splits them into small chunks and
    feeds every chunk to the language model, carrying the previous answer
    forward from chunk to chunk.
    """

    name = 'youtube_video'
    description = 'Process a YouTube video and answer questions based on content.'

    def __init__(
        self,
        speech_tool: SpeechRecognitionTool = None,
        quality: int = 360,
        frame_interval: float = 2.0,
        chunk_duration: float = 2.0,
        debug: bool = False,
    ):
        """Store the tool configuration.

        Args:
            speech_tool: Optional transcription tool used when the video has
                no usable English captions. Without it, such videos yield no
                transcript.
            quality: Maximum video height used in the yt_dlp format selector.
            frame_interval: Stored on the instance; not read by this class.
            chunk_duration: Stored on the instance; not read by this class.
            debug: When true, caption download failures are printed.
        """
        self.speech_tool = speech_tool
        self.quality = quality
        self.frame_interval = frame_interval
        self.chunk_duration = chunk_duration
        self.debug = debug

    def forward(self, url: str, query: str) -> str:
        """Answer ``query`` about the video at ``url``.

        Each caption chunk is turned into a prompt that also contains the
        answer produced for the previous chunk; the answer obtained for the
        last chunk is returned. An empty string is returned when no captions
        are available, because no prompt is ever issued in that case.
        """
        video = self._download_video_info(url)
        captions = self._get_captions(video)
        # Missing or None metadata must not abort the run or leak the literal
        # text "None" into the prompt.
        title = video.get('title') or ''
        description = video.get('description') or ''

        answer = ""
        for chunk in self._split_captions(captions):
            prompt = self._build_prompt(title, description, chunk, query, answer)
            response = self._mock_llm(prompt)  # replace with real call to your LLM
            answer = response.strip()

        return answer

    def _download_video_info(self, url: str):
        """Return the yt_dlp info dict for ``url`` without downloading media."""
        opts = {
            'quiet': True,
            'skip_download': True,
            'format': f'bestvideo[height<={self.quality}]+bestaudio/best',
        }
        with yt_dlp.YoutubeDL(opts) as ydl:
            return ydl.extract_info(url, download=False)

    def _get_captions(self, info: dict):
        """Return caption segments (``[{"text": ...}, ...]``) for a video.

        Published English subtitles are preferred over automatic captions.
        When neither yields any text and a speech tool is configured, the
        best audio stream is converted to WAV and transcribed instead.
        Returns an empty list when no source produces a transcript.

        Raises:
            IndexError: If the fallback is needed but no audio stream exists.
            RuntimeError: If ffmpeg fails to convert the audio stream.
        """
        lang = 'en'
        # `or {}` also covers keys that are present but hold None.
        subs = (
            (info.get('subtitles') or {}).get(lang)
            or (info.get('automatic_captions') or {}).get(lang)
        )

        if subs:
            sub = next((s for s in subs if s.get('ext') == 'vtt'), None)
            if sub:
                segments = self._fetch_vtt_segments(sub['url'])
                if segments:
                    return segments

        # fallback to Whisper-based transcription
        if not self.speech_tool:
            return []

        audio_url = self._select_audio_format(info.get('formats') or [])
        audio = self._download_audio(audio_url)

        tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
        try:
            # Close the file before handing its path over so the speech tool
            # can reopen it on every platform and sees fully flushed data.
            with tmp:
                tmp.write(audio.read())
            transcription = self.speech_tool.forward(audio=tmp.name, with_time_markers=True)
        finally:
            # delete=False leaves cleanup to us; never leak the temp file.
            os.unlink(tmp.name)
        return self._parse_whisper_transcription(transcription)

    def _fetch_vtt_segments(self, url: str):
        """Download a WebVTT track and parse it; return ``[]`` on HTTP failure."""
        try:
            response = requests.get(url, timeout=30)
            # Without this an HTTP error page would be parsed as captions.
            response.raise_for_status()
        except requests.RequestException as exc:
            if self.debug:
                print(f"Caption download failed: {exc}")
            return []
        return self._parse_vtt(response.text)

    def _select_audio_format(self, formats):
        """Return the URL of the audio-only format with the highest bitrate.

        Raises:
            IndexError: If ``formats`` holds no audio-only entry with a URL.
        """
        audio_only = [
            f for f in formats or []
            # Entries without a video codec are not necessarily audio: skip
            # those that declare no audio codec either (presumably storyboard
            # or image tracks; verify against yt_dlp output).
            if f.get('vcodec') == 'none' and f.get('acodec') != 'none' and f.get('url')
        ]
        if not audio_only:
            raise IndexError("no audio-only format with a URL is available")
        # 'abr' may be present but None, which cannot be ordered; treat as 0.
        return max(audio_only, key=lambda f: f.get('abr') or 0)['url']

    def _download_audio(self, audio_url: str) -> BytesIO:
        """Convert the stream at ``audio_url`` to mono 16 kHz WAV via ffmpeg.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status.
        """
        cmd = ["ffmpeg", "-i", audio_url, "-f", "wav", "-ac", "1", "-ar", "16000", "-"]
        proc = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,  # keep ffmpeg from reading our stdin
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if proc.returncode != 0:
            details = proc.stderr.decode("utf-8", errors="replace").strip()[-500:]
            raise RuntimeError(
                f"ffmpeg exited with status {proc.returncode}: {details}"
            )
        return BytesIO(proc.stdout)

    def _parse_vtt(self, vtt_data: str):
        """Extract the cue texts of a WebVTT document.

        Cues are blocks separated by blank lines. Within a cue, only the
        lines after the timing line are text, so cue identifiers and cue
        settings written on the timing line are ignored. Markup tags are
        removed, whitespace is collapsed, and cues without text are skipped.

        Returns:
            A list of ``{"text": str}`` dicts in document order.
        """
        # Hours are optional in WebVTT timestamps.
        timing = re.compile(r'(?:\d+:)?\d+:\d+\.\d+\s+-->\s+(?:\d+:)?\d+:\d+\.\d+')
        normalized = vtt_data.replace('\r\n', '\n').replace('\r', '\n')

        segments = []
        for block in re.split(r'\n[ \t]*\n', normalized):
            lines = block.split('\n')
            timing_index = next(
                (i for i, line in enumerate(lines) if timing.match(line.strip())),
                None,
            )
            if timing_index is None:
                continue  # header, NOTE or STYLE block
            payload = [re.sub(r'<.*?>', '', line) for line in lines[timing_index + 1:]]
            clean_text = " ".join(" ".join(payload).split())
            if clean_text:
                segments.append({"text": clean_text})
        return segments

    def _parse_whisper_transcription(self, text: str):
        """Extract segment texts from a time-marked transcription.

        A segment is a ``[seconds]`` marker line, one line of text, and a
        following ``[seconds]`` marker line. The closing marker is only
        looked ahead at, not consumed, so it may also serve as the opening
        marker of the next segment.
        NOTE(review): the exact marker layout emitted by the speech tool is
        not visible here; confirm against its output.
        """
        pattern = re.compile(r'\[\d+\.\d+]\n(.+?)(?=\n\[\d+\.\d+])')
        return [{"text": segment} for segment in pattern.findall(text)]

    def _split_captions(self, captions, size: int = 3):
        """Group consecutive caption segments into chunks of ``size`` segments.

        The texts of each group are joined with single spaces. The last chunk
        may hold fewer segments; an empty caption list yields no chunks.
        """
        return [
            {"text": " ".join(c["text"] for c in captions[i:i + size])}
            for i in range(0, len(captions), size)
        ]

    def _build_prompt(self, title, desc, chunk, query, prev):
        """Build the prompt for one transcript chunk.

        The prompt lists the video title, description and chunk text, then
        the previous answer when there is one, and ends with the question.
        """
        base = f"""
Video Title: {title}
Video Description: {desc}
Transcript:
{chunk['text']}
"""
        if prev:
            base += f"\nPrevious answer: {prev}\n"
        base += f"Question: {query}"
        return base.strip()

    def _mock_llm(self, prompt: str):
        """Placeholder model call that ignores ``prompt`` and returns a fixed reply."""
        # Replace this with call to your real LLM
        return "I need to keep watching."