wan2gp / shared /utils /video_metadata.py
vidfom's picture
Upload folder using huggingface_hub
618f472 verified
"""
Video Metadata Utilities
This module provides functionality to read and write JSON metadata to video files.
- MP4: Uses mutagen to store metadata in ©cmt tag
- MKV: Uses FFmpeg to store metadata in comment/description tags
"""
import json
import subprocess
import os
import shutil
import tempfile
def _convert_image_to_bytes(img):
"""
Convert various image formats to bytes suitable for MP4 cover art.
Args:
img: Can be:
- PIL Image object
- File path (str)
- bytes
Returns:
tuple: (image_bytes, image_format)
- image_bytes: Binary image data
- image_format: AtomDataType constant (JPEG or PNG)
"""
from mutagen.mp4 import AtomDataType
from PIL import Image
import io
import os
try:
# If it's already bytes, detect format and return
if isinstance(img, bytes):
# Detect format from magic numbers
if img.startswith(b'\x89PNG'):
return img, AtomDataType.PNG
else:
return img, AtomDataType.JPEG
# If it's a file path, read and convert
if isinstance(img, str):
if not os.path.exists(img):
print(f"Warning: Image file not found: {img}")
return None, None
# Determine format from extension
ext = os.path.splitext(img)[1].lower()
# Open with PIL for conversion
pil_img = Image.open(img)
# Convert to RGB if necessary (handles RGBA, P, etc.)
if pil_img.mode not in ('RGB', 'L'):
if pil_img.mode == 'RGBA':
# Create white background for transparency
background = Image.new('RGB', pil_img.size, (255, 255, 255))
background.paste(pil_img, mask=pil_img.split()[3])
pil_img = background
else:
pil_img = pil_img.convert('RGB')
# Save to bytes
img_bytes = io.BytesIO()
# Use PNG for lossless formats, JPEG for others
if ext in ['.png', '.bmp', '.tiff', '.tif']:
pil_img.save(img_bytes, format='PNG')
img_format = AtomDataType.PNG
else:
pil_img.save(img_bytes, format='JPEG', quality=95)
img_format = AtomDataType.JPEG
return img_bytes.getvalue(), img_format
# If it's a PIL Image
if isinstance(img, Image.Image):
# Convert to RGB if necessary
if img.mode not in ('RGB', 'L'):
if img.mode == 'RGBA':
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[3])
img = background
else:
img = img.convert('RGB')
# Save to bytes (prefer PNG for quality)
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
return img_bytes.getvalue(), AtomDataType.PNG
print(f"Warning: Unsupported image type: {type(img)}")
return None, None
except Exception as e:
print(f"Error converting image to bytes: {e}")
return None, None
def embed_source_images_metadata_mp4(file, source_images):
from mutagen.mp4 import MP4, MP4Cover, AtomDataType
import json
import os
if not source_images:
return file
try:
# Convert source images to cover art and build metadata
cover_data = []
image_metadata = {} # Maps tag to list of {index, filename, extension}
# Process each source image type
for img_tag, img_data in source_images.items():
if img_data is None:
continue
tag_images = []
# Normalize to list for uniform processing
img_list = img_data if isinstance(img_data, list) else [img_data]
for img in img_list:
if img is not None:
cover_bytes, image_format = _convert_image_to_bytes(img)
if cover_bytes:
# Extract filename and extension
if isinstance(img, str) and os.path.exists(img):
filename = os.path.basename(img)
extension = os.path.splitext(filename)[1]
else:
# PIL Image or unknown - infer from format
extension = '.png' if image_format == AtomDataType.PNG else '.jpg'
filename = f"{img_tag}{extension}"
tag_images.append({
'index': len(cover_data),
'filename': filename,
'extension': extension
})
cover_data.append(MP4Cover(cover_bytes, image_format))
if tag_images:
image_metadata[img_tag] = tag_images
if cover_data:
file.tags['----:com.apple.iTunes:EMBEDDED_IMAGES'] = cover_data
# Store the complete metadata as JSON
file.tags['----:com.apple.iTunes:IMAGE_METADATA'] = json.dumps(image_metadata).encode('utf-8')
# print(f"Successfully embedded {len(cover_data)} cover images")
# print(f"Image tags: {list(image_metadata.keys())}")
except Exception as e:
print(f"Failed to embed cover art with mutagen: {e}")
print(f"This might be due to image format or MP4 file structure issues")
return file
def save_metadata_to_mp4(file_path, metadata_dict, source_images = None):
"""
Save JSON metadata to MP4 file using mutagen.
Args:
file_path (str): Path to MP4 file
metadata_dict (dict): Metadata dictionary to save
Returns:
bool: True if successful, False otherwise
"""
try:
from mutagen.mp4 import MP4
file = MP4(file_path)
file.tags['©cmt'] = [json.dumps(metadata_dict)]
if source_images is not None:
embed_source_images_metadata_mp4(file, source_images)
file.save()
return True
except Exception as e:
print(f"Error saving metadata to MP4 {file_path}: {e}")
return False
def save_metadata_to_mkv(file_path, metadata_dict):
"""
Save JSON metadata to MKV file using FFmpeg.
Args:
file_path (str): Path to MKV file
metadata_dict (dict): Metadata dictionary to save
Returns:
bool: True if successful, False otherwise
"""
try:
# Create temporary file with metadata
temp_path = file_path.replace('.mkv', '_temp_with_metadata.mkv')
# Use FFmpeg to add metadata while preserving ALL streams (including attachments)
ffmpeg_cmd = [
'ffmpeg', '-y', '-i', file_path,
'-metadata', f'comment={json.dumps(metadata_dict)}',
'-map', '0', # Map all streams from input (including attachments)
'-c', 'copy', # Copy streams without re-encoding
temp_path
]
result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
if result.returncode == 0:
# Replace original with metadata version
shutil.move(temp_path, file_path)
return True
else:
print(f"Warning: Failed to add metadata to MKV file: {result.stderr}")
# Clean up temp file if it exists
if os.path.exists(temp_path):
os.remove(temp_path)
return False
except Exception as e:
print(f"Error saving metadata to MKV {file_path}: {e}")
return False
def save_video_metadata(file_path, metadata_dict, source_images= None):
"""
Save JSON metadata to video file (auto-detects MP4 vs MKV).
Args:
file_path (str): Path to video file
metadata_dict (dict): Metadata dictionary to save
Returns:
bool: True if successful, False otherwise
"""
if file_path.endswith('.mp4'):
return save_metadata_to_mp4(file_path, metadata_dict, source_images)
elif file_path.endswith('.mkv'):
return save_metadata_to_mkv(file_path, metadata_dict)
else:
return False
def read_metadata_from_mp4(file_path):
"""
Read JSON metadata from MP4 file using mutagen.
Args:
file_path (str): Path to MP4 file
Returns:
dict or None: Metadata dictionary if found, None otherwise
"""
try:
from mutagen.mp4 import MP4
file = MP4(file_path)
tags = file.tags['©cmt'][0]
return json.loads(tags)
except Exception:
return None
def read_metadata_from_mkv(file_path):
"""
Read JSON metadata from MKV file using ffprobe.
Args:
file_path (str): Path to MKV file
Returns:
dict or None: Metadata dictionary if found, None otherwise
"""
try:
# Try to get metadata using ffprobe
result = subprocess.run([
'ffprobe', '-v', 'quiet', '-print_format', 'json',
'-show_format', file_path
], capture_output=True, text=True)
if result.returncode == 0:
probe_data = json.loads(result.stdout)
format_tags = probe_data.get('format', {}).get('tags', {})
# Look for our metadata in various possible tag locations
for tag_key in ['comment', 'COMMENT', 'description', 'DESCRIPTION']:
if tag_key in format_tags:
try:
return json.loads(format_tags[tag_key])
except:
continue
return None
except Exception:
return None
def read_metadata_from_video(file_path):
"""
Read JSON metadata from video file (auto-detects MP4 vs MKV).
Args:
file_path (str): Path to video file
Returns:
dict or None: Metadata dictionary if found, None otherwise
"""
if file_path.endswith('.mp4'):
return read_metadata_from_mp4(file_path)
elif file_path.endswith('.mkv'):
return read_metadata_from_mkv(file_path)
else:
return None
def _extract_mp4_cover_art(video_path, output_dir = None):
"""
Extract cover art from MP4 files using mutagen with proper tag association.
Args:
video_path (str): Path to the MP4 file
output_dir (str): Directory to save extracted images
Returns:
dict: Dictionary mapping tags to lists of extracted image file paths
Format: {tag_name: [path1, path2, ...], ...}
"""
try:
from mutagen.mp4 import MP4
import json
file = MP4(video_path)
if file.tags is None or '----:com.apple.iTunes:EMBEDDED_IMAGES' not in file.tags:
return {}
cover_art = file.tags['----:com.apple.iTunes:EMBEDDED_IMAGES']
# Retrieve the image metadata
metadata_data = file.tags.get('----:com.apple.iTunes:IMAGE_METADATA')
if metadata_data:
# Deserialize metadata and extract with original filenames
image_metadata = json.loads(metadata_data[0].decode('utf-8'))
extracted_files = {}
for tag, tag_images in image_metadata.items():
extracted_files[tag] = []
for img_info in tag_images:
cover_idx = img_info['index']
if cover_idx >= len(cover_art):
continue
if output_dir is None: output_dir = _create_temp_dir()
os.makedirs(output_dir, exist_ok=True)
cover = cover_art[cover_idx]
# Use original filename
filename = img_info['filename']
output_file = os.path.join(output_dir, filename)
# Handle duplicate filenames by adding suffix
if os.path.exists(output_file):
base, ext = os.path.splitext(filename)
counter = 1
while os.path.exists(output_file):
filename = f"{base}_{counter}{ext}"
output_file = os.path.join(output_dir, filename)
counter += 1
# Write cover art to file
with open(output_file, 'wb') as f:
f.write(cover)
if os.path.exists(output_file):
extracted_files[tag].append(output_file)
return extracted_files
else:
# Fallback: Extract all images with generic naming
print(f"Warning: No IMAGE_METADATA found in {video_path}, using generic extraction")
extracted_files = {'unknown': []}
for i, cover in enumerate(cover_art):
if output_dir is None: output_dir = _create_temp_dir()
os.makedirs(output_dir, exist_ok=True)
filename = f"cover_art_{i}.jpg"
output_file = os.path.join(output_dir, filename)
with open(output_file, 'wb') as f:
f.write(cover)
if os.path.exists(output_file):
extracted_files['unknown'].append(output_file)
return extracted_files
except Exception as e:
print(f"Error extracting cover art from MP4: {e}")
return {}
def _create_temp_dir():
temp_dir = tempfile.mkdtemp()
os.makedirs(temp_dir, exist_ok=True)
return temp_dir
def extract_source_images(video_path, output_dir = None):
# Handle MP4 files with mutagen
if video_path.lower().endswith('.mp4'):
return _extract_mp4_cover_art(video_path, output_dir)
if output_dir is None:
output_dir = _create_temp_dir()
# Handle MKV files with ffmpeg (existing logic)
try:
# First, probe the video to find attachment streams (attached pics)
probe_cmd = [
'ffprobe', '-v', 'quiet', '-print_format', 'json',
'-show_streams', video_path
]
result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
import json as json_module
probe_data = json_module.loads(result.stdout)
# Find attachment streams (attached pics)
attachment_streams = []
for i, stream in enumerate(probe_data.get('streams', [])):
# Check for attachment streams in multiple ways:
# 1. Traditional attached_pic flag
# 2. Video streams with image-like metadata (filename, mimetype)
# 3. MJPEG codec which is commonly used for embedded images
is_attached_pic = stream.get('disposition', {}).get('attached_pic', 0) == 1
# Check for image metadata in video streams (our case after metadata embedding)
tags = stream.get('tags', {})
has_image_metadata = (
'FILENAME' in tags and tags['FILENAME'].lower().endswith(('.jpg', '.jpeg', '.png')) or
'filename' in tags and tags['filename'].lower().endswith(('.jpg', '.jpeg', '.png')) or
'MIMETYPE' in tags and tags['MIMETYPE'].startswith('image/') or
'mimetype' in tags and tags['mimetype'].startswith('image/')
)
# Check for MJPEG codec (common for embedded images)
is_mjpeg = stream.get('codec_name') == 'mjpeg'
if (stream.get('codec_type') == 'video' and
(is_attached_pic or (has_image_metadata and is_mjpeg))):
attachment_streams.append(i)
if not attachment_streams:
return []
# Extract each attachment stream
extracted_files = []
used_filenames = set() # Track filenames to avoid collisions
for stream_idx in attachment_streams:
# Get original filename from metadata if available
stream_info = probe_data['streams'][stream_idx]
tags = stream_info.get('tags', {})
original_filename = (
tags.get('filename') or
tags.get('FILENAME') or
f'attachment_{stream_idx}.png'
)
# Clean filename for filesystem
safe_filename = os.path.basename(original_filename)
if not safe_filename.lower().endswith(('.jpg', '.jpeg', '.png')):
safe_filename += '.png'
# Handle filename collisions
base_name, ext = os.path.splitext(safe_filename)
counter = 0
final_filename = safe_filename
while final_filename in used_filenames:
counter += 1
final_filename = f"{base_name}_{counter}{ext}"
used_filenames.add(final_filename)
output_file = os.path.join(output_dir, final_filename)
# Extract the attachment stream
extract_cmd = [
'ffmpeg', '-y', '-i', video_path,
'-map', f'0:{stream_idx}', '-frames:v', '1',
output_file
]
try:
subprocess.run(extract_cmd, capture_output=True, text=True, check=True)
if os.path.exists(output_file):
extracted_files.append(output_file)
except subprocess.CalledProcessError as e:
print(f"Failed to extract attachment {stream_idx} from {os.path.basename(video_path)}: {e.stderr}")
return extracted_files
except subprocess.CalledProcessError as e:
print(f"Error extracting source images from {os.path.basename(video_path)}: {e.stderr}")
return []