# flow2api/src/services/file_cache.py
# Synced from GitHub commit fb7bac37fa23d30514ddc08f3ae5081422c7cd21
# (uploaded by iyougame, revision 17f3fb1, verified).
"""File caching service"""
import asyncio
import functools
import hashlib
import os
import subprocess
import time
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

from curl_cffi.requests import AsyncSession

from ..core.config import config
from ..core.logger import debug_logger
class FileCache:
    """File caching service for videos and images.

    Files are downloaded into ``cache_dir`` under a name derived from the MD5
    hash of their URL. A cached file is reused until its modification time is
    older than ``default_timeout`` seconds; an optional background task
    removes expired files periodically.

    Downloads are first written to a uniquely named ``*.part`` file inside the
    cache directory and only renamed onto the final cache name once they are
    complete and non-empty, so a failed or partial download is never served
    as a cache hit.
    """

    # Seconds between two passes of the background cleanup loop.
    _CLEANUP_INTERVAL = 300
    # Timeout (seconds) handed to curl_cffi / wget / curl for the transfer.
    _HTTP_TIMEOUT = 60
    # Hard limit (seconds) after which an external wget/curl process is killed.
    _COMMAND_TIMEOUT = 90
    # Browser-like user agent sent by the wget and curl fallbacks.
    _USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )

    def __init__(self, cache_dir: str = "tmp", default_timeout: int = 7200, proxy_manager=None):
        """
        Initialize file cache

        Args:
            cache_dir: Cache directory path (created, including missing
                parent directories, if it does not exist)
            default_timeout: Default cache timeout in seconds (default: 2 hours)
            proxy_manager: ProxyManager instance for downloading files
        """
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested cache path does not fail on a fresh checkout.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.default_timeout = default_timeout
        self.proxy_manager = proxy_manager
        self._cleanup_task: Optional[asyncio.Task] = None

    async def start_cleanup_task(self):
        """Start the background cleanup task unless one is already running."""
        # Also restart when a previous task object exists but has finished.
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop_cleanup_task(self):
        """Cancel the background cleanup task and wait for it to finish."""
        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
            self._cleanup_task = None

    async def _cleanup_loop(self):
        """Background task to clean up expired files.

        Sleeps ``_CLEANUP_INTERVAL`` seconds, runs one cleanup pass and
        repeats until cancelled. Any other error is logged and the loop
        keeps running.
        """
        while True:
            try:
                await asyncio.sleep(self._CLEANUP_INTERVAL)
                await self._cleanup_expired_files()
            except asyncio.CancelledError:
                break
            except Exception as e:
                debug_logger.log_error(
                    error_message=f"Cleanup task error: {str(e)}",
                    status_code=0,
                    response_text=""
                )

    async def _cleanup_expired_files(self):
        """Remove every regular file in the cache directory older than the timeout.

        Errors on a single file are skipped so one bad entry does not abort
        the pass; errors while listing the directory are logged.
        """
        try:
            current_time = time.time()
            removed_count = 0
            for file_path in self.cache_dir.iterdir():
                try:
                    if not file_path.is_file():
                        continue
                    file_age = current_time - file_path.stat().st_mtime
                    if file_age > self.default_timeout:
                        file_path.unlink()
                        removed_count += 1
                except OSError:
                    # The file vanished or could not be removed; skip it and
                    # let the next pass try again.
                    continue
            if removed_count > 0:
                debug_logger.log_info(f"Cleanup: removed {removed_count} expired cache files")
        except Exception as e:
            debug_logger.log_error(
                error_message=f"Failed to cleanup expired files: {str(e)}",
                status_code=0,
                response_text=""
            )

    def _generate_cache_filename(self, url: str, media_type: str) -> str:
        """Generate the cache filename for a URL.

        Args:
            url: Source URL; its MD5 hex digest becomes the file stem
            media_type: 'video' -> '.mp4', 'image' -> '.jpg', anything else
                gets no extension

        Returns:
            Filename (no directory part)
        """
        # MD5 is used only to derive a stable name, not for security.
        url_hash = hashlib.md5(url.encode()).hexdigest()
        if media_type == "video":
            ext = ".mp4"
        elif media_type == "image":
            ext = ".jpg"
        else:
            ext = ""
        return f"{url_hash}{ext}"

    def _is_valid_cache_entry(self, file_path: Path) -> bool:
        """Return True if ``file_path`` exists, is non-empty and has not expired."""
        try:
            info = file_path.stat()
        except OSError:
            return False
        return info.st_size > 0 and (time.time() - info.st_mtime) < self.default_timeout

    def _new_temp_path(self, filename: str) -> Path:
        """Return a unique in-progress download path inside the cache directory.

        The path lives next to the final file so the later rename stays on
        one filesystem.
        """
        return self.cache_dir / f"{filename}.{uuid.uuid4().hex}.part"

    @staticmethod
    def _discard_file(path: Path) -> None:
        """Delete ``path`` if possible; a missing or undeletable file is ignored."""
        try:
            path.unlink()
        except OSError:
            # Best effort: leftovers are removed later by the cleanup pass.
            pass

    @staticmethod
    def _commit_download(tmp_path: Path, file_path: Path) -> int:
        """Validate a finished download and move it onto its final cache name.

        Args:
            tmp_path: Temporary file holding the downloaded data
            file_path: Final cache path to publish the data under

        Returns:
            Size of the stored file in bytes

        Raises:
            RuntimeError: If the temporary file is missing, empty, or cannot
                be moved into place
        """
        try:
            size = tmp_path.stat().st_size
        except OSError as e:
            raise RuntimeError(f"Downloaded file is missing: {e}") from e
        if size == 0:
            raise RuntimeError("Downloaded file is empty")
        try:
            # os.replace overwrites an existing destination in one step.
            os.replace(tmp_path, file_path)
        except OSError as e:
            raise RuntimeError(f"Could not store downloaded file: {e}") from e
        return size

    async def _get_proxy_url(self) -> Optional[str]:
        """Return the proxy URL from the proxy manager, or None if no proxy applies."""
        if not self.proxy_manager:
            return None
        proxy_config = await self.proxy_manager.get_proxy_config()
        if proxy_config and proxy_config.enabled and proxy_config.proxy_url:
            return proxy_config.proxy_url
        return None

    async def _run_download_command(self, cmd, tmp_path: Path, file_path: Path,
                                    env=None, error_prefix: str = "") -> int:
        """Run an external download command and publish its output file.

        The command runs in the default executor so the event loop is not
        blocked while the transfer is in progress.

        Args:
            cmd: Argument list for the command (no shell is involved)
            tmp_path: File the command was told to write to
            file_path: Final cache path
            env: Environment for the child process, or None to inherit
            error_prefix: Text prepended to the command's error output in the
                raised exception

        Returns:
            Size of the stored file in bytes

        Raises:
            FileNotFoundError: If the executable does not exist
            subprocess.TimeoutExpired: If the command exceeds ``_COMMAND_TIMEOUT``
            RuntimeError: If the command fails or produced no usable file
        """
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            functools.partial(
                subprocess.run,
                cmd,
                capture_output=True,
                timeout=self._COMMAND_TIMEOUT,
                env=env,
            ),
        )
        if result.returncode != 0 or not tmp_path.exists():
            error_msg = result.stderr.decode('utf-8', errors='ignore') if result.stderr else "Unknown error"
            raise RuntimeError(f"{error_prefix}{error_msg}")
        return self._commit_download(tmp_path, file_path)

    async def _download_with_curl_cffi(self, url: str, tmp_path: Path, file_path: Path,
                                       proxy_url: Optional[str]) -> bool:
        """Method 1: download with curl_cffi and browser impersonation.

        Returns:
            True if the file was cached; False (after logging a warning) if
            this method failed and the next one should be tried
        """
        filename = file_path.name
        try:
            async with AsyncSession() as session:
                proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
                headers = {
                    "Accept": "*/*",
                    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                    "Accept-Encoding": "gzip, deflate, br",
                    "Connection": "keep-alive",
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                    "Sec-Fetch-Site": "none",
                    "Upgrade-Insecure-Requests": "1"
                }
                # NOTE(review): verify=False disables TLS certificate
                # verification. Kept as in the original; confirm it is
                # really required before relying on it.
                response = await session.get(
                    url,
                    timeout=self._HTTP_TIMEOUT,
                    proxies=proxies,
                    headers=headers,
                    impersonate="chrome120",
                    verify=False
                )
                if response.status_code != 200:
                    debug_logger.log_warning(
                        f"curl_cffi failed with HTTP {response.status_code}, trying wget..."
                    )
                    return False
                # Write off the event loop; media files can be large.
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, tmp_path.write_bytes, response.content)
                size = self._commit_download(tmp_path, file_path)
                debug_logger.log_info(f"File cached (curl_cffi): {filename} ({size} bytes)")
                return True
        except Exception as e:
            debug_logger.log_warning(f"curl_cffi failed: {str(e)}, trying wget...")
            return False

    async def _download_with_wget(self, url: str, tmp_path: Path, file_path: Path,
                                  proxy_url: Optional[str]) -> bool:
        """Method 2: download with the system ``wget`` command.

        Returns:
            True if the file was cached; False (after logging a warning) if
            wget is unavailable or failed
        """
        filename = file_path.name
        wget_cmd = [
            "wget",
            "-q",  # Quiet mode
            "-O", str(tmp_path),  # Output file
            f"--timeout={self._HTTP_TIMEOUT}",
            "--tries=3",
            f"--user-agent={self._USER_AGENT}",
            "--header=Accept: */*",
            "--header=Accept-Language: zh-CN,zh;q=0.9,en;q=0.8",
            "--header=Connection: keep-alive",
            url,
        ]
        env = None
        if proxy_url:
            # The proxy is handed to wget through environment variables.
            env = os.environ.copy()
            env['http_proxy'] = proxy_url
            env['https_proxy'] = proxy_url
        try:
            size = await self._run_download_command(wget_cmd, tmp_path, file_path, env=env)
        except FileNotFoundError:
            debug_logger.log_warning("wget not found, trying curl...")
            return False
        except Exception as e:
            debug_logger.log_warning(f"wget failed: {str(e)}, trying curl...")
            return False
        debug_logger.log_info(f"File cached (wget): {filename} ({size} bytes)")
        return True

    async def _download_with_curl(self, url: str, tmp_path: Path, file_path: Path,
                                  proxy_url: Optional[str]) -> None:
        """Method 3: download with the system ``curl`` command.

        This is the last fallback, so failures are raised to the caller
        instead of being logged as warnings.

        Raises:
            Exception: If curl is unavailable, fails, or yields an empty file
        """
        filename = file_path.name
        curl_cmd = [
            "curl",
            "-L",  # Follow redirects
            "-s",  # Silent mode
            "-f",  # Fail on HTTP errors instead of saving the error page
            "-o", str(tmp_path),  # Output file
            "--max-time", str(self._HTTP_TIMEOUT),
            "-H", "Accept: */*",
            "-H", "Accept-Language: zh-CN,zh;q=0.9,en;q=0.8",
            "-H", "Connection: keep-alive",
            "-A", self._USER_AGENT,
        ]
        if proxy_url:
            curl_cmd.extend(["-x", proxy_url])
        curl_cmd.append(url)
        size = await self._run_download_command(
            curl_cmd, tmp_path, file_path, error_prefix="curl command failed: "
        )
        debug_logger.log_info(f"File cached (curl): {filename} ({size} bytes)")

    async def download_and_cache(self, url: str, media_type: str) -> str:
        """
        Download file from URL and cache it locally

        A valid (non-empty, unexpired) cached copy is returned immediately.
        Otherwise three download methods are tried in order: curl_cffi,
        wget, then curl. External commands run in an executor so the event
        loop stays responsive.

        Args:
            url: File URL to download
            media_type: 'image' or 'video'

        Returns:
            Local cache filename

        Raises:
            Exception: If all download methods fail
        """
        filename = self._generate_cache_filename(url, media_type)
        file_path = self.cache_dir / filename

        if self._is_valid_cache_entry(file_path):
            debug_logger.log_info(f"Cache hit: {filename}")
            return filename
        # Expired or empty leftover (if any): drop it before downloading again.
        self._discard_file(file_path)

        debug_logger.log_info(f"Downloading file from: {url}")
        proxy_url = await self._get_proxy_url()

        # Unique per call, so concurrent downloads of one URL never share a
        # partially written file.
        tmp_path = self._new_temp_path(filename)
        try:
            if await self._download_with_curl_cffi(url, tmp_path, file_path, proxy_url):
                return filename
            if await self._download_with_wget(url, tmp_path, file_path, proxy_url):
                return filename
            try:
                await self._download_with_curl(url, tmp_path, file_path, proxy_url)
            except Exception as e:
                debug_logger.log_error(
                    error_message=f"Failed to download file: {str(e)}",
                    status_code=0,
                    response_text=str(e)
                )
                raise Exception(f"Failed to cache file: {str(e)}") from e
            return filename
        finally:
            # After a successful rename this is a no-op; after a failure it
            # removes the partial download.
            self._discard_file(tmp_path)

    def get_cache_path(self, filename: str) -> Path:
        """Get full path to cached file.

        NOTE(review): ``filename`` is joined onto the cache directory as-is;
        no check is made that the result stays inside the cache directory.
        """
        return self.cache_dir / filename

    def set_timeout(self, timeout: int):
        """Set cache timeout in seconds"""
        self.default_timeout = timeout
        debug_logger.log_info(f"Cache timeout updated to {timeout} seconds")

    def get_timeout(self) -> int:
        """Get current cache timeout in seconds"""
        return self.default_timeout

    async def clear_all(self) -> int:
        """Clear all cached files.

        Returns:
            Number of files removed

        Raises:
            Exception: Re-raised (after logging) if the cache directory
                cannot be listed
        """
        try:
            removed_count = 0
            for file_path in self.cache_dir.iterdir():
                try:
                    if file_path.is_file():
                        file_path.unlink()
                        removed_count += 1
                except OSError:
                    # Best effort: skip files that vanished or cannot be removed.
                    continue
            debug_logger.log_info(f"Cache cleared: removed {removed_count} files")
            return removed_count
        except Exception as e:
            debug_logger.log_error(
                error_message=f"Failed to clear cache: {str(e)}",
                status_code=0,
                response_text=""
            )
            raise