| | """Generation handling module""" |
| | import json |
| | import asyncio |
| | import base64 |
| | import time |
| | import random |
| | import re |
| | from typing import Optional, AsyncGenerator, Dict, Any |
| | from datetime import datetime |
| | from .sora_client import SoraClient |
| | from .token_manager import TokenManager |
| | from .load_balancer import LoadBalancer |
| | from .file_cache import FileCache |
| | from ..core.database import Database |
| | from ..core.models import Task, RequestLog |
| | from ..core.config import config |
| | from ..core.logger import debug_logger |
| |
|
| | |
# Supported model names mapped to their generation parameters.
# Image entries carry the requested width/height; video entries carry the
# orientation and the n_frames value forwarded to the video API
# (300 for the "10s" models, 450 for the "15s" models).
MODEL_CONFIG = {
    # Image models
    "sora-image": dict(type="image", width=360, height=360),
    "sora-image-landscape": dict(type="image", width=540, height=360),
    "sora-image-portrait": dict(type="image", width=360, height=540),

    # Video models, 10s variants ("sora-video-10s" is configured as landscape)
    "sora-video-10s": dict(type="video", orientation="landscape", n_frames=300),
    "sora-video-landscape-10s": dict(type="video", orientation="landscape", n_frames=300),
    "sora-video-portrait-10s": dict(type="video", orientation="portrait", n_frames=300),

    # Video models, 15s variants ("sora-video-15s" is configured as landscape)
    "sora-video-15s": dict(type="video", orientation="landscape", n_frames=450),
    "sora-video-landscape-15s": dict(type="video", orientation="landscape", n_frames=450),
    "sora-video-portrait-15s": dict(type="video", orientation="portrait", n_frames=450),
}
| |
|
| | class GenerationHandler: |
| | """Handle generation requests""" |
| |
|
def __init__(self, sora_client: SoraClient, token_manager: TokenManager,
             load_balancer: LoadBalancer, db: Database, proxy_manager=None):
    """Store the collaborating services and create the file cache.

    Args:
        sora_client: Client used to call the upstream generation API
        token_manager: Records token usage, successes and errors
        load_balancer: Selects tokens and owns the token lock
        db: Database used for tasks, request logs and configuration
        proxy_manager: Optional proxy manager handed to the file cache
    """
    self.sora_client, self.token_manager = sora_client, token_manager
    self.load_balancer, self.db = load_balancer, db

    # Cached files are written below "tmp" with the configured timeout.
    cache_options = {
        "cache_dir": "tmp",
        "default_timeout": config.cache_timeout,
        "proxy_manager": proxy_manager,
    }
    self.file_cache = FileCache(**cache_options)
| |
|
def _get_base_url(self) -> str:
    """Return the base URL under which cached files are served.

    The configuration is reloaded on every call. When ``cache_base_url``
    is set it is returned without trailing slashes; otherwise the URL is
    built from the configured server host and port.
    """
    # Reload first so a changed configuration is picked up.
    config.reload_config()

    configured = config.cache_base_url
    if not configured:
        # No explicit cache URL: fall back to the server address.
        return f"http://{config.server_host}:{config.server_port}"
    return configured.rstrip('/')
| | |
| | def _decode_base64_image(self, image_str: str) -> bytes: |
| | """Decode base64 image""" |
| | |
| | if "," in image_str: |
| | image_str = image_str.split(",", 1)[1] |
| | return base64.b64decode(image_str) |
| |
|
| | def _decode_base64_video(self, video_str: str) -> bytes: |
| | """Decode base64 video""" |
| | |
| | if "," in video_str: |
| | video_str = video_str.split(",", 1)[1] |
| | return base64.b64decode(video_str) |
| |
|
def _process_character_username(self, username_hint: str) -> str:
    """Derive a character username from the hint returned by the API.

    The text after the last "." is kept (the whole hint when it has no
    "."), and a random number between 100 and 999 is appended, e.g.
    "blackwill.meowliusma68" -> "meowliusma68123".

    Args:
        username_hint: Original username from the API

    Returns:
        Username with three random digits appended
    """
    # rsplit with maxsplit=1 yields the segment after the last dot, or
    # the full hint when there is no dot at all.
    stem = username_hint.rsplit(".", 1)[-1]
    suffix = random.randint(100, 999)

    final_username = f"{stem}{suffix}"
    debug_logger.log_info(f"Processed username: {username_hint} -> {final_username}")
    return final_username
| |
|
| | def _clean_remix_link_from_prompt(self, prompt: str) -> str: |
| | """Remove remix link from prompt |
| | |
| | Removes both formats: |
| | 1. Full URL: https://sora.chatgpt.com/p/s_68e3a06dcd888191b150971da152c1f5 |
| | 2. Short ID: s_68e3a06dcd888191b150971da152c1f5 |
| | |
| | Args: |
| | prompt: Original prompt that may contain remix link |
| | |
| | Returns: |
| | Cleaned prompt without remix link |
| | """ |
| | if not prompt: |
| | return prompt |
| |
|
| | |
| | cleaned = re.sub(r'https://sora\.chatgpt\.com/p/s_[a-f0-9]{32}', '', prompt) |
| |
|
| | |
| | cleaned = re.sub(r's_[a-f0-9]{32}', '', cleaned) |
| |
|
| | |
| | cleaned = ' '.join(cleaned.split()) |
| |
|
| | debug_logger.log_info(f"Cleaned prompt: '{prompt}' -> '{cleaned}'") |
| |
|
| | return cleaned |
| |
|
async def _download_file(self, url: str) -> bytes:
    """Fetch a file over HTTP and return its content.

    The request uses a 30 second timeout, impersonates Chrome, and goes
    through the proxy returned by the load balancer's proxy manager when
    one is configured.

    Args:
        url: File URL

    Returns:
        File bytes

    Raises:
        Exception: If the response status code is not 200
    """
    from curl_cffi.requests import AsyncSession

    proxy_url = await self.load_balancer.proxy_manager.get_proxy_url()

    request_options = {"timeout": 30, "impersonate": "chrome"}
    if proxy_url:
        request_options["proxy"] = proxy_url

    async with AsyncSession() as session:
        response = await session.get(url, **request_options)
        if response.status_code == 200:
            return response.content
        raise Exception(f"Failed to download file: {response.status_code}")
| | |
async def check_token_availability(self, is_image: bool, is_video: bool) -> bool:
    """Report whether the load balancer can supply a token.

    Args:
        is_image: Whether the token is needed for image generation
        is_video: Whether the token is needed for video generation

    Returns:
        True if the load balancer returned a token, False otherwise
    """
    selected = await self.load_balancer.select_token(
        for_image_generation=is_image,
        for_video_generation=is_video,
    )
    if selected is None:
        return False
    return True
| |
|
| | async def _run_background_poll(self, polling_generator): |
| | """Run polling generator in background until completion""" |
| | try: |
| | async for _ in polling_generator: |
| | pass |
| | except Exception as e: |
| | debug_logger.log_error(f"Background polling failed: {str(e)}") |
| |
|
async def submit_generation_task(self, model: str, prompt: str,
                                 image: Optional[str] = None,
                                 video: Optional[str] = None,
                                 remix_target_id: Optional[str] = None) -> str:
    """Submit a generation task and return its task ID immediately.

    The upstream task is created, stored in the database, and then
    polled by a background asyncio task; the caller does not wait for
    the result.

    Args:
        model: Model name (must be a key of MODEL_CONFIG)
        prompt: Generation prompt
        image: Base64 encoded image, uploaded as reference media when given
        video: Accepted for interface compatibility; not used by this method
        remix_target_id: Sora share link video ID; for video models this
            routes the request to the remix flow

    Returns:
        Task ID

    Raises:
        ValueError: If the model name is unknown
        Exception: If no token is available, the image token lock cannot
            be acquired, or the upstream submission fails
    """
    if model not in MODEL_CONFIG:
        raise ValueError(f"Invalid model: {model}")

    model_config = MODEL_CONFIG[model]
    is_video = model_config["type"] == "video"
    is_image = model_config["type"] == "image"

    # Remix requests have their own submission flow.
    if is_video and remix_target_id:
        return await self._submit_remix_task(remix_target_id, prompt, model_config)

    token_obj = await self.load_balancer.select_token(for_image_generation=is_image, for_video_generation=is_video)
    if not token_obj:
        if is_image:
            raise Exception("No available tokens for image generation")
        else:
            raise Exception("No available tokens for video generation")

    # Image generation holds the per-token lock for the task's lifetime.
    if is_image:
        lock_acquired = await self.load_balancer.token_lock.acquire_lock(token_obj.id)
        if not lock_acquired:
            raise Exception(f"Failed to acquire lock for token {token_obj.id}")

    try:
        media_id = None
        if image:
            image_data = self._decode_base64_image(image)
            media_id = await self.sora_client.upload_image(image_data, token_obj.token)

        if is_video:
            n_frames = model_config.get("n_frames", 300)
            task_id = await self.sora_client.generate_video(
                prompt, token_obj.token,
                orientation=model_config["orientation"],
                media_id=media_id,
                n_frames=n_frames
            )
        else:
            task_id = await self.sora_client.generate_image(
                prompt, token_obj.token,
                width=model_config["width"],
                height=model_config["height"],
                media_id=media_id
            )

        task = Task(
            task_id=task_id,
            token_id=token_obj.id,
            model=model,
            prompt=prompt,
            status="processing",
            progress=0.0
        )
        await self.db.create_task(task)

        await self.token_manager.record_usage(token_obj.id, is_video=is_video)

        polling_gen = self._poll_task_result(
            task_id, token_obj.token, is_video, stream=False, prompt=prompt, token_id=token_obj.id
        )

        async def _poll_then_release_lock():
            # The poller only releases the image lock on timeout, so
            # release it here once polling ends for any reason.
            # NOTE(review): this can release twice after a timeout, as
            # handle_generation already does — confirm release_lock
            # tolerates that.
            try:
                await self._run_background_poll(polling_gen)
            finally:
                if is_image:
                    await self.load_balancer.token_lock.release_lock(token_obj.id)

        # The event loop keeps only weak references to tasks; hold a
        # strong one until completion so the poller cannot be
        # garbage-collected mid-run.
        background_tasks = getattr(self, "_background_tasks", None)
        if background_tasks is None:
            background_tasks = self._background_tasks = set()
        background_task = asyncio.create_task(_poll_then_release_lock())
        background_tasks.add(background_task)
        background_task.add_done_callback(background_tasks.discard)

        return task_id

    except Exception:
        # Submission failed before the poller took over: free the lock.
        if is_image:
            await self.load_balancer.token_lock.release_lock(token_obj.id)
        raise
| |
|
async def _submit_remix_task(self, remix_target_id: str, prompt: str, model_config: Dict) -> str:
    """Submit a remix task and start polling it in the background.

    Args:
        remix_target_id: Sora share link video ID to remix
        prompt: Prompt; any remix link it contains is removed before
            submission
        model_config: MODEL_CONFIG entry of the requested video model

    Returns:
        Task ID

    Raises:
        Exception: If no token is available or the submission fails; a
            submission failure is also recorded as a token error
    """
    token_obj = await self.load_balancer.select_token(for_video_generation=True)
    if not token_obj:
        raise Exception("No available tokens for remix generation")

    try:
        clean_prompt = self._clean_remix_link_from_prompt(prompt)
        n_frames = model_config.get("n_frames", 300)

        task_id = await self.sora_client.remix_video(
            remix_target_id=remix_target_id,
            prompt=clean_prompt,
            token=token_obj.token,
            orientation=model_config["orientation"],
            n_frames=n_frames
        )

        task = Task(
            task_id=task_id,
            token_id=token_obj.id,
            model=f"sora-video-{model_config['orientation']}",
            prompt=f"remix:{remix_target_id} {clean_prompt}",
            status="processing",
            progress=0.0
        )
        await self.db.create_task(task)

        await self.token_manager.record_usage(token_obj.id, is_video=True)

        polling_gen = self._poll_task_result(
            task_id, token_obj.token, True, False, clean_prompt, token_obj.id
        )

        # The event loop keeps only weak references to tasks; hold a
        # strong one until completion so the poller cannot be
        # garbage-collected mid-run.
        background_tasks = getattr(self, "_background_tasks", None)
        if background_tasks is None:
            background_tasks = self._background_tasks = set()
        background_task = asyncio.create_task(self._run_background_poll(polling_gen))
        background_tasks.add(background_task)
        background_task.add_done_callback(background_tasks.discard)

        return task_id

    except Exception:
        await self.token_manager.record_error(token_obj.id)
        raise
| |
|
| |
|
async def handle_generation(self, model: str, prompt: str,
                            image: Optional[str] = None,
                            video: Optional[str] = None,
                            remix_target_id: Optional[str] = None,
                            stream: bool = True) -> AsyncGenerator[str, None]:
    """Handle a generation request and yield response chunks.

    With ``stream=False`` only a token-availability message is yielded
    and no generation is started. With ``stream=True`` the request is
    routed to the remix, character, or plain image/video flow and
    progress chunks are yielded until the result is available.

    Args:
        model: Model name (must be a key of MODEL_CONFIG)
        prompt: Generation prompt
        image: Base64 encoded image
        video: Base64 encoded video or video URL
        remix_target_id: Sora share link video ID for remix
        stream: Whether to stream response

    Raises:
        ValueError: If the model name is unknown
        Exception: If no token is available, the image token lock cannot
            be acquired, or generation fails
    """
    start_time = time.time()

    if model not in MODEL_CONFIG:
        raise ValueError(f"Invalid model: {model}")

    model_config = MODEL_CONFIG[model]
    is_video = model_config["type"] == "video"
    is_image = model_config["type"] == "image"
    media_kind = "image" if is_image else "video"

    # Non-streaming requests are answered with an availability check only.
    if not stream:
        available = await self.check_token_availability(is_image, is_video)
        if available:
            message = f"All tokens available for {media_kind} generation. Please enable streaming to use the generation feature."
        else:
            message = f"No available models for {media_kind} generation"

        yield self._format_non_stream_response(message, is_availability_check=True)
        return

    if is_video:
        # Remix flow
        if remix_target_id:
            async for chunk in self._handle_remix(remix_target_id, prompt, model_config):
                yield chunk
            return

        # Character flows: a video input is either a URL (passed through)
        # or base64 data (decoded to bytes).
        if video:
            video_data = self._decode_base64_video(video) if video.startswith("data:") or not video.startswith("http") else video

            if not prompt:
                async for chunk in self._handle_character_creation_only(video_data, model_config):
                    yield chunk
                return
            else:
                async for chunk in self._handle_character_and_video_generation(video_data, prompt, model_config):
                    yield chunk
                return

    token_obj = await self.load_balancer.select_token(for_image_generation=is_image, for_video_generation=is_video)
    if not token_obj:
        if is_image:
            raise Exception("No available tokens for image generation. All tokens are either disabled, cooling down, locked, or expired.")
        else:
            raise Exception("No available tokens for video generation. All tokens are either disabled, cooling down, Sora2 quota exhausted, don't support Sora2, or expired.")

    # Image generation holds the per-token lock while it runs.
    if is_image:
        lock_acquired = await self.load_balancer.token_lock.acquire_lock(token_obj.id)
        if not lock_acquired:
            raise Exception(f"Failed to acquire lock for token {token_obj.id}")

    task_id = None
    is_first_chunk = True

    # stream is always True from here on (the non-stream case returned above).
    try:
        media_id = None
        if image:
            yield self._format_stream_chunk(
                reasoning_content="**Image Upload Begins**\n\nUploading image to server...\n",
                is_first=is_first_chunk
            )
            is_first_chunk = False

            image_data = self._decode_base64_image(image)
            media_id = await self.sora_client.upload_image(image_data, token_obj.token)

            yield self._format_stream_chunk(
                reasoning_content="Image uploaded successfully. Proceeding to generation...\n"
            )

        # Only the very first chunk of the stream carries the role.
        yield self._format_stream_chunk(
            reasoning_content="**Generation Process Begins**\n\nInitializing generation request...\n",
            is_first=is_first_chunk
        )
        is_first_chunk = False

        if is_video:
            n_frames = model_config.get("n_frames", 300)

            task_id = await self.sora_client.generate_video(
                prompt, token_obj.token,
                orientation=model_config["orientation"],
                media_id=media_id,
                n_frames=n_frames
            )
        else:
            task_id = await self.sora_client.generate_image(
                prompt, token_obj.token,
                width=model_config["width"],
                height=model_config["height"],
                media_id=media_id
            )

        task = Task(
            task_id=task_id,
            token_id=token_obj.id,
            model=model,
            prompt=prompt,
            status="processing",
            progress=0.0
        )
        await self.db.create_task(task)

        await self.token_manager.record_usage(token_obj.id, is_video=is_video)

        async for chunk in self._poll_task_result(task_id, token_obj.token, is_video, stream, prompt, token_obj.id):
            yield chunk

        await self.token_manager.record_success(token_obj.id, is_video=is_video)

        duration = time.time() - start_time
        await self._log_request(
            token_obj.id,
            f"generate_{model_config['type']}",
            {"model": model, "prompt": prompt, "has_image": image is not None},
            {"task_id": task_id, "status": "success"},
            200,
            duration
        )

    except Exception as e:
        await self.token_manager.record_error(token_obj.id)

        duration = time.time() - start_time
        await self._log_request(
            token_obj.id,
            f"generate_{model_config['type']}",
            {"model": model, "prompt": prompt, "has_image": image is not None},
            {"error": str(e)},
            500,
            duration
        )
        raise

    finally:
        # Release in finally so the lock is also freed when the consumer
        # closes the generator early or the task is cancelled; neither
        # GeneratorExit nor CancelledError is caught by `except Exception`.
        if is_image:
            await self.load_balancer.token_lock.release_lock(token_obj.id)
| | |
async def _poll_task_result(self, task_id: str, token: str, is_video: bool,
                            stream: bool, prompt: str, token_id: int = None) -> AsyncGenerator[str, None]:
    """Poll the upstream API until the task completes, fails or times out.

    Video tasks are looked up in the pending-task list and, once no
    longer pending, in the video drafts; image tasks are looked up in
    the image task list. On completion the result URLs are optionally
    cached, stored on the task record, and (when streaming) emitted as
    the final chunk followed by "data: [DONE]".

    Args:
        task_id: Upstream task ID to poll
        token: Access token used for upstream calls
        is_video: True for video tasks, False for image tasks
        stream: Whether to yield progress and result chunks
        prompt: Prompt, used when publishing for the watermark-free flow
        token_id: Token ID whose image lock is released on timeout

    Raises:
        Exception: On timeout, when the upstream reports the image task
            as failed, or when polling still fails on the last attempt
    """
    timeout = config.video_timeout if is_video else config.image_timeout
    poll_interval = config.poll_interval
    max_attempts = int(timeout / poll_interval)
    last_progress = 0
    start_time = time.time()
    last_heartbeat_time = start_time
    heartbeat_interval = 10  # seconds between image heartbeat chunks
    last_status_output_time = start_time
    video_status_interval = 30  # seconds between video progress chunks

    debug_logger.log_info(f"Starting task polling: task_id={task_id}, is_video={is_video}, timeout={timeout}s, max_attempts={max_attempts}")

    if is_video:
        watermark_free_config = await self.db.get_watermark_free_config()
        debug_logger.log_info(f"Watermark-free mode: {'ENABLED' if watermark_free_config.watermark_free_enabled else 'DISABLED'}")

    for attempt in range(max_attempts):
        # Wall-clock timeout check, independent of the attempt counter.
        elapsed_time = time.time() - start_time
        if elapsed_time > timeout:
            debug_logger.log_error(
                error_message=f"Task timeout: {elapsed_time:.1f}s > {timeout}s",
                status_code=408,
                response_text=f"Task {task_id} timed out after {elapsed_time:.1f} seconds"
            )

            if not is_video and token_id:
                await self.load_balancer.token_lock.release_lock(token_id)
                debug_logger.log_info(f"Released lock for token {token_id} due to timeout")

            await self.db.update_task(task_id, "failed", 0, error_message=f"Generation timeout after {elapsed_time:.1f} seconds")
            raise Exception(f"Upstream API timeout: Generation exceeded {timeout} seconds limit")

        await asyncio.sleep(poll_interval)

        # Set when the upstream reports a definitive failure, so the
        # retry handler below does not keep polling a dead task.
        terminal_failure = False

        try:
            if is_video:
                pending_tasks = await self.sora_client.get_pending_tasks(token)

                task_found = False
                for task in pending_tasks:
                    if task.get("id") == task_id:
                        task_found = True

                        # progress_pct may be missing or None while the
                        # task is starting.
                        progress_pct = task.get("progress_pct")
                        if progress_pct is None:
                            progress_pct = 0
                        else:
                            progress_pct = int(progress_pct * 100)

                        last_progress = progress_pct
                        status = task.get("status", "processing")

                        current_time = time.time()
                        if stream and (current_time - last_status_output_time >= video_status_interval):
                            last_status_output_time = current_time
                            debug_logger.log_info(f"Task {task_id} progress: {progress_pct}% (status: {status})")
                            yield self._format_stream_chunk(
                                reasoning_content=f"**Video Generation Progress**: {progress_pct}% ({status})\n"
                            )
                        break

                # No longer pending: look for the finished draft.
                if not task_found:
                    debug_logger.log_info(f"Task {task_id} not found in pending tasks, fetching from drafts...")
                    result = await self.sora_client.get_video_drafts(token)
                    items = result.get("items", [])

                    for item in items:
                        if item.get("task_id") == task_id:
                            # Read the configuration fresh at completion time.
                            watermark_free_config = await self.db.get_watermark_free_config()
                            watermark_free_enabled = watermark_free_config.watermark_free_enabled

                            if watermark_free_enabled:
                                debug_logger.log_info(f"Entering watermark-free mode for task {task_id}")
                                generation_id = item.get("id")
                                debug_logger.log_info(f"Generation ID: {generation_id}")
                                if not generation_id:
                                    raise Exception("Generation ID not found in video draft")

                                if stream:
                                    yield self._format_stream_chunk(
                                        reasoning_content="**Video Generation Completed**\n\nWatermark-free mode enabled. Publishing video to get watermark-free version...\n"
                                    )

                                watermark_config = watermark_free_config
                                parse_method = watermark_config.parse_method or "third_party"

                                try:
                                    debug_logger.log_info(f"Calling post_video_for_watermark_free with generation_id={generation_id}, prompt={prompt[:50]}...")
                                    post_id = await self.sora_client.post_video_for_watermark_free(
                                        generation_id=generation_id,
                                        prompt=prompt,
                                        token=token
                                    )
                                    debug_logger.log_info(f"Received post_id: {post_id}")

                                    if not post_id:
                                        raise Exception("Failed to get post ID from publish API")

                                    if parse_method == "custom":
                                        if not watermark_config.custom_parse_url or not watermark_config.custom_parse_token:
                                            raise Exception("Custom parse server URL or token not configured")

                                        if stream:
                                            yield self._format_stream_chunk(
                                                reasoning_content=f"Video published successfully. Post ID: {post_id}\nUsing custom parse server to get watermark-free URL...\n"
                                            )

                                        debug_logger.log_info(f"Using custom parse server: {watermark_config.custom_parse_url}")
                                        watermark_free_url = await self.sora_client.get_watermark_free_url_custom(
                                            parse_url=watermark_config.custom_parse_url,
                                            parse_token=watermark_config.custom_parse_token,
                                            post_id=post_id
                                        )
                                    else:
                                        watermark_free_url = f"https://oscdn2.dyysy.com/MP4/{post_id}.mp4"
                                        debug_logger.log_info(f"Using third-party parse server")

                                    debug_logger.log_info(f"Watermark-free URL: {watermark_free_url}")

                                    if stream:
                                        yield self._format_stream_chunk(
                                            reasoning_content=f"Video published successfully. Post ID: {post_id}\nNow {'caching' if config.cache_enabled else 'preparing'} watermark-free video...\n"
                                        )

                                    if config.cache_enabled:
                                        try:
                                            cached_filename = await self.file_cache.download_and_cache(watermark_free_url, "video")
                                            local_url = f"{self._get_base_url()}/tmp/{cached_filename}"
                                            if stream:
                                                yield self._format_stream_chunk(
                                                    reasoning_content="Watermark-free video cached successfully. Preparing final response...\n"
                                                )

                                            # The published post is only deleted
                                            # once a local copy exists.
                                            try:
                                                debug_logger.log_info(f"Deleting published post: {post_id}")
                                                await self.sora_client.delete_post(post_id, token)
                                                debug_logger.log_info(f"Published post deleted successfully: {post_id}")
                                                if stream:
                                                    yield self._format_stream_chunk(
                                                        reasoning_content="Published post deleted successfully.\n"
                                                    )
                                            except Exception as delete_error:
                                                debug_logger.log_error(
                                                    error_message=f"Failed to delete published post {post_id}: {str(delete_error)}",
                                                    status_code=500,
                                                    response_text=str(delete_error)
                                                )
                                                if stream:
                                                    yield self._format_stream_chunk(
                                                        reasoning_content=f"Warning: Failed to delete published post - {str(delete_error)}\n"
                                                    )
                                        except Exception as cache_error:
                                            # Caching failed: use the remote URL.
                                            local_url = watermark_free_url
                                            if stream:
                                                yield self._format_stream_chunk(
                                                    reasoning_content=f"Warning: Failed to cache file - {str(cache_error)}\nUsing original watermark-free URL instead...\n"
                                                )
                                    else:
                                        local_url = watermark_free_url
                                        if stream:
                                            yield self._format_stream_chunk(
                                                reasoning_content="Cache is disabled. Using watermark-free URL directly...\n"
                                            )

                                except Exception as publish_error:
                                    # Watermark-free flow failed: fall back
                                    # to the normal draft URL.
                                    debug_logger.log_error(
                                        error_message=f"Watermark-free mode failed: {str(publish_error)}",
                                        status_code=500,
                                        response_text=str(publish_error)
                                    )
                                    if stream:
                                        yield self._format_stream_chunk(
                                            reasoning_content=f"Warning: Failed to get watermark-free version - {str(publish_error)}\nFalling back to normal video...\n"
                                        )

                                    url = item.get("downloadable_url") or item.get("url")
                                    if not url:
                                        raise Exception("Video URL not found")
                                    if config.cache_enabled:
                                        try:
                                            cached_filename = await self.file_cache.download_and_cache(url, "video")
                                            local_url = f"{self._get_base_url()}/tmp/{cached_filename}"
                                        except Exception:
                                            local_url = url
                                    else:
                                        local_url = url
                            else:
                                # Normal mode: prefer downloadable_url.
                                url = item.get("downloadable_url") or item.get("url")
                                if not url:
                                    # Without this guard local_url would be
                                    # unbound below; raising lets the retry
                                    # handler poll again.
                                    raise Exception("Video URL not found")

                                if config.cache_enabled:
                                    if stream:
                                        yield self._format_stream_chunk(
                                            reasoning_content="**Video Generation Completed**\n\nVideo generation successful. Now caching the video file...\n"
                                        )

                                    try:
                                        cached_filename = await self.file_cache.download_and_cache(url, "video")
                                        local_url = f"{self._get_base_url()}/tmp/{cached_filename}"
                                        if stream:
                                            yield self._format_stream_chunk(
                                                reasoning_content="Video file cached successfully. Preparing final response...\n"
                                            )
                                    except Exception as cache_error:
                                        local_url = url
                                        if stream:
                                            yield self._format_stream_chunk(
                                                reasoning_content=f"Warning: Failed to cache file - {str(cache_error)}\nUsing original URL instead...\n"
                                            )
                                else:
                                    local_url = url
                                    if stream:
                                        yield self._format_stream_chunk(
                                            reasoning_content="**Video Generation Completed**\n\nCache is disabled. Using original URL directly...\n"
                                        )

                            await self.db.update_task(
                                task_id, "completed", 100.0,
                                result_urls=json.dumps([local_url])
                            )

                            if stream:
                                yield self._format_stream_chunk(
                                    content=f"```html\n<video src='{local_url}' controls></video>\n```",
                                    finish_reason="STOP"
                                )
                                yield "data: [DONE]\n\n"
                            return
            else:
                result = await self.sora_client.get_image_tasks(token)
                task_responses = result.get("task_responses", [])

                task_found = False
                for task_resp in task_responses:
                    if task_resp.get("id") == task_id:
                        task_found = True
                        status = task_resp.get("status")
                        # progress_pct may be missing or None; treat as 0.
                        raw_progress = task_resp.get("progress_pct") or 0
                        debug_logger.log_info(f"Image task {task_id} status: {status}, progress_pct: {raw_progress}")
                        progress = raw_progress * 100

                        if status == "succeeded":
                            generations = task_resp.get("generations", [])
                            urls = [gen.get("url") for gen in generations if gen.get("url")]

                            if urls:
                                if stream:
                                    yield self._format_stream_chunk(
                                        reasoning_content=f"**Image Generation Completed**\n\nImage generation successful. Now caching {len(urls)} image(s)...\n"
                                    )

                                base_url = self._get_base_url()
                                local_urls = []

                                if config.cache_enabled:
                                    for idx, url in enumerate(urls):
                                        try:
                                            cached_filename = await self.file_cache.download_and_cache(url, "image")
                                            local_url = f"{base_url}/tmp/{cached_filename}"
                                            local_urls.append(local_url)
                                            if stream and len(urls) > 1:
                                                yield self._format_stream_chunk(
                                                    reasoning_content=f"Cached image {idx + 1}/{len(urls)}...\n"
                                                )
                                        except Exception as cache_error:
                                            # Caching failed: keep the remote URL.
                                            local_urls.append(url)
                                            if stream:
                                                yield self._format_stream_chunk(
                                                    reasoning_content=f"Warning: Failed to cache image {idx + 1} - {str(cache_error)}\nUsing original URL instead...\n"
                                                )

                                    if stream and all(u.startswith(base_url) for u in local_urls):
                                        yield self._format_stream_chunk(
                                            reasoning_content="All images cached successfully. Preparing final response...\n"
                                        )
                                else:
                                    local_urls = urls
                                    if stream:
                                        yield self._format_stream_chunk(
                                            reasoning_content="Cache is disabled. Using original URLs directly...\n"
                                        )

                                await self.db.update_task(
                                    task_id, "completed", 100.0,
                                    result_urls=json.dumps(local_urls)
                                )

                                if stream:
                                    # One markdown image per result URL.
                                    content_markdown = "\n".join(f"![Generated Image]({url})" for url in local_urls)
                                    yield self._format_stream_chunk(
                                        content=content_markdown,
                                        finish_reason="STOP"
                                    )
                                    yield "data: [DONE]\n\n"
                                return

                        elif status == "failed":
                            error_msg = task_resp.get("error_message", "Generation failed")
                            await self.db.update_task(task_id, "failed", progress, error_message=error_msg)
                            terminal_failure = True
                            raise Exception(error_msg)

                        elif status == "processing":
                            # Only report progress jumps of more than 20 points.
                            if progress > last_progress + 20:
                                last_progress = progress
                                await self.db.update_task(task_id, "processing", progress)

                                if stream:
                                    yield self._format_stream_chunk(
                                        reasoning_content=f"**Processing**\n\nGeneration in progress: {progress:.0f}% completed...\n"
                                    )

            # Image heartbeat, sent whether or not the task was found.
            if not is_video and stream:
                current_time = time.time()
                if current_time - last_heartbeat_time >= heartbeat_interval:
                    last_heartbeat_time = current_time
                    elapsed = int(current_time - start_time)
                    yield self._format_stream_chunk(
                        reasoning_content=f"Image generation in progress... ({elapsed}s elapsed)\n"
                    )

            # Fallback estimate based on the attempt counter.
            if stream and attempt % 10 == 0:
                estimated_progress = min(90, (attempt / max_attempts) * 100)
                if estimated_progress > last_progress + 20:
                    last_progress = estimated_progress
                    yield self._format_stream_chunk(
                        reasoning_content=f"**Processing**\n\nGeneration in progress: {estimated_progress:.0f}% completed (estimated)...\n"
                    )

        except Exception:
            # Polling errors are retried until the last attempt; a failure
            # reported by the upstream is final and propagates immediately.
            if terminal_failure or attempt >= max_attempts - 1:
                raise
            continue

    # All attempts used without a result.
    if not is_video and token_id:
        await self.load_balancer.token_lock.release_lock(token_id)
        debug_logger.log_info(f"Released lock for token {token_id} due to max attempts reached")

    await self.db.update_task(task_id, "failed", 0, error_message=f"Generation timeout after {timeout} seconds")
    raise Exception(f"Upstream API timeout: Generation exceeded {timeout} seconds limit")
| | |
| | def _format_stream_chunk(self, content: str = None, reasoning_content: str = None, |
| | finish_reason: str = None, is_first: bool = False) -> str: |
| | """Format streaming response chunk |
| | |
| | Args: |
| | content: Final response content (for user-facing output) |
| | reasoning_content: Thinking/reasoning process content |
| | finish_reason: Finish reason (e.g., "STOP") |
| | is_first: Whether this is the first chunk (includes role) |
| | """ |
| | chunk_id = f"chatcmpl-{int(datetime.now().timestamp() * 1000)}" |
| |
|
| | delta = {} |
| |
|
| | |
| | if is_first: |
| | delta["role"] = "assistant" |
| |
|
| | |
| | if content is not None: |
| | delta["content"] = content |
| | else: |
| | delta["content"] = None |
| |
|
| | if reasoning_content is not None: |
| | delta["reasoning_content"] = reasoning_content |
| | else: |
| | delta["reasoning_content"] = None |
| |
|
| | delta["tool_calls"] = None |
| |
|
| | response = { |
| | "id": chunk_id, |
| | "object": "chat.completion.chunk", |
| | "created": int(datetime.now().timestamp()), |
| | "model": "sora", |
| | "choices": [{ |
| | "index": 0, |
| | "delta": delta, |
| | "finish_reason": finish_reason, |
| | "native_finish_reason": finish_reason |
| | }], |
| | "usage": { |
| | "prompt_tokens": 0 |
| | } |
| | } |
| |
|
| | |
| | if finish_reason: |
| | response["usage"]["completion_tokens"] = 1 |
| | response["usage"]["total_tokens"] = 1 |
| |
|
| | return f'data: {json.dumps(response)}\n\n' |
| | |
| | def _format_non_stream_response(self, content: str, media_type: str = None, is_availability_check: bool = False) -> str: |
| | """Format non-streaming response |
| | |
| | Args: |
| | content: Response content (either URL for generation or message for availability check) |
| | media_type: Type of media ("video", "image") - only used for generation responses |
| | is_availability_check: Whether this is an availability check response |
| | """ |
| | if not is_availability_check: |
| | |
| | if media_type == "video": |
| | content = f"```html\n<video src='{content}' controls></video>\n```" |
| | else: |
| | content = f"" |
| |
|
| | response = { |
| | "id": f"chatcmpl-{datetime.now().timestamp()}", |
| | "object": "chat.completion", |
| | "created": int(datetime.now().timestamp()), |
| | "model": "sora", |
| | "choices": [{ |
| | "index": 0, |
| | "message": { |
| | "role": "assistant", |
| | "content": content |
| | }, |
| | "finish_reason": "stop" |
| | }] |
| | } |
| | return json.dumps(response) |
| |
|
| | async def _log_request(self, token_id: Optional[int], operation: str, |
| | request_data: Dict[str, Any], response_data: Dict[str, Any], |
| | status_code: int, duration: float): |
| | """Log request to database""" |
| | try: |
| | log = RequestLog( |
| | token_id=token_id, |
| | operation=operation, |
| | request_body=json.dumps(request_data), |
| | response_body=json.dumps(response_data), |
| | status_code=status_code, |
| | duration=duration |
| | ) |
| | await self.db.log_request(log) |
| | except Exception as e: |
| | |
| | print(f"Failed to log request: {e}") |
| |
|
| | |
| |
|
| | async def _handle_character_creation_only(self, video_data, model_config: Dict) -> AsyncGenerator[str, None]: |
| | """Handle character creation only (no video generation) |
| | |
| | Flow: |
| | 1. Download video if URL, or use bytes directly |
| | 2. Upload video to create character |
| | 3. Poll for character processing |
| | 4. Download and cache avatar |
| | 5. Upload avatar |
| | 6. Finalize character |
| | 7. Set character as public |
| | 8. Return success message |
| | """ |
| | token_obj = await self.load_balancer.select_token(for_video_generation=True) |
| | if not token_obj: |
| | raise Exception("No available tokens for character creation") |
| |
|
| | try: |
| | yield self._format_stream_chunk( |
| | reasoning_content="**Character Creation Begins**\n\nInitializing character creation...\n", |
| | is_first=True |
| | ) |
| |
|
| | |
| | if isinstance(video_data, str): |
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Downloading video file...\n" |
| | ) |
| | video_bytes = await self._download_file(video_data) |
| | else: |
| | video_bytes = video_data |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Uploading video file...\n" |
| | ) |
| | cameo_id = await self.sora_client.upload_character_video(video_bytes, token_obj.token) |
| | debug_logger.log_info(f"Video uploaded, cameo_id: {cameo_id}") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Processing video to extract character...\n" |
| | ) |
| | cameo_status = await self._poll_cameo_status(cameo_id, token_obj.token) |
| | debug_logger.log_info(f"Cameo status: {cameo_status}") |
| |
|
| | |
| | username_hint = cameo_status.get("username_hint", "character") |
| | display_name = cameo_status.get("display_name_hint", "Character") |
| |
|
| | |
| | username = self._process_character_username(username_hint) |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content=f"✨ 角色已识别: {display_name} (@{username})\n" |
| | ) |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Downloading character avatar...\n" |
| | ) |
| | profile_asset_url = cameo_status.get("profile_asset_url") |
| | if not profile_asset_url: |
| | raise Exception("Profile asset URL not found in cameo status") |
| |
|
| | avatar_data = await self.sora_client.download_character_image(profile_asset_url) |
| | debug_logger.log_info(f"Avatar downloaded, size: {len(avatar_data)} bytes") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Uploading character avatar...\n" |
| | ) |
| | asset_pointer = await self.sora_client.upload_character_image(avatar_data, token_obj.token) |
| | debug_logger.log_info(f"Avatar uploaded, asset_pointer: {asset_pointer}") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Finalizing character creation...\n" |
| | ) |
| | |
| | instruction_set = cameo_status.get("instruction_set_hint") or cameo_status.get("instruction_set") |
| |
|
| | character_id = await self.sora_client.finalize_character( |
| | cameo_id=cameo_id, |
| | username=username, |
| | display_name=display_name, |
| | profile_asset_pointer=asset_pointer, |
| | instruction_set=instruction_set, |
| | token=token_obj.token |
| | ) |
| | debug_logger.log_info(f"Character finalized, character_id: {character_id}") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Setting character as public...\n" |
| | ) |
| | await self.sora_client.set_character_public(cameo_id, token_obj.token) |
| | debug_logger.log_info(f"Character set as public") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | content=f"角色创建成功,角色名@{username}", |
| | finish_reason="STOP" |
| | ) |
| | yield "data: [DONE]\n\n" |
| |
|
| | except Exception as e: |
| | debug_logger.log_error( |
| | error_message=f"Character creation failed: {str(e)}", |
| | status_code=500, |
| | response_text=str(e) |
| | ) |
| | raise |
| |
|
| | async def _handle_character_and_video_generation(self, video_data, prompt: str, model_config: Dict) -> AsyncGenerator[str, None]: |
| | """Handle character creation and video generation |
| | |
| | Flow: |
| | 1. Download video if URL, or use bytes directly |
| | 2. Upload video to create character |
| | 3. Poll for character processing |
| | 4. Download and cache avatar |
| | 5. Upload avatar |
| | 6. Finalize character |
| | 7. Generate video with character (@username + prompt) |
| | 8. Delete character |
| | 9. Return video result |
| | """ |
| | token_obj = await self.load_balancer.select_token(for_video_generation=True) |
| | if not token_obj: |
| | raise Exception("No available tokens for video generation") |
| |
|
| | character_id = None |
| | try: |
| | yield self._format_stream_chunk( |
| | reasoning_content="**Character Creation and Video Generation Begins**\n\nInitializing...\n", |
| | is_first=True |
| | ) |
| |
|
| | |
| | if isinstance(video_data, str): |
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Downloading video file...\n" |
| | ) |
| | video_bytes = await self._download_file(video_data) |
| | else: |
| | video_bytes = video_data |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Uploading video file...\n" |
| | ) |
| | cameo_id = await self.sora_client.upload_character_video(video_bytes, token_obj.token) |
| | debug_logger.log_info(f"Video uploaded, cameo_id: {cameo_id}") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Processing video to extract character...\n" |
| | ) |
| | cameo_status = await self._poll_cameo_status(cameo_id, token_obj.token) |
| | debug_logger.log_info(f"Cameo status: {cameo_status}") |
| |
|
| | |
| | username_hint = cameo_status.get("username_hint", "character") |
| | display_name = cameo_status.get("display_name_hint", "Character") |
| |
|
| | |
| | username = self._process_character_username(username_hint) |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content=f"✨ 角色已识别: {display_name} (@{username})\n" |
| | ) |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Downloading character avatar...\n" |
| | ) |
| | profile_asset_url = cameo_status.get("profile_asset_url") |
| | if not profile_asset_url: |
| | raise Exception("Profile asset URL not found in cameo status") |
| |
|
| | avatar_data = await self.sora_client.download_character_image(profile_asset_url) |
| | debug_logger.log_info(f"Avatar downloaded, size: {len(avatar_data)} bytes") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Uploading character avatar...\n" |
| | ) |
| | asset_pointer = await self.sora_client.upload_character_image(avatar_data, token_obj.token) |
| | debug_logger.log_info(f"Avatar uploaded, asset_pointer: {asset_pointer}") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Finalizing character creation...\n" |
| | ) |
| | |
| | instruction_set = cameo_status.get("instruction_set_hint") or cameo_status.get("instruction_set") |
| |
|
| | character_id = await self.sora_client.finalize_character( |
| | cameo_id=cameo_id, |
| | username=username, |
| | display_name=display_name, |
| | profile_asset_pointer=asset_pointer, |
| | instruction_set=instruction_set, |
| | token=token_obj.token |
| | ) |
| | debug_logger.log_info(f"Character finalized, character_id: {character_id}") |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="**Video Generation Process Begins**\n\nGenerating video with character...\n" |
| | ) |
| |
|
| | |
| | full_prompt = f"@{username} {prompt}" |
| | debug_logger.log_info(f"Full prompt: {full_prompt}") |
| |
|
| | |
| | n_frames = model_config.get("n_frames", 300) |
| |
|
| | task_id = await self.sora_client.generate_video( |
| | full_prompt, token_obj.token, |
| | orientation=model_config["orientation"], |
| | n_frames=n_frames |
| | ) |
| | debug_logger.log_info(f"Video generation started, task_id: {task_id}") |
| |
|
| | |
| | task = Task( |
| | task_id=task_id, |
| | token_id=token_obj.id, |
| | model=f"sora-video-{model_config['orientation']}", |
| | prompt=full_prompt, |
| | status="processing", |
| | progress=0.0 |
| | ) |
| | await self.db.create_task(task) |
| |
|
| | |
| | await self.token_manager.record_usage(token_obj.id, is_video=True) |
| |
|
| | |
| | async for chunk in self._poll_task_result(task_id, token_obj.token, True, True, full_prompt, token_obj.id): |
| | yield chunk |
| |
|
| | |
| | await self.token_manager.record_success(token_obj.id, is_video=True) |
| |
|
| | except Exception as e: |
| | |
| | if token_obj: |
| | await self.token_manager.record_error(token_obj.id) |
| | debug_logger.log_error( |
| | error_message=f"Character and video generation failed: {str(e)}", |
| | status_code=500, |
| | response_text=str(e) |
| | ) |
| | raise |
| | finally: |
| | |
| | if character_id: |
| | try: |
| | yield self._format_stream_chunk( |
| | reasoning_content="Cleaning up temporary character...\n" |
| | ) |
| | await self.sora_client.delete_character(character_id, token_obj.token) |
| | debug_logger.log_info(f"Character deleted: {character_id}") |
| | except Exception as e: |
| | debug_logger.log_error( |
| | error_message=f"Failed to delete character: {str(e)}", |
| | status_code=500, |
| | response_text=str(e) |
| | ) |
| |
|
| | async def _handle_remix(self, remix_target_id: str, prompt: str, model_config: Dict) -> AsyncGenerator[str, None]: |
| | """Handle remix video generation |
| | |
| | Flow: |
| | 1. Select token |
| | 2. Clean remix link from prompt |
| | 3. Call remix API |
| | 4. Poll for results |
| | 5. Return video result |
| | """ |
| | token_obj = await self.load_balancer.select_token(for_video_generation=True) |
| | if not token_obj: |
| | raise Exception("No available tokens for remix generation") |
| |
|
| | task_id = None |
| | try: |
| | yield self._format_stream_chunk( |
| | reasoning_content="**Remix Generation Process Begins**\n\nInitializing remix request...\n", |
| | is_first=True |
| | ) |
| |
|
| | |
| | clean_prompt = self._clean_remix_link_from_prompt(prompt) |
| |
|
| | |
| | n_frames = model_config.get("n_frames", 300) |
| |
|
| | |
| | yield self._format_stream_chunk( |
| | reasoning_content="Sending remix request to server...\n" |
| | ) |
| | task_id = await self.sora_client.remix_video( |
| | remix_target_id=remix_target_id, |
| | prompt=clean_prompt, |
| | token=token_obj.token, |
| | orientation=model_config["orientation"], |
| | n_frames=n_frames |
| | ) |
| | debug_logger.log_info(f"Remix generation started, task_id: {task_id}") |
| |
|
| | |
| | task = Task( |
| | task_id=task_id, |
| | token_id=token_obj.id, |
| | model=f"sora-video-{model_config['orientation']}", |
| | prompt=f"remix:{remix_target_id} {clean_prompt}", |
| | status="processing", |
| | progress=0.0 |
| | ) |
| | await self.db.create_task(task) |
| |
|
| | |
| | await self.token_manager.record_usage(token_obj.id, is_video=True) |
| |
|
| | |
| | async for chunk in self._poll_task_result(task_id, token_obj.token, True, True, clean_prompt, token_obj.id): |
| | yield chunk |
| |
|
| | |
| | await self.token_manager.record_success(token_obj.id, is_video=True) |
| |
|
| | except Exception as e: |
| | |
| | if token_obj: |
| | await self.token_manager.record_error(token_obj.id) |
| | debug_logger.log_error( |
| | error_message=f"Remix generation failed: {str(e)}", |
| | status_code=500, |
| | response_text=str(e) |
| | ) |
| | raise |
| |
|
| | async def _poll_cameo_status(self, cameo_id: str, token: str, timeout: int = 600, poll_interval: int = 5) -> Dict[str, Any]: |
| | """Poll for cameo (character) processing status |
| | |
| | Args: |
| | cameo_id: The cameo ID |
| | token: Access token |
| | timeout: Maximum time to wait in seconds |
| | poll_interval: Time between polls in seconds |
| | |
| | Returns: |
| | Cameo status dictionary with display_name_hint, username_hint, profile_asset_url, instruction_set_hint |
| | """ |
| | start_time = time.time() |
| | max_attempts = int(timeout / poll_interval) |
| | consecutive_errors = 0 |
| | max_consecutive_errors = 3 |
| |
|
| | for attempt in range(max_attempts): |
| | elapsed_time = time.time() - start_time |
| | if elapsed_time > timeout: |
| | raise Exception(f"Cameo processing timeout after {elapsed_time:.1f} seconds") |
| |
|
| | await asyncio.sleep(poll_interval) |
| |
|
| | try: |
| | status = await self.sora_client.get_cameo_status(cameo_id, token) |
| | current_status = status.get("status") |
| | status_message = status.get("status_message", "") |
| |
|
| | |
| | consecutive_errors = 0 |
| |
|
| | debug_logger.log_info(f"Cameo status: {current_status} (message: {status_message}) (attempt {attempt + 1}/{max_attempts})") |
| |
|
| | |
| | |
| | if status_message == "Completed": |
| | debug_logger.log_info(f"Cameo processing completed (status: {current_status}, message: {status_message})") |
| | return status |
| |
|
| | |
| | if current_status == "finalized": |
| | debug_logger.log_info(f"Cameo processing completed (status: {current_status}, message: {status_message})") |
| | return status |
| |
|
| | except Exception as e: |
| | consecutive_errors += 1 |
| | error_msg = str(e) |
| |
|
| | |
| | debug_logger.log_error( |
| | error_message=f"Failed to get cameo status (attempt {attempt + 1}/{max_attempts}, consecutive errors: {consecutive_errors}): {error_msg}", |
| | status_code=500, |
| | response_text=error_msg |
| | ) |
| |
|
| | |
| | is_tls_error = "TLS" in error_msg or "curl" in error_msg or "OPENSSL" in error_msg |
| |
|
| | if is_tls_error: |
| | |
| | backoff_time = min(poll_interval * (2 ** (consecutive_errors - 1)), 30) |
| | debug_logger.log_info(f"TLS error detected, using exponential backoff: {backoff_time}s") |
| | await asyncio.sleep(backoff_time) |
| |
|
| | |
| | if consecutive_errors >= max_consecutive_errors: |
| | raise Exception(f"Too many consecutive errors ({consecutive_errors}) while polling cameo status: {error_msg}") |
| |
|
| | |
| | continue |
| |
|
| | raise Exception(f"Cameo processing timeout after {timeout} seconds") |
| |
|