Daankular/models / Wan2GP /shared /utils /video_metadata.py
Daankular's picture
download
raw
38.3 kB
"""
Video Metadata Utilities
This module provides functionality to read and write JSON metadata to video files.
- MP4: Uses mutagen to store metadata in ©cmt tag
- MKV: Uses FFmpeg to store metadata in comment/description tags and source images as attached pictures
"""
import json
import mmap
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
from shared.utils.video_decode import resolve_media_binary
DEFAULT_RESERVED_VIDEO_METADATA_BYTES = 50 * 1024
_RESERVED_METADATA_KEY = "_wangp_metadata_reserved"
_MP4_COMMENT_TAG = "\xa9cmt"
_MP4_COMMENT_BOX = b"\xa9cmt"
_MKV_COMMENT_NAME = b"\x45\xa3\x87COMMENT"
_CONTAINER_COMMENT_TAGS = ("comment", "COMMENT", "description", "DESCRIPTION")
_MP4_METADATA_CONTAINERS = {b"moov", b"udta", b"meta", b"ilst", _MP4_COMMENT_BOX}
_MP4_METADATA_EXTENSIONS = (".mp4", ".mov")
_IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")
def _resolve_media_tool(name):
return resolve_media_binary(name) or name
def _is_verbose_metadata_debug(verbose_level):
try:
return int(verbose_level or 0) >= 2
except (TypeError, ValueError):
return False
def _log_metadata_debug(verbose_level, message):
if _is_verbose_metadata_debug(verbose_level):
print(f"[Video Metadata] {message}")
def _normalize_metadata_text(text):
if isinstance(text, bytes):
text = text.decode("utf-8", errors="ignore")
return str(text or "").replace("\ufeff", "").rstrip("\0")
def _parse_metadata_text(text):
payload = _normalize_metadata_text(text)
if len(payload.strip()) == 0:
return None
try:
metadata = json.loads(payload)
except (TypeError, ValueError, json.JSONDecodeError):
return None
return None if isinstance(metadata, dict) and metadata.get(_RESERVED_METADATA_KEY) else metadata
def _encode_metadata_bytes(metadata_dict):
return json.dumps(metadata_dict, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
def _pad_metadata_bytes(payload_bytes, reserved_bytes):
reserved_bytes = int(reserved_bytes)
if len(payload_bytes) > reserved_bytes:
return None
return payload_bytes + (b" " * (reserved_bytes - len(payload_bytes)))
def build_reserved_video_metadata_text(reserved_bytes=DEFAULT_RESERVED_VIDEO_METADATA_BYTES):
reserved_bytes = max(128, int(reserved_bytes))
placeholder = _encode_metadata_bytes({_RESERVED_METADATA_KEY: True})
return _pad_metadata_bytes(placeholder, max(reserved_bytes, len(placeholder))).decode("utf-8")
def _escape_ffmetadata_value(value):
text = str(value or "").replace("\\", "\\\\").replace("\r", "").replace("\n", "\\\n")
return text.replace("=", "\\=").replace(";", "\\;").replace("#", "\\#")
def _write_ffmetadata_file(file_path, tags):
with open(file_path, "w", encoding="utf-8", newline="\n") as handle:
handle.write(";FFMETADATA1\n")
for key, value in (tags or {}).items():
if value is None:
continue
handle.write(f"{key}={_escape_ffmetadata_value(value)}\n")
return file_path
def write_reserved_video_ffmetadata(file_path, reserved_bytes=DEFAULT_RESERVED_VIDEO_METADATA_BYTES):
return _write_ffmetadata_file(file_path, {"comment": build_reserved_video_metadata_text(reserved_bytes)})
def _read_container_tags(file_path):
ffprobe_path = _resolve_media_tool("ffprobe")
result = subprocess.run([ffprobe_path, "-v", "error", "-show_entries", "format_tags", "-of", "json", file_path], capture_output=True, text=True, encoding="utf-8", errors="ignore", check=False)
if result.returncode != 0:
return {}
try:
tags = ((json.loads(result.stdout) or {}).get("format") or {}).get("tags") or {}
except (TypeError, ValueError, json.JSONDecodeError):
return {}
return {str(key): str(value) for key, value in tags.items() if value is not None}
def _make_temp_output_path(file_path, suffix):
path = Path(file_path)
return str(path.with_name(f"{path.stem}_{suffix}{path.suffix}"))
def _read_mp4_box(mm, offset, end):
if offset + 8 > end:
return None
box_size = int.from_bytes(mm[offset:offset + 4], "big")
box_type = bytes(mm[offset + 4:offset + 8])
header_size = 8
if box_size == 1:
if offset + 16 > end:
return None
box_size = int.from_bytes(mm[offset + 8:offset + 16], "big")
header_size = 16
elif box_size == 0:
box_size = end - offset
box_end = offset + box_size
if box_size < header_size or box_end > end:
return None
content_offset = offset + header_size + (4 if box_type == b"meta" else 0)
return None if content_offset > box_end else (box_type, content_offset, box_end)
def _find_mp4_comment_slot_in_range(mm, start, end):
cursor = start
while cursor + 8 <= end:
box = _read_mp4_box(mm, cursor, end)
if box is None:
break
box_type, content_offset, box_end = box
if box_type == _MP4_COMMENT_BOX:
child_cursor = content_offset
while child_cursor + 8 <= box_end:
child_box = _read_mp4_box(mm, child_cursor, box_end)
if child_box is None:
break
child_type, child_content_offset, child_end = child_box
if child_type == b"data" and child_content_offset + 8 <= child_end:
payload_offset = child_content_offset + 8
return payload_offset, child_end - payload_offset
child_cursor = child_end
elif box_type in _MP4_METADATA_CONTAINERS:
slot = _find_mp4_comment_slot_in_range(mm, content_offset, box_end)
if slot is not None:
return slot
cursor = box_end
return None
def _find_mp4_comment_slot(file_path):
with open(file_path, "rb") as handle:
with mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ) as mm:
return _find_mp4_comment_slot_in_range(mm, 0, len(mm))
def _find_reserved_metadata_text_slot(file_path):
placeholder = _encode_metadata_bytes({_RESERVED_METADATA_KEY: True})
with open(file_path, "rb") as handle:
with mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ) as mm:
cursor = 0
fallback_slot = None
comment_slot = None
while True:
payload_offset = mm.find(placeholder, cursor)
if payload_offset < 0:
break
payload_end = payload_offset + len(placeholder)
while payload_end < len(mm) and mm[payload_end] == 0x20:
payload_end += 1
slot = payload_offset, payload_end - payload_offset
fallback_slot = slot
if mm.rfind(_MP4_COMMENT_BOX, max(0, payload_offset - 64), payload_offset) >= 0:
comment_slot = slot
cursor = payload_offset + len(placeholder)
return comment_slot or fallback_slot
def _find_mp4_metadata_slot(file_path):
return _find_reserved_metadata_text_slot(file_path) or _find_mp4_comment_slot(file_path)
def _read_ebml_size(mm, offset):
if offset >= len(mm):
return None
first = mm[offset]
mask = 0x80
length = 1
while length <= 8 and not (first & mask):
mask >>= 1
length += 1
if length > 8 or offset + length > len(mm):
return None
value = first & (mask - 1)
for index in range(1, length):
value = (value << 8) | mm[offset + index]
return value, length
def _find_mkv_comment_slot(file_path):
with open(file_path, "rb") as handle:
with mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ) as mm:
cursor = 0
while True:
name_offset = mm.find(_MKV_COMMENT_NAME, cursor)
if name_offset < 0:
return None
search_start = name_offset + len(_MKV_COMMENT_NAME)
search_end = min(len(mm), search_start + 8192)
value_offset = mm.find(b"\x44\x87", search_start, search_end)
if value_offset >= 0:
parsed_size = _read_ebml_size(mm, value_offset + 2)
if parsed_size is not None:
data_size, size_len = parsed_size
data_offset = value_offset + 2 + size_len
if data_offset + data_size <= len(mm):
return data_offset, data_size
cursor = name_offset + len(_MKV_COMMENT_NAME)
def _write_metadata_slot(file_path, slot, payload_bytes):
if slot is None:
return False
payload_offset, reserved_bytes = slot
padded = _pad_metadata_bytes(payload_bytes, reserved_bytes)
if padded is None:
return False
with open(file_path, "r+b") as handle:
handle.seek(payload_offset)
handle.write(padded)
return True
def _maybe_update_metadata_in_place(file_path, payload_bytes, *, container_name, find_slot_fn, allow_inplace_update=False, verbose_level=0):
if not allow_inplace_update:
return False
started_at = time.perf_counter()
slot = find_slot_fn(file_path)
elapsed_ms = (time.perf_counter() - started_at) * 1000.0
if slot is None:
_log_metadata_debug(verbose_level, f"{container_name}: no reserved metadata slot found in {elapsed_ms:.1f} ms")
return False
payload_offset, reserved_bytes = slot
if len(payload_bytes) > int(reserved_bytes):
_log_metadata_debug(verbose_level, f"{container_name}: reserved metadata slot too small ({len(payload_bytes)} > {int(reserved_bytes)}) after {elapsed_ms:.1f} ms")
return False
ok = _write_metadata_slot(file_path, slot, payload_bytes)
if ok:
_log_metadata_debug(verbose_level, f"{container_name}: updated metadata in place in {elapsed_ms:.1f} ms at offset {int(payload_offset)}")
else:
_log_metadata_debug(verbose_level, f"{container_name}: in-place update failed after slot lookup in {elapsed_ms:.1f} ms")
return ok
def _copy_video_with_comment(file_path, metadata_text, source_images=None):
ffmpeg_path = _resolve_media_tool("ffmpeg")
temp_output_path = _make_temp_output_path(file_path, "metadata")
meta_dir = tempfile.mkdtemp(prefix="wangp_metadata_")
metadata_path = os.path.join(meta_dir, "comment.ffmeta")
tags = {key: value for key, value in _read_container_tags(file_path).items() if str(key).lower() not in {"comment", "description"}}
tags["comment"] = metadata_text
try:
_write_ffmetadata_file(metadata_path, tags)
command = [ffmpeg_path, "-y", "-v", "error", "-i", file_path, "-f", "ffmetadata", "-i", metadata_path, "-map", "0", "-map_metadata", "1", "-c", "copy"]
for attachment_no, (attachment_path, attachment_filename, mimetype) in enumerate(_materialize_mkv_attachments(source_images, meta_dir)):
command += ["-attach", attachment_path, f"-metadata:s:t:{attachment_no}", f"mimetype={mimetype}", f"-metadata:s:t:{attachment_no}", f"filename={attachment_filename}"]
command += [temp_output_path]
result = subprocess.run(command, capture_output=True, text=True, encoding="utf-8", errors="ignore", check=False)
if result.returncode != 0 or not os.path.isfile(temp_output_path):
if os.path.exists(temp_output_path):
os.remove(temp_output_path)
return False, (result.stderr or result.stdout or "").strip()
os.replace(temp_output_path, file_path)
return True, ""
finally:
shutil.rmtree(meta_dir, ignore_errors=True)
def _convert_image_to_bytes(img):
"""
Convert various image formats to bytes suitable for MP4 cover art.
Args:
img: Can be:
- PIL Image object
- File path (str)
- bytes
Returns:
tuple: (image_bytes, image_format)
- image_bytes: Binary image data
- image_format: AtomDataType constant (JPEG or PNG)
"""
from mutagen.mp4 import AtomDataType
from PIL import Image
import io
import os
try:
# If it's already bytes, detect format and return
if isinstance(img, bytes):
# Detect format from magic numbers
if img.startswith(b'\x89PNG'):
return img, AtomDataType.PNG
else:
return img, AtomDataType.JPEG
# If it's a file path, read and convert
if isinstance(img, str):
if not os.path.exists(img):
print(f"Warning: Image file not found: {img}")
return None, None
# Determine format from extension
ext = os.path.splitext(img)[1].lower()
# Open with PIL for conversion
pil_img = Image.open(img)
# Convert to RGB if necessary (handles RGBA, P, etc.)
if pil_img.mode not in ('RGB', 'L'):
if pil_img.mode == 'RGBA':
# Create white background for transparency
background = Image.new('RGB', pil_img.size, (255, 255, 255))
background.paste(pil_img, mask=pil_img.split()[3])
pil_img = background
else:
pil_img = pil_img.convert('RGB')
# Save to bytes
img_bytes = io.BytesIO()
# Use PNG for lossless formats, JPEG for others
if ext in ['.png', '.bmp', '.tiff', '.tif']:
pil_img.save(img_bytes, format='PNG')
img_format = AtomDataType.PNG
else:
pil_img.save(img_bytes, format='JPEG', quality=95)
img_format = AtomDataType.JPEG
return img_bytes.getvalue(), img_format
# If it's a PIL Image
if isinstance(img, Image.Image):
# Convert to RGB if necessary
if img.mode not in ('RGB', 'L'):
if img.mode == 'RGBA':
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[3])
img = background
else:
img = img.convert('RGB')
# Save to bytes (prefer PNG for quality)
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
return img_bytes.getvalue(), AtomDataType.PNG
print(f"Warning: Unsupported image type: {type(img)}")
return None, None
except Exception as e:
print(f"Error converting image to bytes: {e}")
return None, None
def _safe_metadata_filename_part(text, default):
safe = "".join(char if char.isalnum() or char in "._-" else "_" for char in str(text or "").strip())
safe = safe.strip("._")
return safe or default
def _split_mkv_attachment_filename(filename):
filename = os.path.basename(str(filename or ""))
stem, ext = os.path.splitext(filename)
if "__" in stem:
tag, original_stem = stem.split("__", 1)
return _safe_metadata_filename_part(tag, "unknown"), _safe_metadata_filename_part(original_stem, "source") + ext
return "unknown", _safe_metadata_filename_part(filename, "source.png")
def _normalize_source_images(source_images):
if not source_images:
return None
normalized = {}
if isinstance(source_images, dict):
iterable = source_images.items()
else:
iterable = ((_split_mkv_attachment_filename(img)[0], img) for img in list(source_images if isinstance(source_images, (list, tuple)) else [source_images]))
for img_tag, img_data in iterable:
img_list = img_data if isinstance(img_data, list) else [img_data]
valid_images = [img for img in img_list if img is not None]
if valid_images:
normalized.setdefault(str(img_tag or "unknown"), []).extend(valid_images)
return normalized or None
def _image_extension_and_mimetype(image_bytes):
if image_bytes.startswith(b"\x89PNG"):
return ".png", "image/png"
return ".jpg", "image/jpeg"
def _materialize_mkv_attachments(source_images, work_dir):
source_images = _normalize_source_images(source_images)
if source_images is None:
return []
attachments = []
used_filenames = set()
for img_tag, img_list in source_images.items():
safe_tag = _safe_metadata_filename_part(img_tag, "unknown")
for image_no, img in enumerate(img_list):
image_bytes, _image_format = _convert_image_to_bytes(img)
if not image_bytes:
continue
extension, mimetype = _image_extension_and_mimetype(image_bytes)
if isinstance(img, str) and os.path.exists(img):
_attachment_tag, original_filename = _split_mkv_attachment_filename(os.path.basename(img))
original_name = os.path.splitext(original_filename)[0]
else:
original_name = f"source_{image_no}"
safe_name = _safe_metadata_filename_part(original_name, f"source_{image_no}")
attachment_filename = f"{safe_tag}__{safe_name}{extension}"
base_name, ext = os.path.splitext(attachment_filename)
counter = 1
while attachment_filename.lower() in used_filenames:
attachment_filename = f"{base_name}_{counter}{ext}"
counter += 1
used_filenames.add(attachment_filename.lower())
attachment_path = os.path.join(work_dir, attachment_filename)
with open(attachment_path, "wb") as handle:
handle.write(image_bytes)
attachments.append((attachment_path, attachment_filename, mimetype))
return attachments
def embed_source_images_metadata_mp4(file, source_images):
from mutagen.mp4 import MP4, MP4Cover, AtomDataType
import json
import os
source_images = _normalize_source_images(source_images)
if not source_images:
return file
try:
if file.tags is None:
file.add_tags()
# Convert source images to cover art and build metadata
cover_data = []
image_metadata = {} # Maps tag to list of {index, filename, extension}
# Process each source image type
for img_tag, img_data in source_images.items():
if img_data is None:
continue
tag_images = []
# Normalize to list for uniform processing
img_list = img_data if isinstance(img_data, list) else [img_data]
for img in img_list:
if img is not None:
cover_bytes, image_format = _convert_image_to_bytes(img)
if cover_bytes:
# Extract filename and extension
if isinstance(img, str) and os.path.exists(img):
filename = os.path.basename(img)
extension = os.path.splitext(filename)[1]
else:
# PIL Image or unknown - infer from format
extension = '.png' if image_format == AtomDataType.PNG else '.jpg'
filename = f"{img_tag}{extension}"
tag_images.append({
'index': len(cover_data),
'filename': filename,
'extension': extension
})
cover_data.append(MP4Cover(cover_bytes, image_format))
if tag_images:
image_metadata[img_tag] = tag_images
if cover_data:
file.tags['----:com.apple.iTunes:EMBEDDED_IMAGES'] = cover_data
# Store the complete metadata as JSON
file.tags['----:com.apple.iTunes:IMAGE_METADATA'] = json.dumps(image_metadata).encode('utf-8')
# print(f"Successfully embedded {len(cover_data)} cover images")
# print(f"Image tags: {list(image_metadata.keys())}")
except Exception as e:
print(f"Failed to embed cover art with mutagen: {e}")
print(f"This might be due to image format or MP4 file structure issues")
return file
def _legacy_save_metadata_to_mp4(file_path, metadata_dict, source_images = None):
"""
Legacy MP4 metadata writer kept for reference.
Args:
file_path (str): Path to MP4 file
metadata_dict (dict): Metadata dictionary to save
Returns:
bool: True if successful, False otherwise
"""
try:
from mutagen.mp4 import MP4
file = MP4(file_path)
file.tags['©cmt'] = [json.dumps(metadata_dict)]
if source_images is not None:
embed_source_images_metadata_mp4(file, source_images)
file.save()
return True
except Exception as e:
print(f"Error saving metadata to MP4 {file_path}: {e}")
return False
def _legacy_save_metadata_to_mkv(file_path, metadata_dict):
"""
Legacy MKV metadata writer kept for reference.
Args:
file_path (str): Path to MKV file
metadata_dict (dict): Metadata dictionary to save
Returns:
bool: True if successful, False otherwise
"""
try:
# Create temporary file with metadata
temp_path = file_path.replace('.mkv', '_temp_with_metadata.mkv')
# Use FFmpeg to add metadata while preserving ALL streams (including attachments)
ffmpeg_cmd = [
'ffmpeg', '-y', '-i', file_path,
'-metadata', f'comment={json.dumps(metadata_dict)}',
'-map', '0', # Map all streams from input (including attachments)
'-c', 'copy', # Copy streams without re-encoding
temp_path
]
result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
if result.returncode == 0:
# Replace original with metadata version
shutil.move(temp_path, file_path)
return True
else:
print(f"Warning: Failed to add metadata to MKV file: {result.stderr}")
# Clean up temp file if it exists
if os.path.exists(temp_path):
os.remove(temp_path)
return False
except Exception as e:
print(f"Error saving metadata to MKV {file_path}: {e}")
return False
def _legacy_save_video_metadata(file_path, metadata_dict, source_images= None):
"""
Legacy video metadata writer kept for reference.
Args:
file_path (str): Path to video file
metadata_dict (dict): Metadata dictionary to save
Returns:
bool: True if successful, False otherwise
"""
if str(file_path).lower().endswith(_MP4_METADATA_EXTENSIONS):
return save_metadata_to_mp4(file_path, metadata_dict, source_images)
elif str(file_path).lower().endswith('.mkv'):
return save_metadata_to_mkv(file_path, metadata_dict)
else:
return False
def _legacy_read_metadata_from_mp4(file_path):
"""
Legacy MP4 metadata reader kept for reference.
Args:
file_path (str): Path to MP4 file
Returns:
dict or None: Metadata dictionary if found, None otherwise
"""
try:
from mutagen.mp4 import MP4
file = MP4(file_path)
tags = file.tags['©cmt'][0]
return json.loads(tags)
except Exception:
return None
def _legacy_read_metadata_from_mkv(file_path):
"""
Legacy MKV metadata reader kept for reference.
Args:
file_path (str): Path to MKV file
Returns:
dict or None: Metadata dictionary if found, None otherwise
"""
try:
# Try to get metadata using ffprobe
result = subprocess.run([
'ffprobe', '-v', 'quiet', '-print_format', 'json',
'-show_format', file_path
], capture_output=True, text=True)
if result.returncode == 0:
probe_data = json.loads(result.stdout)
format_tags = probe_data.get('format', {}).get('tags', {})
# Look for our metadata in various possible tag locations
for tag_key in ['comment', 'COMMENT', 'description', 'DESCRIPTION']:
if tag_key in format_tags:
try:
return json.loads(format_tags[tag_key])
except:
continue
return None
except Exception:
return None
def _legacy_read_metadata_from_video(file_path):
"""
Legacy video metadata reader kept for reference.
Args:
file_path (str): Path to video file
Returns:
dict or None: Metadata dictionary if found, None otherwise
"""
if str(file_path).lower().endswith(_MP4_METADATA_EXTENSIONS):
return read_metadata_from_mp4(file_path)
elif str(file_path).lower().endswith('.mkv'):
return read_metadata_from_mkv(file_path)
else:
return None
def save_metadata_to_mp4(file_path, metadata_dict, source_images = None, allow_inplace_update=False, verbose_level=0):
metadata_text = json.dumps(metadata_dict, ensure_ascii=False, separators=(",", ":"))
payload_bytes = metadata_text.encode("utf-8")
source_images = _normalize_source_images(source_images)
if source_images is None and _maybe_update_metadata_in_place(file_path, payload_bytes, container_name="MP4", find_slot_fn=_find_mp4_metadata_slot, allow_inplace_update=allow_inplace_update, verbose_level=verbose_level):
return True
if source_images is not None:
_log_metadata_debug(verbose_level, "MP4: skipping in-place update because embedded images are being written too")
try:
from mutagen.mp4 import MP4
file = MP4(file_path)
if file.tags is None:
file.add_tags()
file.tags[_MP4_COMMENT_TAG] = [metadata_text]
if source_images is not None:
embed_source_images_metadata_mp4(file, source_images)
file.save()
_log_metadata_debug(verbose_level, "MP4: used standard metadata save path (non in-place)")
return True
except Exception as e:
print(f"Error saving metadata to MP4 {file_path}: {e}")
return False
def save_metadata_to_mkv(file_path, metadata_dict, source_images=None, allow_inplace_update=False, verbose_level=0):
metadata_text = json.dumps(metadata_dict, ensure_ascii=False, separators=(",", ":"))
payload_bytes = metadata_text.encode("utf-8")
source_images = _normalize_source_images(source_images)
if source_images is None and _maybe_update_metadata_in_place(file_path, payload_bytes, container_name="MKV", find_slot_fn=_find_mkv_comment_slot, allow_inplace_update=allow_inplace_update, verbose_level=verbose_level):
return True
if source_images is not None:
_log_metadata_debug(verbose_level, "MKV: skipping in-place update because embedded images are being written too")
try:
ok, error = _copy_video_with_comment(file_path, metadata_text, source_images)
if ok:
_log_metadata_debug(verbose_level, "MKV: created a rewritten container copy to store metadata")
return True
print(f"Warning: Failed to add metadata to MKV file: {error}")
return False
except Exception as e:
print(f"Error saving metadata to MKV {file_path}: {e}")
return False
def save_video_metadata(file_path, metadata_dict, source_images= None, allow_inplace_update=False, verbose_level=0):
source_images = _normalize_source_images(source_images)
if str(file_path).lower().endswith(_MP4_METADATA_EXTENSIONS):
return save_metadata_to_mp4(file_path, metadata_dict, source_images, allow_inplace_update=allow_inplace_update, verbose_level=verbose_level)
if str(file_path).lower().endswith('.mkv'):
return save_metadata_to_mkv(file_path, metadata_dict, source_images, allow_inplace_update=allow_inplace_update, verbose_level=verbose_level)
return False
def read_metadata_from_mp4(file_path):
try:
from mutagen.mp4 import MP4
file = MP4(file_path)
tag_values = file.tags.get(_MP4_COMMENT_TAG) if file.tags is not None else None
metadata = None if not tag_values else _parse_metadata_text(tag_values[0])
if metadata is not None:
return metadata
except Exception:
pass
for tag_key in _CONTAINER_COMMENT_TAGS:
metadata = _parse_metadata_text(_read_container_tags(file_path).get(tag_key))
if metadata is not None:
return metadata
return None
def read_metadata_from_mkv(file_path):
try:
ffprobe_path = _resolve_media_tool("ffprobe")
result = subprocess.run([ffprobe_path, "-v", "quiet", "-print_format", "json", "-show_format", file_path], capture_output=True, text=True, encoding="utf-8", errors="ignore", check=False)
if result.returncode == 0:
probe_data = json.loads(result.stdout)
format_tags = probe_data.get("format", {}).get("tags", {})
for tag_key in _CONTAINER_COMMENT_TAGS:
metadata = _parse_metadata_text(format_tags.get(tag_key))
if metadata is not None:
return metadata
return None
except Exception:
return None
def read_metadata_from_video(file_path):
if str(file_path).lower().endswith(_MP4_METADATA_EXTENSIONS):
return read_metadata_from_mp4(file_path)
if str(file_path).lower().endswith('.mkv'):
return read_metadata_from_mkv(file_path)
return None
def _extract_mp4_cover_art(video_path, output_dir = None):
"""
Extract cover art from MP4 files using mutagen with proper tag association.
Args:
video_path (str): Path to the MP4 file
output_dir (str): Directory to save extracted images
Returns:
dict: Dictionary mapping tags to lists of extracted image file paths
Format: {tag_name: [path1, path2, ...], ...}
"""
try:
from mutagen.mp4 import MP4
import json
file = MP4(video_path)
if file.tags is None or '----:com.apple.iTunes:EMBEDDED_IMAGES' not in file.tags:
return {}
cover_art = file.tags['----:com.apple.iTunes:EMBEDDED_IMAGES']
# Retrieve the image metadata
metadata_data = file.tags.get('----:com.apple.iTunes:IMAGE_METADATA')
if metadata_data:
# Deserialize metadata and extract with original filenames
image_metadata = json.loads(metadata_data[0].decode('utf-8'))
extracted_files = {}
for tag, tag_images in image_metadata.items():
extracted_files[tag] = []
for img_info in tag_images:
cover_idx = img_info['index']
if cover_idx >= len(cover_art):
continue
if output_dir is None: output_dir = _create_temp_dir()
os.makedirs(output_dir, exist_ok=True)
cover = cover_art[cover_idx]
# Use original filename
filename = img_info['filename']
output_file = os.path.join(output_dir, filename)
# Handle duplicate filenames by adding suffix
if os.path.exists(output_file):
base, ext = os.path.splitext(filename)
counter = 1
while os.path.exists(output_file):
filename = f"{base}_{counter}{ext}"
output_file = os.path.join(output_dir, filename)
counter += 1
# Write cover art to file
with open(output_file, 'wb') as f:
f.write(cover)
if os.path.exists(output_file):
extracted_files[tag].append(output_file)
return extracted_files
else:
# Fallback: Extract all images with generic naming
print(f"Warning: No IMAGE_METADATA found in {video_path}, using generic extraction")
extracted_files = {'unknown': []}
for i, cover in enumerate(cover_art):
if output_dir is None: output_dir = _create_temp_dir()
os.makedirs(output_dir, exist_ok=True)
filename = f"cover_art_{i}.jpg"
output_file = os.path.join(output_dir, filename)
with open(output_file, 'wb') as f:
f.write(cover)
if os.path.exists(output_file):
extracted_files['unknown'].append(output_file)
return extracted_files
except Exception as e:
print(f"Error extracting cover art from MP4: {e}")
return {}
def _create_temp_dir():
temp_dir = tempfile.mkdtemp()
os.makedirs(temp_dir, exist_ok=True)
return temp_dir
def extract_source_images(video_path, output_dir = None):
# Handle MP4 files with mutagen
if video_path.lower().endswith(_MP4_METADATA_EXTENSIONS):
return _extract_mp4_cover_art(video_path, output_dir)
if output_dir is None:
output_dir = _create_temp_dir()
os.makedirs(output_dir, exist_ok=True)
# Handle MKV files with ffmpeg attached pictures.
try:
ffprobe_path = _resolve_media_tool("ffprobe")
ffmpeg_path = _resolve_media_tool("ffmpeg")
# First, probe the video to find attachment streams (attached pics)
probe_cmd = [
ffprobe_path, '-v', 'quiet', '-print_format', 'json',
'-show_streams', video_path
]
result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
import json as json_module
probe_data = json_module.loads(result.stdout)
# Find attachment streams (attached pics)
attachment_streams = []
for i, stream in enumerate(probe_data.get('streams', [])):
# Check for attachment streams in multiple ways:
# 1. Traditional attached_pic flag
# 2. Video streams with image-like metadata (filename, mimetype)
# 3. MJPEG codec which is commonly used for embedded images
is_attached_pic = stream.get('disposition', {}).get('attached_pic', 0) == 1
# Check for image metadata in video streams (our case after metadata embedding)
tags = stream.get('tags', {})
has_image_metadata = (
'FILENAME' in tags and tags['FILENAME'].lower().endswith(('.jpg', '.jpeg', '.png')) or
'filename' in tags and tags['filename'].lower().endswith(('.jpg', '.jpeg', '.png')) or
'MIMETYPE' in tags and tags['MIMETYPE'].startswith('image/') or
'mimetype' in tags and tags['mimetype'].startswith('image/')
)
# Check for MJPEG codec (common for embedded images)
is_mjpeg = stream.get('codec_name') == 'mjpeg'
if (stream.get('codec_type') == 'video' and
(is_attached_pic or (has_image_metadata and is_mjpeg))):
attachment_streams.append(i)
if not attachment_streams:
return {}
# Extract each attachment stream
extracted_files = {}
used_filenames = set() # Track filenames to avoid collisions
for stream_idx in attachment_streams:
# Get original filename from metadata if available
stream_info = probe_data['streams'][stream_idx]
tags = stream_info.get('tags', {})
original_filename = (
tags.get('filename') or
tags.get('FILENAME') or
f'attachment_{stream_idx}.png'
)
img_tag, original_filename = _split_mkv_attachment_filename(original_filename)
# Clean filename for filesystem
safe_filename = os.path.basename(original_filename)
if not safe_filename.lower().endswith(_IMAGE_EXTENSIONS):
safe_filename += '.png'
# Handle filename collisions
base_name, ext = os.path.splitext(safe_filename)
counter = 0
final_filename = safe_filename
while final_filename in used_filenames:
counter += 1
final_filename = f"{base_name}_{counter}{ext}"
used_filenames.add(final_filename)
output_file = os.path.join(output_dir, final_filename)
# Extract the attachment stream
extract_cmd = [
ffmpeg_path, '-y', '-v', 'error', '-i', video_path,
'-map', f'0:{stream_idx}', '-frames:v', '1', '-update', '1',
output_file
]
try:
subprocess.run(extract_cmd, capture_output=True, text=True, check=True)
if os.path.exists(output_file):
extracted_files.setdefault(img_tag, []).append(output_file)
except subprocess.CalledProcessError as e:
print(f"Failed to extract attachment {stream_idx} from {os.path.basename(video_path)}: {e.stderr}")
return extracted_files
except subprocess.CalledProcessError as e:
print(f"Error extracting source images from {os.path.basename(video_path)}: {e.stderr}")
return {}

Xet Storage Details

Size:
38.3 kB
·
Xet hash:
64cef66b0fcc1a125a94eaad4473bbf989c9ede1384e30e2692c5cca978d3c9a

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.