# Web-page residue from the hosting site (not part of the program), kept as comments:
# Politrees's picture
# Update app.py
# f1e910e verified
import os
import re
import uuid
import zipfile
import subprocess
import gradio as gr
from tqdm import tqdm
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
from PIL import Image
# ----------------------- Internationalization -----------------------
from i18n_local import en, ru, es, fr, de, it, ja, ko, ar, hi, tr
# Register every locale dictionary imported from i18n_local under its
# language code so UI strings can be looked up by key.
i18n = gr.I18n(
    en=en,
    ru=ru,
    es=es,
    fr=fr,
    de=de,
    it=it,
    ja=ja,
    ko=ko,
    ar=ar,
    hi=hi,
    tr=tr,
)
# ----------------------- HuggingFace Spaces Settings -----------------------
# Upper bound for worker threads and for FFmpeg's -threads option:
# never more than two, and never more than the reported CPU count.
MAX_WORKERS = min(multiprocessing.cpu_count(), 2)
# ----------------------- FFmpeg utils -----------------------
def _run_ffmpeg(args):
try:
res = subprocess.run(["ffmpeg", *args], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
return res.stdout
except Exception:
return None
def ffmpeg_writable_formats():
    """Return the set of FFmpeg format names that carry the muxing (E) flag.

    The listing comes from ``ffmpeg -formats``.  A row may name several
    comma-separated aliases; each alias is added on its own.  An empty set is
    returned when the listing is unavailable.
    """
    listing = _run_ffmpeg(["-hide_banner", "-v", "error", "-formats"])
    if not listing:
        return set()

    # Optional "D" flag, mandatory "E" flag, then the (possibly aliased) name.
    row_pattern = re.compile(r"^\s*[D\s]*E\s+([^\s]+)")
    formats = set()
    for row in listing.splitlines():
        hit = row_pattern.match(row)
        if hit is None:
            continue
        formats.update(alias.strip() for alias in hit.group(1).split(","))
    return formats
def ffmpeg_audio_encoders():
    """Return the set of audio encoder names reported by ``ffmpeg -encoders``.

    A row is treated as an audio encoder when its flag column starts with
    ``A``.  The legend row (`` A..... = Audio``) has the same shape, so its
    ``=`` token is explicitly skipped instead of being reported as an encoder.
    An empty set is returned when the listing is unavailable.
    """
    out = _run_ffmpeg(["-hide_banner", "-v", "error", "-encoders"])
    if not out:
        return set()

    row_pattern = re.compile(r"^\s*A\S*\s+(\S+)")
    encoders = set()
    for line in out.splitlines():
        m = row_pattern.match(line)
        # "=" is the separator of the legend row, not an encoder name.
        if m and m.group(1) != "=":
            encoders.add(m.group(1))
    return encoders
# Extension -> FFmpeg container mapping (full list)
# Output file extension -> name of the FFmpeg muxer used to write it.
AUDIO_EXT_TO_FFMPEG_FORMAT = {
    # Extensions whose muxer shares their name
    "mp3": "mp3",
    "wav": "wav",
    "w64": "w64",
    "flac": "flac",
    # Ogg container and its aliases
    "ogg": "ogg",
    "oga": "ogg",
    "opus": "ogg",
    "spx": "ogg",
    # AAC: raw ADTS stream or MP4-based files
    "aac": "adts",
    "m4a": "mp4",
    "m4b": "mp4",
    "m4r": "mp4",
    "ac3": "ac3",
    # AIFF and its aliases
    "aiff": "aiff",
    "aif": "aiff",
    "aifc": "aiff",
    "caf": "caf",
    "au": "au",
    "amr": "amr",
    "dts": "dts",
    "mp2": "mp2",
    "wma": "asf",
    "wv": "wv",
    "mka": "matroska",
}
# Extensions that are only offered when at least one of the listed
# FFmpeg encoders is available.
AUDIO_REQUIRED_CODECS = {
    "mp3": ["libmp3lame"],
    "opus": ["libopus"],
    "spx": ["libspeex"],
}
# Output file extension -> name of the FFmpeg muxer used to write it.
VIDEO_EXT_TO_FFMPEG_FORMAT = {
    # MP4 / QuickTime family
    "mp4": "mp4",
    "m4v": "mp4",
    "mov": "mov",
    "avi": "avi",
    "mkv": "matroska",
    "webm": "webm",
    "flv": "flv",
    "ogv": "ogg",
    # MPEG program and transport streams
    "mpeg": "mpeg",
    "mpg": "mpeg",
    "ts": "mpegts",
    "m2ts": "mpegts",
    "mxf": "mxf",
    "3gp": "3gp",
    "3g2": "3g2",
    # ASF container and its alias
    "asf": "asf",
    "wmv": "asf",
    "vob": "vob",
}
def available_audio_extensions():
    """Return the sorted list of audio extensions the local FFmpeg can write.

    An extension qualifies when its container is writable and, if it has an
    entry in AUDIO_REQUIRED_CODECS, at least one of those encoders is present.
    When nothing qualifies a fixed default list is used instead.
    """
    muxers = ffmpeg_writable_formats()
    encoders = ffmpeg_audio_encoders()

    def usable(ext, container):
        if container not in muxers:
            return False
        needed = AUDIO_REQUIRED_CODECS.get(ext)
        return not needed or any(codec in encoders for codec in needed)

    found = [
        ext
        for ext, container in AUDIO_EXT_TO_FFMPEG_FORMAT.items()
        if usable(ext, container)
    ]
    if not found:
        found = ["mp3", "wav", "flac", "ogg", "aac", "m4a", "aiff", "wma", "opus"]
    return sorted(set(found))
def available_video_extensions():
    """Return the sorted list of video extensions whose container FFmpeg can write.

    Falls back to a fixed default list when no mapped container is writable.
    """
    muxers = ffmpeg_writable_formats()
    found = []
    for ext, container in VIDEO_EXT_TO_FFMPEG_FORMAT.items():
        if container in muxers:
            found.append(ext)
    if not found:
        found = ["mp4", "mkv", "avi", "mov", "webm", "flv", "mpeg", "mpg", "ts"]
    return sorted(set(found))
def available_image_extensions():
    """Return the sorted list of image extensions Pillow can save.

    Extensions come from Pillow's extension registry and are kept when their
    format appears in ``Image.SAVE``; if ``Image.SAVE`` is missing or empty,
    every registered format is accepted.  A fixed default list is used when
    the result would otherwise be empty.
    """
    registered = Image.registered_extensions()
    savers = set(getattr(Image, "SAVE", {})) or set(registered.values())

    names = {
        suffix.lstrip(".").lower()
        for suffix, fmt in registered.items()
        if fmt in savers
    }
    if not names:
        names = {
            "png", "jpg", "jpeg", "webp", "bmp", "tiff", "gif", "ico", "ppm",
            "pgm", "pbm", "pnm", "tga", "xbm", "xpm", "pdf", "eps",
        }
    return sorted(names)
def pil_format_for_ext(ext):
    """Map a file extension (with or without dots, any case) to a Pillow format name.

    Pillow's extension registry is consulted first; a built-in table covers
    extensions the registry does not know.  Returns ``None`` when neither
    source has an entry.
    """
    wanted = ext.lower().strip(".")

    registered = next(
        (
            fmt
            for suffix, fmt in Image.registered_extensions().items()
            if suffix.lstrip(".").lower() == wanted
        ),
        None,
    )
    if registered is not None:
        return registered

    builtin = {
        "jpg": "JPEG",
        "jpeg": "JPEG",
        "png": "PNG",
        "webp": "WEBP",
        "bmp": "BMP",
        "tiff": "TIFF",
        "tif": "TIFF",
        "gif": "GIF",
        "ico": "ICO",
        "ppm": "PPM",
        "pgm": "PPM",
        "pbm": "PPM",
        "pnm": "PPM",
        "tga": "TGA",
        "xbm": "XBM",
        "xpm": "XPM",
        "pdf": "PDF",
        "eps": "EPS",
    }
    return builtin.get(wanted)
# ----------------------- Optimized FFmpeg Processing -----------------------
def run_ffmpeg_with_progress(params):
    """Run ``ffmpeg`` with *params* and report whether it exited successfully.

    Args:
        params: Iterable of command-line arguments placed after ``ffmpeg``.

    Returns:
        ``True`` when FFmpeg exits with status 0.  ``False`` on a non-zero
        exit status (the last 20 output lines are printed) or when the
        process could not be started or read (the exception is printed).
    """
    try:
        # The context manager closes the pipe and waits for the process even
        # if reading fails, so no file descriptor or zombie is left behind.
        with subprocess.Popen(
            ['ffmpeg', *params],
            # FFmpeg reads interactive commands from stdin by default; give it
            # an empty stdin so it never blocks on or consumes the server's.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            bufsize=1,
        ) as process:
            output = list(process.stdout)

        if process.returncode != 0:
            error_output = ''.join(output[-20:])  # Only last 20 lines for debugging if error
            print(f"FFmpeg error: {error_output}")
            return False
        return True
    except Exception as e:
        print(f"FFmpeg exception: {e}")
        return False
def get_optimal_ffmpeg_params(input_file, output_file, conversion_type="audio"):
    """Build the FFmpeg argument list (without the leading ``ffmpeg``) for one conversion.

    The codec options are chosen from the lower-cased extension of
    *output_file* and from *conversion_type*, which is one of ``"audio"``,
    ``"video"`` or ``"video_to_audio"``.  Any other value yields only the
    common prefix, the input and the output path.

    Returns:
        A list of strings ending with *output_file*.
    """
    ext = os.path.splitext(output_file)[1].lower()[1:]

    # Common prefix: quiet, overwrite without asking, then the input.
    params = ['-hide_banner', '-y', '-loglevel', 'error', '-i', input_file]

    # Option groups shared by several targets.
    x264 = ['-codec:v', 'libx264', '-preset', 'veryfast', '-crf', '23']
    aac_192k = ['-codec:a', 'aac', '-b:a', '192k']

    if conversion_type in ("audio", "video_to_audio"):
        extracting = conversion_type == "video_to_audio"
        if extracting:
            params.append('-vn')  # drop the video stream

        fixed_audio = {
            'mp3': ['-codec:a', 'libmp3lame', '-q:a', '2'],
            'aac': aac_192k,
            'm4a': aac_192k,
            'flac': ['-codec:a', 'flac', '-compression_level', '5'],
            'wav': ['-codec:a', 'pcm_s16le'],
        }
        if ext in fixed_audio:
            params += fixed_audio[ext]
        elif ext == 'opus':
            # Prefer libopus; fall back to libvorbis when it is not built in.
            opus_codec = 'libopus' if 'libopus' in ffmpeg_audio_encoders() else 'libvorbis'
            params += ['-codec:a', opus_codec, '-b:a', '128k']
        elif extracting:
            # NOTE(review): stream copy presumably only works when the source
            # audio codec is accepted by the target container - confirm.
            params += ['-codec:a', 'copy']
        elif ext == 'spx' and 'libspeex' in ffmpeg_audio_encoders():
            params += ['-codec:a', 'libspeex']

        # Name the muxer explicitly when it differs from the extension.
        container = AUDIO_EXT_TO_FFMPEG_FORMAT.get(ext, ext)
        if container != ext:
            params += ['-f', container]
        params += ['-threads', str(MAX_WORKERS)]

    elif conversion_type == "video":
        if ext in ('mp4', 'm4v', 'mov'):
            params += x264 + aac_192k + ['-movflags', '+faststart']
        elif ext == 'webm':
            params += [
                '-codec:v', 'libvpx-vp9',
                '-crf', '30', '-b:v', '0',
                '-codec:a', 'libopus', '-b:a', '128k',
            ]
        elif ext == 'avi':
            params += x264 + ['-codec:a', 'mp3', '-b:a', '192k']
        elif ext == 'mkv':
            params += x264 + aac_192k
        elif ext == 'flv':
            params += ['-codec:v', 'flv1', '-codec:a', 'mp3', '-ar', '44100']
        else:
            # NOTE(review): audio stream copy may be rejected by some
            # containers - confirm this default is intended.
            params += x264 + ['-codec:a', 'copy']
        params += ['-threads', str(MAX_WORKERS)]

    params.append(output_file)
    return params
def convert_audio_ffmpeg(input_file, output_file, output_ext):
    """Convert one audio file with FFmpeg; return ``True`` on success.

    Returns ``False`` straight away when *input_file* does not exist.  The
    output directory is created when missing.  *output_ext* is accepted but
    not read here; the target format is taken from *output_file*.
    """
    if not os.path.exists(input_file):
        return False

    target_dir = os.path.dirname(output_file)
    needs_dir = bool(target_dir) and not os.path.exists(target_dir)
    if needs_dir:
        os.makedirs(target_dir, exist_ok=True)

    return run_ffmpeg_with_progress(
        get_optimal_ffmpeg_params(input_file, output_file, "audio")
    )
def convert_video_ffmpeg(input_file, output_file, conversion_type):
    """Convert one video file with FFmpeg; return ``True`` on success.

    *conversion_type* equal to ``"Video to Audio"`` extracts the audio track;
    any other value performs a video-to-video conversion.  Returns ``False``
    straight away when *input_file* does not exist.  The output directory is
    created when missing.
    """
    if conversion_type == "Video to Audio":
        mode = "video_to_audio"
    else:
        mode = "video"

    if not os.path.exists(input_file):
        return False

    target_dir = os.path.dirname(output_file)
    needs_dir = bool(target_dir) and not os.path.exists(target_dir)
    if needs_dir:
        os.makedirs(target_dir, exist_ok=True)

    return run_ffmpeg_with_progress(
        get_optimal_ffmpeg_params(input_file, output_file, mode)
    )
def merge_audio_files_ffmpeg(input_files, output_file, gap_duration):
    """Concatenate *input_files* into *output_file* using FFmpeg's concat demuxer.

    The inputs are joined with stream copy (``-c copy``), i.e. without
    re-encoding.  A temporary list file is written to the current working
    directory and removed afterwards, whether or not FFmpeg succeeds.

    Args:
        input_files: Paths of the files to join, in playback order.
        output_file: Path of the merged file; overwritten if it exists.
        gap_duration: Accepted for interface compatibility.
            NOTE(review): this value is never read, so no gap is inserted
            between the files.

    Returns:
        ``True`` when FFmpeg exits successfully, ``False`` otherwise.
    """
    list_file = f"concat_{uuid.uuid4().hex}.txt"
    try:
        # Explicit UTF-8 so non-ASCII paths do not depend on the platform's
        # default text encoding.
        with open(list_file, 'w', encoding='utf-8') as f:
            for file in input_files:
                # Inside a single-quoted concat entry a quote must be written
                # as '\'' (close quote, escaped quote, reopen quote).
                escaped = os.path.abspath(file).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        params = [
            '-hide_banner',
            '-y',  # overwrite without prompting, like the conversion commands
            '-loglevel', 'error',
            '-f', 'concat',
            '-safe', '0',  # the list contains absolute paths
            '-i', list_file,
            '-c', 'copy',
            '-threads', str(MAX_WORKERS),
            output_file
        ]
        return run_ffmpeg_with_progress(params)
    finally:
        if os.path.exists(list_file):
            os.remove(list_file)
def _flatten_to_rgb(img):
    """Return an RGB version of *img*, compositing any alpha channel onto white."""
    if img.mode == 'P':
        # Expand the palette first so palette transparency becomes real alpha.
        img = img.convert('RGBA')
    if img.mode in ('RGBA', 'LA'):
        background = Image.new('RGB', img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[-1])
        return background
    return img.convert('RGB')


def convert_image_pillow(input_file, output_path, output_ext):
    """Convert one image to the format implied by *output_ext* using Pillow.

    Args:
        input_file: Path of the source image.
        output_path: Path the converted image is written to.
        output_ext: Target extension (with or without a leading dot).

    Returns:
        ``True`` on success.  ``False`` when the extension maps to no Pillow
        format or when opening/converting/saving raises (the error is printed).
    """
    try:
        pil_format = pil_format_for_ext(output_ext)
        if not pil_format:
            return False
        with Image.open(input_file) as img:
            original_mode = img.mode
            if pil_format == "JPEG":
                # JPEG has no transparency: flatten onto white, then save at
                # quality=100 with chroma subsampling disabled.
                img = _flatten_to_rgb(img)
                img.save(output_path, format=pil_format, quality=100, subsampling=0)
            elif pil_format == "PNG":
                # PNG is lossless; a low compress_level only trades size for speed.
                img.save(output_path, format=pil_format, compress_level=1)
            elif pil_format == "WEBP":
                img.save(output_path, format=pil_format, lossless=True, quality=100, method=6)
            elif pil_format == "ICO":
                if original_mode not in ('RGBA', 'RGB'):
                    img = img.convert('RGBA')
                # Pillow ignores requested icon sizes above 256 px, which would
                # leave the file without any image; shrink oversized sources
                # (aspect ratio preserved) so the single size is always kept.
                if img.width > 256 or img.height > 256:
                    img.thumbnail((256, 256))
                img.save(output_path, format=pil_format, sizes=[(img.width, img.height)])
            elif pil_format == "BMP":
                # Saved without transparency: flatten onto white.
                img = _flatten_to_rgb(img)
                img.save(output_path, format=pil_format)
            elif pil_format == "TIFF":
                img.save(output_path, format=pil_format, compression=None)
            elif pil_format == "GIF":
                if original_mode == 'RGBA':
                    alpha = img.split()[-1]
                    # Quantize to 255 colours so palette index 255 stays free
                    # for the transparent colour.
                    img = img.convert('RGB').convert('P', palette=Image.ADAPTIVE, colors=255)
                    # The mask must select the pixels that are (mostly)
                    # transparent in the source; those receive index 255.
                    transparent = alpha.point(lambda a: 255 if a < 128 else 0)
                    img.paste(255, mask=transparent)
                    img.info['transparency'] = 255
                elif original_mode != 'P':
                    img = img.convert('P', palette=Image.ADAPTIVE, colors=256)
                img.save(output_path, format=pil_format, optimize=False)
            elif pil_format in ("PPM", "PGM", "PBM"):
                # Every extension of this family resolves to the "PPM" format
                # name, so the pixel mode is picked from the requested
                # extension: bitmap for .pbm, grayscale for .pgm, RGB otherwise.
                requested = output_ext.lower().strip(".")
                if pil_format == "PBM" or requested == "pbm":
                    img = img.convert('1')
                elif pil_format == "PGM" or requested == "pgm":
                    img = img.convert('L')
                else:
                    img = img.convert('RGB')
                img.save(output_path, format="PPM")
            elif pil_format == "TGA":
                if original_mode not in ('RGBA', 'RGB'):
                    img = img.convert('RGBA')
                img.save(output_path, format=pil_format, compression=None)
            else:
                # Other formats: ask for top quality, retry without the option
                # if the call rejects it.
                try:
                    img.save(output_path, format=pil_format, quality=100)
                except TypeError:
                    img.save(output_path, format=pil_format)
        return True
    except Exception as e:
        print(f"Image conversion error: {e}")
        return False
def process_file_parallel(args):
    """Convert one file described by a task tuple; meant for ``executor.map``.

    *args* is ``(input_file, output_path, file_type, output_ext)`` where
    *file_type* is ``"audio"`` or ``"image"``.  Returns *output_path* when the
    conversion succeeds and ``None`` when it fails, raises, or the type is
    not recognised.
    """
    input_file, output_path, file_type, output_ext = args
    try:
        if file_type == "audio":
            converted = convert_audio_ffmpeg(input_file, output_path, output_ext)
        elif file_type == "image":
            converted = convert_image_pillow(input_file, output_path, output_ext)
        else:
            return None
    except Exception:
        return None
    if converted:
        return output_path
    return None
def create_zip(files_to_zip, session_id):
    """Pack *files_to_zip* into ``<session_id>.zip`` and return that file name.

    Each file is stored under its base name only (no directories), using
    DEFLATE at compression level 1.
    """
    archive_name = f"{session_id}.zip"
    with zipfile.ZipFile(
        archive_name,
        'w',
        compression=zipfile.ZIP_DEFLATED,
        compresslevel=1,
    ) as archive:
        for member in tqdm(files_to_zip, desc="Creating ZIP"):
            archive.write(member, os.path.basename(member))
    return archive_name
# ----------------------- Main Processing Functions -----------------------
def process_audio_files(files, output_ext, merge_files, gap_duration, progress=gr.Progress(track_tqdm=True)):
    """Convert the uploaded audio files, optionally merging them into one.

    Args:
        files: Uploaded files, each either a path string or an object with a
            ``name`` attribute holding the path.
        output_ext: Target extension without the dot.
        merge_files: When true, every file is converted and the results are
            concatenated into a single ``merged.<ext>``.
        gap_duration: Forwarded to the merge step.
        progress: Gradio progress tracker (mirrors tqdm progress).

    Returns:
        Path of the merged file, of the single converted file, or of a ZIP
        archive when several files were converted.

    Raises:
        gr.Error: When no file was given, nothing could be converted, or the
            merge failed.
    """
    if not files:
        raise gr.Error("Please upload at least one audio file!")

    session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
    os.makedirs(session_id, exist_ok=True)
    print(f"\nStarting audio session: {session_id}")
    print(f"Files to convert: {len(files)} to .{output_ext}")

    file_paths = [f if isinstance(f, str) else f.name for f in files]

    if merge_files:
        merged_output_path = os.path.join(session_id, f"merged.{output_ext}")
        temp_files = []
        for i, file_path in enumerate(tqdm(file_paths, desc="Converting")):
            temp_output = os.path.join(session_id, f"temp_{i}.{output_ext}")
            if convert_audio_ffmpeg(file_path, temp_output, output_ext):
                temp_files.append(temp_output)
        if not temp_files:
            raise gr.Error("No files were successfully converted")
        if not merge_audio_files_ffmpeg(temp_files, merged_output_path, gap_duration):
            raise gr.Error("Failed to merge audio files")
        for temp_file in temp_files:
            # Best-effort cleanup: a leftover temp file must not fail the request.
            try:
                os.remove(temp_file)
            except OSError:
                pass
        return merged_output_path

    tasks = []
    used_names = set()
    for file_path in file_paths:
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        # Inputs such as "song.wav" and "song.flac" share a base name; give
        # each task its own output so parallel workers never write the same
        # file and the ZIP gets no duplicate entries.  Compared in lower case
        # for case-insensitive file systems.
        unique_name = base_name
        counter = 1
        while unique_name.lower() in used_names:
            unique_name = f"{base_name}_{counter}"
            counter += 1
        used_names.add(unique_name.lower())
        output_path = os.path.join(session_id, f"{unique_name}.{output_ext}")
        tasks.append((file_path, output_path, "audio", output_ext))

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = list(tqdm(
            executor.map(process_file_parallel, tasks),
            total=len(tasks),
            desc="Converting"
        ))
    output_files = [r for r in results if r is not None]

    if not output_files:
        raise gr.Error("No files were successfully converted")
    if len(output_files) > 1:
        return create_zip(output_files, session_id)
    return output_files[0]
def process_image_files(files, output_ext, progress=gr.Progress(track_tqdm=True)):
    """Convert the uploaded images to *output_ext*.

    Args:
        files: Uploaded files, each either a path string or an object with a
            ``name`` attribute holding the path.
        output_ext: Target extension without the dot.
        progress: Gradio progress tracker (mirrors tqdm progress).

    Returns:
        Path of the single converted image, or of a ZIP archive when several
        images were converted.

    Raises:
        gr.Error: When no file was given or nothing could be converted.
    """
    if not files:
        raise gr.Error("Please upload at least one image!")

    session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
    os.makedirs(session_id, exist_ok=True)
    print(f"\nStarting image session: {session_id}")
    print(f"Files to convert: {len(files)} to .{output_ext}")

    file_paths = [f if isinstance(f, str) else f.name for f in files]

    tasks = []
    used_names = set()
    for file_path in file_paths:
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        # Inputs such as "logo.png" and "logo.jpg" share a base name; give
        # each task its own output so parallel workers never write the same
        # file and the ZIP gets no duplicate entries.  Compared in lower case
        # for case-insensitive file systems.
        unique_name = base_name
        counter = 1
        while unique_name.lower() in used_names:
            unique_name = f"{base_name}_{counter}"
            counter += 1
        used_names.add(unique_name.lower())
        output_path = os.path.join(session_id, f"{unique_name}.{output_ext}")
        tasks.append((file_path, output_path, "image", output_ext))

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = list(tqdm(
            executor.map(process_file_parallel, tasks),
            total=len(tasks),
            desc="Converting"
        ))
    output_files = [r for r in results if r is not None]

    if not output_files:
        raise gr.Error("No images were successfully converted")
    if len(output_files) > 1:
        return create_zip(output_files, session_id)
    return output_files[0]
def process_video(input_video, conversion_type, output_ext, progress=gr.Progress(track_tqdm=True)):
    """Convert one uploaded video (or extract its audio) into a new session folder.

    *input_video* is a path string or an object whose ``name`` attribute is
    the path.  Returns the path of the converted file.

    Raises:
        gr.Error: When no file was given or the conversion failed.
    """
    if not input_video:
        raise gr.Error("Please upload a video file!")

    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    session_id = f"{stamp}_{uuid.uuid4().hex[:8]}"
    os.makedirs(session_id, exist_ok=True)

    if isinstance(input_video, str):
        input_path = input_video
    else:
        input_path = input_video.name
    stem, _ = os.path.splitext(os.path.basename(input_path))
    output_path = os.path.join(session_id, f"{stem}.{output_ext}")

    print(f"\nStarting video session: {session_id}")
    print(f"Conversion type: {conversion_type}, Output: .{output_ext}")

    progress(0.5, "Processing...")
    if not convert_video_ffmpeg(input_path, output_path, conversion_type):
        raise gr.Error("Video processing failed. Please check the file format and try again.")
    return output_path
def update_format_choices(conversion_type):
    """Return a format dropdown matching the selected conversion type.

    ``"Video to Video"`` offers the video extensions (default ``mp4``); any
    other value offers the audio extensions (default ``mp3``).  When the
    preferred default is unavailable the first choice is used, or ``None`` if
    there are no choices.
    """
    if conversion_type == "Video to Video":
        choices = available_video_extensions()
        preferred = "mp4"
        label = "Output Video Format"
    else:
        choices = available_audio_extensions()
        preferred = "mp3"
        label = "Output Audio Format"

    if preferred in choices:
        selected = preferred
    elif choices:
        selected = choices[0]
    else:
        selected = None
    return gr.Dropdown(choices=choices, value=selected, label=label)
# ----------------------- UI -----------------------
# Format lists computed once at import time; they provide the choices for
# the output-format dropdowns of the UI.
AUDIO_FORMATS = available_audio_extensions()
VIDEO_FORMATS = available_video_extensions()
IMAGE_FORMATS = available_image_extensions()
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation was lost; component order is unchanged, but confirm the layout.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.HTML(i18n("title_app"))

    with gr.Tabs():
        # AUDIO TAB
        with gr.TabItem(i18n("audio_tab")):
            gr.HTML(i18n("audio_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    audio_file_input = gr.Files(
                        label=i18n("label_audio_file_input"),
                        file_types=["audio"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    # Prefer mp3, else the first available format, else nothing.
                    default_audio = "mp3" if "mp3" in AUDIO_FORMATS else (AUDIO_FORMATS[0] if AUDIO_FORMATS else None)
                    audio_format_choice = gr.Dropdown(
                        choices=AUDIO_FORMATS,
                        label=i18n("label_audio_format_choice"),
                        value=default_audio,
                    )
                    merge_files_checkbox = gr.Checkbox(
                        label=i18n("label_merge_files_checkbox"),
                    )
                    gap_slider = gr.Slider(
                        minimum=0,
                        maximum=5000,
                        step=100,
                        value=500,
                        label=i18n("label_gap_slider"),
                        visible=False,
                    )

            audio_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            audio_output_file = gr.File(label=i18n("download_result"))

            # The gap slider is only shown while merging is enabled.
            merge_files_checkbox.change(
                lambda x: gr.update(visible=x),
                inputs=merge_files_checkbox,
                outputs=gap_slider,
            )
            audio_submit_button.click(
                fn=process_audio_files,
                inputs=[
                    audio_file_input,
                    audio_format_choice,
                    merge_files_checkbox,
                    gap_slider,
                ],
                outputs=audio_output_file,
            )

        # IMAGE TAB
        with gr.TabItem(i18n("image_tab")):
            gr.HTML(i18n("image_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    image_file_input = gr.Files(
                        label=i18n("label_image_file_input"),
                        file_types=["image"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    # Prefer png, else the first available format, else nothing.
                    default_image = "png" if "png" in IMAGE_FORMATS else (IMAGE_FORMATS[0] if IMAGE_FORMATS else None)
                    image_format_choice = gr.Dropdown(
                        choices=IMAGE_FORMATS,
                        label=i18n("label_image_format_choice"),
                        value=default_image,
                    )

            image_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            image_output_file = gr.File(label=i18n("download_result"))

            image_submit_button.click(
                fn=process_image_files,
                inputs=[image_file_input, image_format_choice],
                outputs=image_output_file,
            )

        # VIDEO TAB
        with gr.TabItem(i18n("video_tab")):
            gr.HTML(i18n("video_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    video_input = gr.File(
                        label=i18n("label_video_input"),
                        file_types=["video"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    conversion_type_radio = gr.Radio(
                        choices=["Video to Video", "Video to Audio"],
                        label=i18n("label_conversion_type_radio"),
                        value="Video to Video",
                    )
                    # Prefer mp4, else the first available format, else nothing.
                    default_video = "mp4" if "mp4" in VIDEO_FORMATS else (VIDEO_FORMATS[0] if VIDEO_FORMATS else None)
                    video_format_dropdown = gr.Dropdown(
                        choices=VIDEO_FORMATS,
                        label=i18n("label_video_format_dropdown"),
                        value=default_video,
                    )

            video_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            video_output_file = gr.File(label=i18n("download_result"))

            # Switching the conversion type swaps the dropdown's format list.
            conversion_type_radio.change(
                fn=update_format_choices,
                inputs=conversion_type_radio,
                outputs=video_format_dropdown,
            )
            video_submit_button.click(
                fn=process_video,
                inputs=[video_input, conversion_type_radio, video_format_dropdown],
                outputs=video_output_file,
            )
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.queue().launch(
        i18n=i18n,
        debug=True,
        show_error=True,
    )