# Page residue from the hosting site ("Spaces: Running"); not part of the program.
| import os | |
| import re | |
| import uuid | |
| import zipfile | |
| import subprocess | |
| import gradio as gr | |
| from tqdm import tqdm | |
| from datetime import datetime | |
| from pydub import AudioSegment | |
| from moviepy import VideoFileClip | |
| from PIL import Image | |
| # ----------------------- Internationalization ----------------------- | |
| from i18n import en, ru, es, fr, de, pt_BR, it, zh_CN, ja, ko, ar, hi, tr | |
# Build the Gradio I18n object from the locale objects imported from the
# ``i18n`` module. Note that this rebinds the name ``i18n``: from here on it
# refers to the ``gr.I18n`` instance (called as ``i18n("key")`` in the UI and
# passed to ``demo.launch``), not to the imported module.
i18n = gr.I18n(en, ru, es, fr, de, pt_BR, it, zh_CN, ja, ko, ar, hi, tr)
| # ----------------------- FFmpeg utils ----------------------- | |
| def _run_ffmpeg(args): | |
| try: | |
| res = subprocess.run(["ffmpeg", *args], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) | |
| return res.stdout | |
| except Exception: | |
| return None | |
def ffmpeg_writable_formats():
    """Collect the FFmpeg format names that can be written (``E`` flag).

    Parses the output of ``ffmpeg -formats``. Comma-separated aliases on one
    row (e.g. ``matroska,webm``) are returned as separate entries, so the
    result looks like ``{'mp4', 'mov', 'matroska', 'webm', ...}``.

    Returns an empty set when the FFmpeg query produced no output.
    """
    listing = _run_ffmpeg(["-hide_banner", "-v", "error", "-formats"])
    if not listing:
        return set()

    # Rows look like " DE matroska,webm    Matroska / WebM": optional "D" and
    # padding, then the "E" (muxing) flag, then the name column.
    row_pattern = re.compile(r"^\s*[D\s]*E\s+([^\s]+)")

    writable = set()
    for row in listing.splitlines():
        found = row_pattern.match(row)
        if found is None:
            continue
        writable.update(alias.strip() for alias in found.group(1).split(","))
    return writable
def ffmpeg_audio_encoders():
    """Return the set of audio encoder names reported by ``ffmpeg -encoders``.

    Example result: ``{'aac', 'libmp3lame', 'libopus', ...}``.

    Returns an empty set when the FFmpeg query produced no output.
    """
    out = _run_ffmpeg(["-hide_banner", "-v", "error", "-encoders"])
    if not out:
        return set()

    encoders = set()
    for line in out.splitlines():
        # Rows look like " A..... libmp3lame   MP3 (MPEG audio layer 3) (codec mp3)":
        # a flag column starting with "A" followed by the encoder name.
        m = re.match(r"^\s*A\S*\s+([^\s]+)", line)
        if m is None:
            continue
        name = m.group(1)
        if name == "=":
            # The legend row " A..... = Audio" has the same shape as a real
            # entry; without this check "=" would be reported as an encoder.
            continue
        encoders.add(name)
    return encoders
# Extension -> FFmpeg container mapping (curated, common ones).
# Keys are output file extensions without the dot; values are the FFmpeg
# format (muxer) names. available_audio_extensions() offers an extension only
# if its format is reported as writable, and convert_audio() passes the value
# as the export format.
AUDIO_EXT_TO_FFMPEG_FORMAT = {
    "mp3": "mp3",
    "wav": "wav",
    "w64": "w64",
    "flac": "flac",
    "ogg": "ogg",
    "oga": "ogg",
    "opus": "ogg",  # ogg container; needs libopus
    "spx": "ogg",  # ogg container; needs libspeex
    "aac": "adts",
    "m4a": "mp4",
    "m4b": "mp4",
    "m4r": "mp4",
    "ac3": "ac3",
    "aiff": "aiff",
    "aif": "aiff",
    "aifc": "aiff",
    "caf": "caf",
    "au": "au",
    "amr": "amr",
    "dts": "dts",
    "mp2": "mp2",
    "wma": "asf",
    "wv": "wv",
    "mka": "matroska",
}
# Some extensions require specific encoders.
# Maps an extension to a list of encoder names; available_audio_extensions()
# keeps the extension only if at least one of them is reported by
# ffmpeg_audio_encoders().
AUDIO_REQUIRED_CODECS = {
    "mp3": ["libmp3lame"],
    "opus": ["libopus"],
    "spx": ["libspeex"],
    # others rely on FFmpeg defaults
}
# Video extension (without the dot) -> FFmpeg format (muxer) name.
# available_video_extensions() offers an extension only if its format is
# reported as writable by ffmpeg_writable_formats().
VIDEO_EXT_TO_FFMPEG_FORMAT = {
    "mp4": "mp4",
    "m4v": "mp4",
    "mov": "mov",
    "avi": "avi",
    "mkv": "matroska",
    "webm": "webm",
    "flv": "flv",
    "ogv": "ogg",
    "mpeg": "mpeg",
    "mpg": "mpeg",
    "ts": "mpegts",
    "m2ts": "mpegts",
    "mxf": "mxf",
    "3gp": "3gp",
    "3g2": "3g2",
    "asf": "asf",
    "wmv": "asf",
    "vob": "vob",
}
def available_audio_extensions():
    """List the audio extensions the local FFmpeg build can produce.

    An extension from ``AUDIO_EXT_TO_FFMPEG_FORMAT`` qualifies when its
    container is writable and, if ``AUDIO_REQUIRED_CODECS`` names encoders for
    it, at least one of those encoders is available. When nothing qualifies
    (FFmpeg missing or the query failed) a fixed fallback list is used.

    Returns a sorted list of extensions without the leading dot.
    """
    muxers = ffmpeg_writable_formats()
    encoders = ffmpeg_audio_encoders()

    def _usable(ext, container):
        if container not in muxers:
            return False
        needed = AUDIO_REQUIRED_CODECS.get(ext)
        return not needed or any(codec in encoders for codec in needed)

    usable = {
        ext
        for ext, container in AUDIO_EXT_TO_FFMPEG_FORMAT.items()
        if _usable(ext, container)
    }
    # fallback if ffmpeg is missing or query failed
    if not usable:
        usable = {"mp3", "wav", "flac", "ogg", "aac", "m4a", "aiff", "wma", "opus"}
    return sorted(usable)
def available_video_extensions():
    """List the video extensions whose container FFmpeg reports as writable.

    Falls back to a fixed list when no mapped container is writable (for
    example when the FFmpeg query returned nothing).

    Returns a sorted list of extensions without the leading dot.
    """
    muxers = ffmpeg_writable_formats()
    found = set()
    for ext, container in VIDEO_EXT_TO_FFMPEG_FORMAT.items():
        if container in muxers:
            found.add(ext)
    if not found:
        found = {"mp4", "mkv", "avi", "mov", "webm", "flv", "mpeg", "mpg", "ts"}
    return sorted(found)
| # ----------------------- Pillow utils (images) ----------------------- | |
def available_image_extensions():
    """List the image extensions Pillow can save to (best-effort).

    Takes every registered extension whose format appears in Pillow's ``SAVE``
    registry. If that registry is missing or empty, every registered format is
    assumed savable. A fixed fallback list is used when nothing is found.

    Returns a sorted list of lower-case extensions without the leading dot.
    """
    # Query the extension table first, then read the SAVE registry.
    registered = Image.registered_extensions()  # {".jpg": "JPEG", ...}
    savable = set(getattr(Image, "SAVE", {})) or set(registered.values())

    names = {
        suffix.lstrip(".").lower()
        for suffix, fmt in registered.items()
        if fmt in savable
    }
    if not names:
        names = {
            "png", "jpg", "jpeg", "webp", "bmp", "tiff", "gif", "ico", "ppm",
            "pgm", "pbm", "pnm", "tga", "xbm", "xpm", "pdf", "eps",
        }
    return sorted(names)
def pil_format_for_ext(ext):
    """Map a file extension to the Pillow format name used for saving.

    *ext* is matched case-insensitively, with or without surrounding dots.
    Pillow's registered extensions are consulted first; a small built-in table
    covers common extensions otherwise. Returns ``None`` for unknown ones.
    """
    wanted = ext.lower().strip(".")
    for suffix, fmt in Image.registered_extensions().items():
        if suffix.lstrip(".").lower() != wanted:
            continue
        return fmt

    builtin = dict(
        jpg="JPEG",
        jpeg="JPEG",
        png="PNG",
        webp="WEBP",
        bmp="BMP",
        tiff="TIFF",
        tif="TIFF",
        gif="GIF",
        ico="ICO",
        ppm="PPM",
        pgm="PPM",
        pbm="PPM",
        pnm="PPM",
        tga="TGA",
        xbm="XBM",
        xpm="XPM",
        pdf="PDF",
        eps="EPS",
    )
    return builtin.get(wanted)
| # ---------- AUDIO PROCESSING ---------- | |
def convert_audio(input_files, output_ext, session_id, merge_files, gap_duration):
    """Convert audio files to ``output_ext``, or merge them into one file.

    Args:
        input_files: Iterable of file paths, or objects exposing the path as
            ``.name``.
        output_ext: Target extension without the dot (e.g. ``"mp3"``).
        session_id: Output directory; created if it does not exist.
        merge_files: When true, all inputs are concatenated into a single
            ``merged_output.<ext>`` and no per-file outputs are written.
        gap_duration: Length of the silence placed between merged files,
            passed to ``AudioSegment.silent(duration=...)`` (milliseconds).

    Returns:
        A list of output paths: one per input file, or a single-element list
        holding the merged file when ``merge_files`` is true.
    """
    os.makedirs(session_id, exist_ok=True)
    ff_format = AUDIO_EXT_TO_FFMPEG_FORMAT.get(output_ext, output_ext)
    # These two share the ogg container, so the encoder must be named explicitly.
    codec = {"opus": "libopus", "spx": "libspeex"}.get(output_ext)

    output_files = []
    used_names = set()
    merged_audio = AudioSegment.silent(duration=0)
    merged_any = False

    for input_file in tqdm(input_files, desc="Converting audio files"):
        file_path = input_file if isinstance(input_file, str) else input_file.name
        audio = AudioSegment.from_file(file_path)

        if merge_files:
            # Only the merged file is returned, so skip the per-file export.
            # The gap goes between files only, never after the last one.
            if merged_any:
                merged_audio += AudioSegment.silent(duration=gap_duration)
            merged_audio += audio
            merged_any = True
            continue

        # Inputs such as "a.wav" and "a.flac" share a base name; number the
        # later ones so they do not overwrite the earlier output.
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        output_filename = f"{base_name}.{output_ext}"
        counter = 1
        while output_filename in used_names:
            output_filename = f"{base_name}_{counter}.{output_ext}"
            counter += 1
        used_names.add(output_filename)

        output_path = os.path.join(session_id, output_filename)
        audio.export(output_path, format=ff_format, codec=codec)
        output_files.append(output_path)

    if merge_files:
        merged_output_path = os.path.join(session_id, f"merged_output.{output_ext}")
        merged_audio.export(merged_output_path, format=ff_format, codec=codec)
        return [merged_output_path]
    return output_files
| # ---------- IMAGE PROCESSING ---------- | |
def convert_images(input_files, output_ext, session_id):
    """Convert images to the format implied by ``output_ext``.

    Args:
        input_files: Iterable of file paths, or objects exposing the path as
            ``.name``.
        output_ext: Target extension without the dot (e.g. ``"png"``).
        session_id: Output directory; created if it does not exist.

    Returns:
        A list with one output path per input file.

    Raises:
        gr.Error: If no Pillow format is known for ``output_ext``.
    """
    os.makedirs(session_id, exist_ok=True)
    pil_fmt = pil_format_for_ext(output_ext)
    if not pil_fmt:
        raise gr.Error(f"Pillow cannot save to format: {output_ext}")

    output_files = []
    used_names = set()
    for input_file in tqdm(input_files, desc="Converting images"):
        file_path = input_file if isinstance(input_file, str) else input_file.name

        # Inputs such as "a.png" and "a.bmp" share a base name; number the
        # later ones so they do not overwrite the earlier output.
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        output_filename = f"{base_name}.{output_ext}"
        counter = 1
        while output_filename in used_names:
            output_filename = f"{base_name}_{counter}.{output_ext}"
            counter += 1
        used_names.add(output_filename)
        output_path = os.path.join(session_id, output_filename)

        with Image.open(file_path) as source:
            source.load()
            img = source
            # For JPEG ensure RGB mode
            if pil_fmt.upper() == "JPEG":
                img = img.convert("RGB")
            try:
                img.save(output_path, format=pil_fmt)
            except (OSError, ValueError):
                # Some writers reject the source mode (e.g. an RGBA image
                # saved as PPM). Retry once as RGB; if the image already is
                # RGB the failure has another cause, so re-raise it.
                if img.mode == "RGB":
                    raise
                img.convert("RGB").save(output_path, format=pil_fmt)
        output_files.append(output_path)
    return output_files
| # ---------- ZIP CREATION ---------- | |
def create_zip(files_to_zip, session_id):
    """Bundle *files_to_zip* into ``<session_id>.zip`` and return its path.

    Each file is stored under its base name only, so the archive is flat.
    """
    archive_path = f"{session_id}.zip"
    with zipfile.ZipFile(archive_path, mode='w') as archive:
        for member in tqdm(files_to_zip, desc="Creating ZIP archive"):
            archive.write(member, arcname=os.path.basename(member))
    return archive_path
| # ---------- AUDIO HANDLER ---------- | |
def process_audio_files(files, output_ext, merge_files, gap_duration, progress=gr.Progress(track_tqdm=True)):
    """Gradio handler for the audio tab.

    Converts (or merges) the uploaded files in a fresh session directory.
    Returns the path of the single result, or of a ZIP archive when more than
    one file was produced.

    Raises:
        gr.Error: If no files were uploaded.
    """
    if not files:
        raise gr.Error("Please upload at least one audio file!")

    # Timestamp plus a short random suffix keeps session names unique.
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    session_id = stamp + "_" + str(uuid.uuid4())[:8]
    print(f"\nStarting audio session: {session_id}")
    print(f"Files to convert: {len(files)} to .{output_ext}")

    results = convert_audio(files, output_ext, session_id, merge_files, gap_duration)
    if len(results) <= 1:
        return results[0]

    print("Creating ZIP archive...")
    return create_zip(results, session_id)
| # ---------- IMAGE HANDLER ---------- | |
def process_image_files(files, output_ext, progress=gr.Progress(track_tqdm=True)):
    """Gradio handler for the image tab.

    Converts the uploaded images in a fresh session directory. Returns the
    path of the single result, or of a ZIP archive when more than one file was
    produced.

    Raises:
        gr.Error: If no files were uploaded.
    """
    if not files:
        raise gr.Error("Please upload at least one image!")

    # Timestamp plus a short random suffix keeps session names unique.
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    session_id = stamp + "_" + str(uuid.uuid4())[:8]
    print(f"\nStarting image session: {session_id}")
    print(f"Files to convert: {len(files)} to .{output_ext}")

    results = convert_images(files, output_ext, session_id)
    if len(results) <= 1:
        return results[0]

    print("Creating ZIP archive...")
    return create_zip(results, session_id)
| # ---------- VIDEO HANDLER ---------- | |
def process_video(input_video, conversion_type, output_ext, progress=gr.Progress(track_tqdm=True)):
    """Gradio handler for the video tab.

    Args:
        input_video: Path of the uploaded video, or an object exposing the
            path as ``.name``.
        conversion_type: ``"Video to Video"`` or ``"Video to Audio"``.
        output_ext: Target extension without the dot.
        progress: Gradio progress tracker (mirrors tqdm progress bars).

    Returns:
        Path of the converted file inside a fresh session directory.

    Raises:
        gr.Error: If no video was uploaded, the video has no audio track for
            an audio extraction, the conversion type is unknown, or the
            conversion itself fails.
    """
    if not input_video:
        raise gr.Error("Please upload a video file!")

    session_id = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + "_" + str(uuid.uuid4())[:8]
    os.makedirs(session_id, exist_ok=True)
    input_path = input_video if isinstance(input_video, str) else input_video.name
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    output_filename = f"{base_name}_converted.{output_ext}"
    output_path = os.path.join(session_id, output_filename)
    print(f"\nStarting video session: {session_id}")
    print(f"Conversion type: {conversion_type}, Output: .{output_ext}")

    clip = None
    try:
        clip = VideoFileClip(input_path)
        if conversion_type == "Video to Video":
            # Let MoviePy use its defaults; for some containers (e.g. webm) this may require specific codecs
            clip.write_videofile(output_path, logger=None)
        elif conversion_type == "Video to Audio":
            if clip.audio is None:
                raise gr.Error("The uploaded video does not contain an audio track.")
            audio_clip = clip.audio
            # Name the encoder explicitly for these extensions, but only if
            # the local FFmpeg build actually has it.
            audio_codec = None
            if output_ext == "opus":
                if "libopus" in ffmpeg_audio_encoders():
                    audio_codec = "libopus"
            elif output_ext == "spx":
                if "libspeex" in ffmpeg_audio_encoders():
                    audio_codec = "libspeex"
            try:
                audio_clip.write_audiofile(output_path, logger=None, codec=audio_codec)
            finally:
                audio_clip.close()
        else:
            # Without this, nothing is written yet a path is still returned.
            raise gr.Error(f"Unknown conversion type: {conversion_type}")
    except gr.Error:
        # Already a user-facing message; do not wrap it as "Processing error".
        raise
    except Exception as e:
        print(f"An error occurred: {e}")
        raise gr.Error(f"Processing error: {e}") from e
    finally:
        # Close the clip on failure as well as on success.
        if clip is not None:
            clip.close()

    print("Video processing complete!")
    return output_path
| # ---------- FORMAT CHOICES ---------- | |
def update_format_choices(conversion_type):
    """Build the format dropdown matching the selected conversion type.

    ``"Video to Video"`` offers the video extensions (preferring ``mp4``);
    anything else offers the audio extensions (preferring ``mp3``). If the
    preferred extension is unavailable the first option is selected, or
    ``None`` when there are no options.
    """
    if conversion_type == "Video to Video":
        options = available_video_extensions()
        preferred = "mp4"
        label = "Output Video Format"
    else:
        options = available_audio_extensions()
        preferred = "mp3"
        label = "Output Audio Format"

    if preferred in options:
        value = preferred
    elif options:
        value = options[0]
    else:
        value = None
    return gr.Dropdown(choices=options, value=value, label=label)
| # ---------- UI ---------- | |
def _preferred_or_first(preferred, options):
    """Return *preferred* if it is offered, else the first option, else ``None``."""
    if preferred in options:
        return preferred
    return options[0] if options else None


# The format lists are computed once, when the module is loaded.
AUDIO_FORMATS = available_audio_extensions()
VIDEO_FORMATS = available_video_extensions()
IMAGE_FORMATS = available_image_extensions()

# NOTE(review): the original indentation was lost; the nesting below (inputs
# in a two-column row, button and result below the row) is reconstructed and
# should be confirmed against the running app.
# NOTE(review): the button key "сonvert" is reproduced exactly as it appears;
# its first letter looks like a non-ASCII look-alike of "c", so it must stay
# identical to the key used in the i18n module.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.HTML(i18n("title_app"))

    with gr.Tabs():
        # AUDIO TAB
        with gr.TabItem(i18n("audio_tab")):
            gr.HTML(i18n("audio_description"))
            with gr.Row():
                with gr.Column(scale=2):
                    audio_file_input = gr.Files(
                        label=i18n("label_audio_file_input"),
                        file_types=["audio"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    default_audio = _preferred_or_first("mp3", AUDIO_FORMATS)
                    audio_format_choice = gr.Dropdown(
                        choices=AUDIO_FORMATS,
                        label=i18n("label_audio_format_choice"),
                        value=default_audio,
                    )
                    merge_files_checkbox = gr.Checkbox(label=i18n("label_merge_files_checkbox"))
                    gap_slider = gr.Slider(
                        minimum=0,
                        maximum=5000,
                        step=100,
                        value=500,
                        label=i18n("label_gap_slider"),
                    )
            audio_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            audio_output_file = gr.File(label=i18n("download_result"))
            audio_submit_button.click(
                fn=process_audio_files,
                inputs=[audio_file_input, audio_format_choice, merge_files_checkbox, gap_slider],
                outputs=audio_output_file,
            )

        # IMAGE TAB
        with gr.TabItem(i18n("image_tab")):
            gr.HTML(i18n("image_description"))
            with gr.Row():
                with gr.Column(scale=2):
                    image_file_input = gr.Files(
                        label=i18n("label_image_file_input"),
                        file_types=["image"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    default_image = _preferred_or_first("png", IMAGE_FORMATS)
                    image_format_choice = gr.Dropdown(
                        choices=IMAGE_FORMATS,
                        label=i18n("label_image_format_choice"),
                        value=default_image,
                    )
            image_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            image_output_file = gr.File(label=i18n("download_result"))
            image_submit_button.click(
                fn=process_image_files,
                inputs=[image_file_input, image_format_choice],
                outputs=image_output_file,
            )

        # VIDEO TAB
        with gr.TabItem(i18n("video_tab")):
            gr.HTML(i18n("video_description"))
            with gr.Row():
                with gr.Column(scale=2):
                    video_input = gr.File(
                        label=i18n("label_video_input"),
                        file_types=["video"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    conversion_type_radio = gr.Radio(
                        choices=["Video to Video", "Video to Audio"],
                        label=i18n("label_conversion_type_radio"),
                        value="Video to Video",
                    )
                    default_video = _preferred_or_first("mp4", VIDEO_FORMATS)
                    video_format_dropdown = gr.Dropdown(
                        choices=VIDEO_FORMATS,
                        label=i18n("label_video_format_dropdown"),
                        value=default_video,
                    )
            video_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            video_output_file = gr.File(label=i18n("download_result"))
            # Switching the conversion type swaps the dropdown's choices.
            conversion_type_radio.change(
                fn=update_format_choices,
                inputs=conversion_type_radio,
                outputs=video_format_dropdown,
            )
            video_submit_button.click(
                fn=process_video,
                inputs=[video_input, conversion_type_radio, video_format_dropdown],
                outputs=video_output_file,
            )

if __name__ == "__main__":
    demo.launch(i18n=i18n, debug=True)