"""Gradio application that converts audio, image and video files.

Audio and video conversions are delegated to the ``ffmpeg`` executable;
image conversions use Pillow.  Importing this module probes FFmpeg/Pillow
for the formats they can write and builds the Gradio UI (``demo``).
"""

import collections
import functools
import multiprocessing
import os
import re
import subprocess
import uuid
import zipfile
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import gradio as gr
from PIL import Image
from tqdm import tqdm

# ----------------------- Internationalization -----------------------
from i18n_local import en, ru, es, fr, de, it, ja, ko, ar, hi, tr

i18n = gr.I18n(
    en=en, ru=ru, es=es, fr=fr, de=de, it=it, ja=ja, ko=ko, ar=ar, hi=hi, tr=tr
)

# ----------------------- HuggingFace Spaces Settings -----------------------
MAX_WORKERS = min(2, multiprocessing.cpu_count())

# ----------------------- FFmpeg utils -----------------------
# A line of ``ffmpeg -formats`` whose flag column contains "E" (muxing supported).
_FORMAT_LINE_RE = re.compile(r"^\s*[D\s]*E\s+([^\s]+)")
# A line of ``ffmpeg -encoders`` whose flag column starts with "A" (audio).
_AUDIO_ENCODER_LINE_RE = re.compile(r"^\s*A\S*\s+([^\s]+)")


def _run_ffmpeg(args):
    """Run ``ffmpeg`` with *args* and return its combined stdout/stderr text.

    Returns ``None`` when the process cannot be started (for example when
    FFmpeg is not installed).
    """
    try:
        res = subprocess.run(
            ["ffmpeg", *args],
            stdin=subprocess.DEVNULL,  # never let ffmpeg wait for console input
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors="replace",  # undecodable bytes must not abort the probe
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        return None
    return res.stdout


@functools.lru_cache(maxsize=None)
def _cached_writable_formats():
    """Probe FFmpeg once for the formats it can write; result is immutable."""
    out = _run_ffmpeg(["-hide_banner", "-v", "error", "-formats"])
    if not out:
        return frozenset()
    fmts = set()
    for line in out.splitlines():
        m = _FORMAT_LINE_RE.search(line)
        if not m:
            continue
        # One entry may list several comma-separated aliases.
        fmts.update(name.strip() for name in m.group(1).split(","))
    return frozenset(fmts)


@functools.lru_cache(maxsize=None)
def _cached_audio_encoders():
    """Probe FFmpeg once for its audio encoders; result is immutable."""
    out = _run_ffmpeg(["-hide_banner", "-v", "error", "-encoders"])
    if not out:
        return frozenset()
    enc = set()
    for line in out.splitlines():
        m = _AUDIO_ENCODER_LINE_RE.match(line)
        # The legend line (" A..... = Audio") would otherwise contribute "=".
        if m and m.group(1) != "=":
            enc.add(m.group(1).strip())
    return frozenset(enc)


def ffmpeg_writable_formats():
    """Returns a set of FFmpeg format names (including aliases) available for writing (E flag)."""
    return set(_cached_writable_formats())


def ffmpeg_audio_encoders():
    """Returns a set of available audio encoders."""
    return set(_cached_audio_encoders())


# Extension -> FFmpeg container mapping (full list)
AUDIO_EXT_TO_FFMPEG_FORMAT = {
    "mp3": "mp3",
    "wav": "wav",
    "w64": "w64",
    "flac": "flac",
    "ogg": "ogg",
    "oga": "ogg",
    "opus": "ogg",
    "spx": "ogg",
    "aac": "adts",
    "m4a": "mp4",
    "m4b": "mp4",
    "m4r": "mp4",
    "ac3": "ac3",
    "aiff": "aiff",
    "aif": "aiff",
    "aifc": "aiff",
    "caf": "caf",
    "au": "au",
    "amr": "amr",
    "dts": "dts",
    "mp2": "mp2",
    "wma": "asf",
    "wv": "wv",
    "mka": "matroska",
}

# Extensions that are only offered when one of the listed encoders exists.
AUDIO_REQUIRED_CODECS = {
    "mp3": ["libmp3lame"],
    "opus": ["libopus"],
    "spx": ["libspeex"],
}

VIDEO_EXT_TO_FFMPEG_FORMAT = {
    "mp4": "mp4",
    "m4v": "mp4",
    "mov": "mov",
    "avi": "avi",
    "mkv": "matroska",
    "webm": "webm",
    "flv": "flv",
    "ogv": "ogg",
    "mpeg": "mpeg",
    "mpg": "mpeg",
    "ts": "mpegts",
    "m2ts": "mpegts",
    "mxf": "mxf",
    "3gp": "3gp",
    "3g2": "3g2",
    "asf": "asf",
    "wmv": "asf",
    "vob": "vob",
}


def available_audio_extensions():
    """Return the sorted audio extensions the local FFmpeg can produce.

    Falls back to a fixed list when probing yields nothing.
    """
    writable = _cached_writable_formats()
    encoders = _cached_audio_encoders()
    exts = []
    for ext, ffmt in AUDIO_EXT_TO_FFMPEG_FORMAT.items():
        if ffmt not in writable:
            continue
        req = AUDIO_REQUIRED_CODECS.get(ext)
        if req and not any(r in encoders for r in req):
            continue
        exts.append(ext)
    if not exts:
        exts = ["mp3", "wav", "flac", "ogg", "aac", "m4a", "aiff", "wma", "opus"]
    return sorted(set(exts))


def available_video_extensions():
    """Return the sorted video extensions the local FFmpeg can produce.

    Falls back to a fixed list when probing yields nothing.
    """
    writable = _cached_writable_formats()
    exts = [ext for ext, ffmt in VIDEO_EXT_TO_FFMPEG_FORMAT.items() if ffmt in writable]
    if not exts:
        exts = ["mp4", "mkv", "avi", "mov", "webm", "flv", "mpeg", "mpg", "ts"]
    return sorted(set(exts))


def available_image_extensions():
    """Return the sorted image extensions Pillow can save."""
    # registered_extensions() must run first: it initialises Pillow's plugins,
    # which is what populates Image.SAVE.
    ext2fmt = Image.registered_extensions()
    save_ok = set(getattr(Image, "SAVE", {}).keys()) or set()
    if not save_ok:
        save_ok = set(ext2fmt.values())
    exts = [ext.lstrip(".").lower() for ext, fmt in ext2fmt.items() if fmt in save_ok]
    if not exts:
        exts = [
            "png", "jpg", "jpeg", "webp", "bmp", "tiff", "gif", "ico", "ppm",
            "pgm", "pbm", "pnm", "tga", "xbm", "xpm", "pdf", "eps",
        ]
    return sorted(set(exts))


def pil_format_for_ext(ext):
    """Return the Pillow format name for file extension *ext*, or ``None``."""
    ext = ext.lower().strip(".")
    for k, v in Image.registered_extensions().items():
        if k.lstrip(".").lower() == ext:
            return v
    fallback = {
        "jpg": "JPEG",
        "jpeg": "JPEG",
        "png": "PNG",
        "webp": "WEBP",
        "bmp": "BMP",
        "tiff": "TIFF",
        "tif": "TIFF",
        "gif": "GIF",
        "ico": "ICO",
        "ppm": "PPM",
        "pgm": "PPM",
        "pbm": "PPM",
        "pnm": "PPM",
        "tga": "TGA",
        "xbm": "XBM",
        "xpm": "XPM",
        "pdf": "PDF",
        "eps": "EPS",
    }
    return fallback.get(ext, None)


# ----------------------- Optimized FFmpeg Processing -----------------------
def run_ffmpeg_with_progress(params):
    """Run FFmpeg with minimal output.

    Returns ``True`` when FFmpeg exits with status 0.  On failure the last
    20 lines of its output are printed and ``False`` is returned.
    """
    try:
        with subprocess.Popen(
            ["ffmpeg", *params],
            stdin=subprocess.DEVNULL,  # ffmpeg must never block on a prompt
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            errors="replace",
            bufsize=1,
        ) as process:
            # Drain the pipe, remembering only the tail needed for diagnostics.
            tail = collections.deque(process.stdout, maxlen=20)
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        print(f"FFmpeg exception: {e}")
        return False

    if process.returncode != 0:
        error_output = "".join(tail)
        print(f"FFmpeg error: {error_output}")
        return False
    return True


def _audio_codec_params(ext):
    """Return the explicit audio-encoder options for *ext* (empty = FFmpeg default)."""
    if ext == "mp3":
        return ["-codec:a", "libmp3lame", "-q:a", "2"]
    if ext in ("aac", "m4a"):
        return ["-codec:a", "aac", "-b:a", "192k"]
    if ext == "opus":
        codec = "libopus" if "libopus" in _cached_audio_encoders() else "libvorbis"
        return ["-codec:a", codec, "-b:a", "128k"]
    if ext == "spx":
        if "libspeex" in _cached_audio_encoders():
            return ["-codec:a", "libspeex"]
        return []
    if ext == "flac":
        return ["-codec:a", "flac", "-compression_level", "5"]
    if ext == "wav":
        return ["-codec:a", "pcm_s16le"]
    return []


_X264_VIDEO = ["-codec:v", "libx264", "-preset", "veryfast", "-crf", "23"]


def _video_codec_params(ext):
    """Return the video+audio encoder options used for video extension *ext*."""
    if ext in ("mp4", "m4v", "mov"):
        return [*_X264_VIDEO, "-codec:a", "aac", "-b:a", "192k", "-movflags", "+faststart"]
    if ext == "webm":
        return [
            "-codec:v", "libvpx-vp9", "-crf", "30", "-b:v", "0",
            "-codec:a", "libopus", "-b:a", "128k",
        ]
    if ext == "avi":
        return [*_X264_VIDEO, "-codec:a", "mp3", "-b:a", "192k"]
    if ext == "mkv":
        return [*_X264_VIDEO, "-codec:a", "aac", "-b:a", "192k"]
    if ext == "flv":
        return ["-codec:v", "flv1", "-codec:a", "mp3", "-ar", "44100"]
    return [*_X264_VIDEO, "-codec:a", "copy"]


def get_optimal_ffmpeg_params(input_file, output_file, conversion_type="audio"):
    """Get optimal FFmpeg parameters for conversion.

    Args:
        input_file: Path of the source media file.
        output_file: Destination path; its extension selects codec/container.
        conversion_type: ``"audio"``, ``"video"`` or ``"video_to_audio"``.

    Returns:
        The argument list for ``ffmpeg`` (without the executable name); the
        output path is always the last element.
    """
    params = ["-hide_banner", "-y", "-loglevel", "error"]  # Minimize output
    params.extend(["-i", input_file])

    ext = os.path.splitext(output_file)[1].lower()[1:]

    if conversion_type == "audio":
        ff_format = AUDIO_EXT_TO_FFMPEG_FORMAT.get(ext, ext)
        params.extend(_audio_codec_params(ext))
        if ff_format != ext:
            params.extend(["-f", ff_format])
        params.extend(["-threads", str(MAX_WORKERS)])

    elif conversion_type == "video":
        params.extend(_video_codec_params(ext))
        params.extend(["-threads", str(MAX_WORKERS)])

    elif conversion_type == "video_to_audio":
        params.append("-vn")
        ff_format = AUDIO_EXT_TO_FFMPEG_FORMAT.get(ext, ext)
        codec = _audio_codec_params(ext)
        if not codec and ext == "mka":
            # Matroska accepts practically any codec, so a lossless stream
            # copy is safe there.  Other containers get FFmpeg's default
            # encoder because copying a mismatched codec makes the mux fail.
            codec = ["-codec:a", "copy"]
        params.extend(codec)
        if ff_format != ext:
            params.extend(["-f", ff_format])
        params.extend(["-threads", str(MAX_WORKERS)])

    params.append(output_file)
    return params


def _ensure_parent_dir(path):
    """Create the directory that will contain *path* if it is missing."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def _remove_quietly(path):
    """Delete *path*, ignoring a missing file or any other OS-level failure."""
    try:
        os.remove(path)
    except OSError:
        pass


def convert_audio_ffmpeg(input_file, output_file, output_ext):
    """Fast audio conversion via direct FFmpeg call.

    Returns ``True`` on success, ``False`` when the input is missing or
    FFmpeg fails.  *output_ext* is accepted for interface symmetry; the
    codec is derived from the extension of *output_file*.
    """
    if not os.path.exists(input_file):
        return False
    _ensure_parent_dir(output_file)
    params = get_optimal_ffmpeg_params(input_file, output_file, "audio")
    return run_ffmpeg_with_progress(params)


def convert_video_ffmpeg(input_file, output_file, conversion_type):
    """Fast video conversion via direct FFmpeg call.

    *conversion_type* is the UI choice: ``"Video to Audio"`` extracts the
    audio track, anything else performs a video-to-video conversion.
    """
    conv_type = "video_to_audio" if conversion_type == "Video to Audio" else "video"
    if not os.path.exists(input_file):
        return False
    _ensure_parent_dir(output_file)
    params = get_optimal_ffmpeg_params(input_file, output_file, conv_type)
    return run_ffmpeg_with_progress(params)


def _concat_entry(path):
    """Format *path* as a ``file`` directive for FFmpeg's concat demuxer."""
    # Inside single quotes only the quote itself needs escaping: close the
    # quoted run, emit an escaped quote, and reopen.
    escaped = os.path.abspath(path).replace("'", "'\\''")
    return f"file '{escaped}'\n"


def _render_silence(template_file, duration_s, output_file):
    """Encode *duration_s* seconds of silence shaped like *template_file*.

    The template is decoded, muted and padded so the result is produced by
    the same encoder settings as the files being merged.
    """
    params = get_optimal_ffmpeg_params(template_file, output_file, "audio")
    # volume=0 mutes the template, apad extends it endlessly, -t trims it.
    params[-1:] = ["-af", "volume=0,apad", "-t", f"{duration_s:.3f}", output_file]
    return run_ffmpeg_with_progress(params)


def merge_audio_files_ffmpeg(input_files, output_file, gap_duration):
    """Fast audio merging via FFmpeg (concat demuxer, stream copy).

    Args:
        input_files: Paths of already-converted files, in playback order.
        output_file: Destination of the merged file.
        gap_duration: Silence inserted between consecutive files.
            NOTE(review): interpreted as milliseconds, which is an assumption
            based on the 0-5000 range of the UI slider -- confirm.

    Returns:
        ``True`` when FFmpeg produced the merged file, otherwise ``False``.
        If the silence segment cannot be generated the files are merged
        without gaps rather than failing.
    """
    input_files = list(input_files)
    if not input_files:
        return False

    try:
        gap_seconds = max(float(gap_duration or 0), 0.0) / 1000.0
    except (TypeError, ValueError):
        gap_seconds = 0.0

    work_dir = os.path.dirname(output_file) or "."
    token = uuid.uuid4().hex
    list_file = os.path.join(work_dir, f"concat_{token}.txt")
    scratch = [list_file]
    silence_file = None

    try:
        os.makedirs(work_dir, exist_ok=True)

        if gap_seconds > 0 and len(input_files) > 1:
            candidate = os.path.join(
                work_dir, f"silence_{token}{os.path.splitext(output_file)[1]}"
            )
            scratch.append(candidate)
            if _render_silence(input_files[0], gap_seconds, candidate):
                silence_file = candidate
            else:
                print("Could not generate the silence gap; merging without gaps")

        with open(list_file, "w", encoding="utf-8") as f:
            for index, file in enumerate(input_files):
                if index and silence_file:
                    f.write(_concat_entry(silence_file))
                f.write(_concat_entry(file))

        params = [
            "-hide_banner", "-y", "-loglevel", "error",
            "-f", "concat", "-safe", "0",
            "-i", list_file,
            "-c", "copy",
            "-threads", str(MAX_WORKERS),
            output_file,
        ]
        return run_ffmpeg_with_progress(params)
    except OSError as e:
        print(f"Audio merge error: {e}")
        return False
    finally:
        for path in scratch:
            _remove_quietly(path)


def _flatten_to_rgb(img):
    """Return *img* as RGB, compositing any transparency onto white."""
    if img.mode in ("P", "LA"):
        img = img.convert("RGBA")
    if img.mode == "RGBA":
        background = Image.new("RGB", img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[-1])
        return background
    return img.convert("RGB")


def convert_image_pillow(input_file, output_path, output_ext):
    """Image conversion with maximum quality.

    Args:
        input_file: Path of the source image.
        output_path: Destination path.
        output_ext: Target extension (with or without a leading dot); it
            selects the Pillow writer and, for the PNM family, the pixel mode.

    Returns:
        ``True`` on success, ``False`` for an unknown extension or when
        Pillow raises (the error is printed).
    """
    try:
        pil_format = pil_format_for_ext(output_ext)
        if not pil_format:
            return False
        ext = output_ext.lower().strip(".")

        with Image.open(input_file) as img:
            original_mode = img.mode

            if pil_format == "JPEG":
                # JPEG doesn't support transparency.
                # NOTE(review): the original's `else` pairing was ambiguous
                # after whitespace loss; every mode is normalised to RGB here.
                img = _flatten_to_rgb(img)
                # Maximum quality: quality=100, no subsampling
                img.save(output_path, format=pil_format, quality=100, subsampling=0)

            elif pil_format == "PNG":
                # Minimal compression (PNG is lossless either way)
                img.save(output_path, format=pil_format, compress_level=1)

            elif pil_format == "WEBP":
                # WebP lossless mode for maximum quality
                img.save(output_path, format=pil_format, lossless=True, quality=100, method=6)

            elif pil_format == "ICO":
                if original_mode not in ("RGBA", "RGB"):
                    img = img.convert("RGBA")
                # Pillow drops requested sizes above 256 px, which would leave
                # the icon without any image, so larger sources are scaled
                # down to fit first.
                if img.width > 256 or img.height > 256:
                    img = img.copy()
                    img.thumbnail((256, 256), Image.LANCZOS)
                img.save(output_path, format=pil_format, sizes=[(img.width, img.height)])

            elif pil_format == "BMP":
                # BMP doesn't support transparency
                img = _flatten_to_rgb(img)
                img.save(output_path, format=pil_format)

            elif pil_format == "TIFF":
                # TIFF without compression for maximum quality
                img.save(output_path, format=pil_format, compression=None)

            elif pil_format == "GIF":
                if original_mode == "RGBA":
                    alpha = img.split()[-1]
                    # 255 colours for the picture; index 255 is kept free for
                    # the transparent colour.
                    img = img.convert("RGB").convert("P", palette=Image.ADAPTIVE, colors=255)
                    # paste() writes where the mask is 255, so the mask must
                    # select the *transparent* pixels (low alpha).
                    img.paste(255, alpha.point(lambda a: 255 if a < 128 else 0))
                    img.info["transparency"] = 255
                elif original_mode != "P":
                    img = img.convert("P", palette=Image.ADAPTIVE, colors=256)
                img.save(output_path, format=pil_format, optimize=False)

            elif pil_format in ("PPM", "PGM", "PBM"):
                # Pillow reports "PPM" for the whole PNM family, so the pixel
                # mode has to follow the requested extension: bitmap for
                # .pbm, greyscale for .pgm, RGB otherwise.
                target_mode = {"pbm": "1", "pgm": "L"}.get(ext, "RGB")
                img = img.convert(target_mode)
                img.save(output_path, format="PPM")

            elif pil_format == "TGA":
                # TGA with RLE disabled
                if original_mode not in ("RGBA", "RGB"):
                    img = img.convert("RGBA")
                img.save(output_path, format=pil_format, compression=None)

            else:
                # For all other formats, try to save with maximum quality
                try:
                    img.save(output_path, format=pil_format, quality=100)
                except TypeError:
                    # If quality parameter not supported, save without it
                    img.save(output_path, format=pil_format)

        return True
    except Exception as e:  # Pillow raises many unrelated exception types
        print(f"Image conversion error: {e}")
        return False


def process_file_parallel(args):
    """Function for parallel file processing.

    *args* is ``(input_file, output_path, file_type, output_ext)`` with
    *file_type* being ``"audio"`` or ``"image"``.  Returns *output_path* on
    success and ``None`` otherwise.
    """
    input_file, output_path, file_type, output_ext = args
    try:
        if file_type == "audio":
            success = convert_audio_ffmpeg(input_file, output_path, output_ext)
        elif file_type == "image":
            success = convert_image_pillow(input_file, output_path, output_ext)
        else:
            success = False
    except Exception as e:  # one bad file must not abort the whole batch
        print(f"Conversion error for {input_file}: {e}")
        return None
    return output_path if success else None


def create_zip(files_to_zip, session_id):
    """Create ZIP archive ``<session_id>.zip`` holding *files_to_zip* by basename."""
    zip_filename = f"{session_id}.zip"
    with zipfile.ZipFile(
        zip_filename, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=1
    ) as zipf:
        for file in tqdm(files_to_zip, desc="Creating ZIP"):
            zipf.write(file, os.path.basename(file))
    return zip_filename


# ----------------------- Main Processing Functions -----------------------
def _start_session():
    """Create a fresh, uniquely named working directory and return its name."""
    session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
    os.makedirs(session_id, exist_ok=True)
    return session_id


def _input_paths(files):
    """Normalise uploaded items (path strings or objects with ``.name``) to paths."""
    return [f if isinstance(f, str) else f.name for f in files]


def _plan_output_paths(file_paths, output_dir, output_ext):
    """Pair every input with a distinct output path inside *output_dir*.

    Inputs sharing a base name (``a.wav`` and ``a.flac``) would otherwise be
    written to the same file; later ones get a ``_2``, ``_3``... suffix.
    """
    taken = set()
    plan = []
    for file_path in file_paths:
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        candidate = base_name
        counter = 1
        # Compared case-insensitively so the result is also safe on
        # case-insensitive file systems.
        while candidate.lower() in taken:
            counter += 1
            candidate = f"{base_name}_{counter}"
        taken.add(candidate.lower())
        plan.append((file_path, os.path.join(output_dir, f"{candidate}.{output_ext}")))
    return plan


def _convert_in_parallel(tasks):
    """Run ``process_file_parallel`` over *tasks* and return the successful outputs."""
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = list(tqdm(
            executor.map(process_file_parallel, tasks),
            total=len(tasks),
            desc="Converting",
        ))
    return [r for r in results if r is not None]


def process_audio_files(files, output_ext, merge_files, gap_duration, progress=gr.Progress(track_tqdm=True)):
    """Process audio files.

    Converts every uploaded file to *output_ext*.  With *merge_files* the
    results are concatenated into one file (separated by *gap_duration* of
    silence); otherwise one file is returned directly and several are
    returned as a ZIP archive.

    Raises:
        gr.Error: When nothing was uploaded, nothing could be converted, or
            merging failed.
    """
    if not files:
        raise gr.Error("Please upload at least one audio file!")

    session_id = _start_session()
    print(f"\nStarting audio session: {session_id}")
    print(f"Files to convert: {len(files)} to .{output_ext}")

    file_paths = _input_paths(files)

    if merge_files:
        merged_output_path = os.path.join(session_id, f"merged.{output_ext}")
        attempted = []
        temp_files = []
        try:
            for i, file_path in enumerate(tqdm(file_paths, desc="Converting")):
                temp_output = os.path.join(session_id, f"temp_{i}.{output_ext}")
                attempted.append(temp_output)
                if convert_audio_ffmpeg(file_path, temp_output, output_ext):
                    temp_files.append(temp_output)

            if not temp_files:
                raise gr.Error("No files were successfully converted")
            if not merge_audio_files_ffmpeg(temp_files, merged_output_path, gap_duration):
                raise gr.Error("Failed to merge audio files")
            return merged_output_path
        finally:
            # Intermediate files are useless whether or not the merge worked.
            for temp_file in attempted:
                _remove_quietly(temp_file)

    tasks = [
        (src, dst, "audio", output_ext)
        for src, dst in _plan_output_paths(file_paths, session_id, output_ext)
    ]
    output_files = _convert_in_parallel(tasks)

    if not output_files:
        raise gr.Error("No files were successfully converted")
    if len(output_files) > 1:
        return create_zip(output_files, session_id)
    return output_files[0]


def process_image_files(files, output_ext, progress=gr.Progress(track_tqdm=True)):
    """Process image files.

    Converts every uploaded image to *output_ext*; one result is returned
    directly, several are returned as a ZIP archive.

    Raises:
        gr.Error: When nothing was uploaded or nothing could be converted.
    """
    if not files:
        raise gr.Error("Please upload at least one image!")

    session_id = _start_session()
    print(f"\nStarting image session: {session_id}")
    print(f"Files to convert: {len(files)} to .{output_ext}")

    tasks = [
        (src, dst, "image", output_ext)
        for src, dst in _plan_output_paths(_input_paths(files), session_id, output_ext)
    ]
    output_files = _convert_in_parallel(tasks)

    if not output_files:
        raise gr.Error("No images were successfully converted")
    if len(output_files) > 1:
        return create_zip(output_files, session_id)
    return output_files[0]


def process_video(input_video, conversion_type, output_ext, progress=gr.Progress(track_tqdm=True)):
    """Process video file.

    Raises:
        gr.Error: When no file was uploaded or FFmpeg fails.
    """
    if not input_video:
        raise gr.Error("Please upload a video file!")

    session_id = _start_session()

    input_path = input_video if isinstance(input_video, str) else input_video.name
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    output_path = os.path.join(session_id, f"{base_name}.{output_ext}")

    print(f"\nStarting video session: {session_id}")
    print(f"Conversion type: {conversion_type}, Output: .{output_ext}")

    progress(0.5, "Processing...")

    if not convert_video_ffmpeg(input_path, output_path, conversion_type):
        raise gr.Error("Video processing failed. Please check the file format and try again.")
    return output_path


def update_format_choices(conversion_type):
    """Update format dropdown based on conversion type."""
    if conversion_type == "Video to Video":
        vf = available_video_extensions()
        value = "mp4" if "mp4" in vf else (vf[0] if vf else None)
        return gr.Dropdown(choices=vf, value=value, label="Output Video Format")
    af = available_audio_extensions()
    value = "mp3" if "mp3" in af else (af[0] if af else None)
    return gr.Dropdown(choices=af, value=value, label="Output Audio Format")


# ----------------------- UI -----------------------
AUDIO_FORMATS = available_audio_extensions()
VIDEO_FORMATS = available_video_extensions()
IMAGE_FORMATS = available_image_extensions()

# NOTE(review): the component nesting below was reconstructed from a
# whitespace-collapsed source; confirm the layout against the running app.
# NOTE(review): the "сonvert" translation key is reproduced verbatim from the
# original; confirm it matches the key spelled in i18n_local (its first
# letter may be a non-ASCII look-alike).
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.HTML(i18n("title_app"))

    with gr.Tabs():
        # AUDIO TAB
        with gr.TabItem(i18n("audio_tab")):
            gr.HTML(i18n("audio_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    audio_file_input = gr.Files(
                        label=i18n("label_audio_file_input"),
                        file_types=["audio"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    default_audio = "mp3" if "mp3" in AUDIO_FORMATS else (
                        AUDIO_FORMATS[0] if AUDIO_FORMATS else None
                    )
                    audio_format_choice = gr.Dropdown(
                        choices=AUDIO_FORMATS,
                        label=i18n("label_audio_format_choice"),
                        value=default_audio,
                    )
                    merge_files_checkbox = gr.Checkbox(label=i18n("label_merge_files_checkbox"))
                    gap_slider = gr.Slider(
                        minimum=0,
                        maximum=5000,
                        step=100,
                        value=500,
                        label=i18n("label_gap_slider"),
                        visible=False,
                    )

            audio_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            audio_output_file = gr.File(label=i18n("download_result"))

            # The gap slider is only meaningful while merging is enabled.
            merge_files_checkbox.change(
                lambda x: gr.update(visible=x),
                inputs=merge_files_checkbox,
                outputs=gap_slider,
            )

            audio_submit_button.click(
                fn=process_audio_files,
                inputs=[audio_file_input, audio_format_choice, merge_files_checkbox, gap_slider],
                outputs=audio_output_file,
            )

        # IMAGE TAB
        with gr.TabItem(i18n("image_tab")):
            gr.HTML(i18n("image_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    image_file_input = gr.Files(
                        label=i18n("label_image_file_input"),
                        file_types=["image"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    default_image = "png" if "png" in IMAGE_FORMATS else (
                        IMAGE_FORMATS[0] if IMAGE_FORMATS else None
                    )
                    image_format_choice = gr.Dropdown(
                        choices=IMAGE_FORMATS,
                        label=i18n("label_image_format_choice"),
                        value=default_image,
                    )

            image_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            image_output_file = gr.File(label=i18n("download_result"))

            image_submit_button.click(
                fn=process_image_files,
                inputs=[image_file_input, image_format_choice],
                outputs=image_output_file,
            )

        # VIDEO TAB
        with gr.TabItem(i18n("video_tab")):
            gr.HTML(i18n("video_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    video_input = gr.File(
                        label=i18n("label_video_input"),
                        file_types=["video"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    conversion_type_radio = gr.Radio(
                        choices=["Video to Video", "Video to Audio"],
                        label=i18n("label_conversion_type_radio"),
                        value="Video to Video",
                    )
                    default_video = "mp4" if "mp4" in VIDEO_FORMATS else (
                        VIDEO_FORMATS[0] if VIDEO_FORMATS else None
                    )
                    video_format_dropdown = gr.Dropdown(
                        choices=VIDEO_FORMATS,
                        label=i18n("label_video_format_dropdown"),
                        value=default_video,
                    )

            video_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            video_output_file = gr.File(label=i18n("download_result"))

            conversion_type_radio.change(
                fn=update_format_choices,
                inputs=conversion_type_radio,
                outputs=video_format_dropdown,
            )

            video_submit_button.click(
                fn=process_video,
                inputs=[video_input, conversion_type_radio, video_format_dropdown],
                outputs=video_output_file,
            )

if __name__ == "__main__":
    demo.queue().launch(i18n=i18n, debug=True, show_error=True)