Files
SequenceCompressor/compress_pngs.py

324 lines
12 KiB
Python
Raw Normal View History

2025-12-11 12:12:10 -07:00
#!/usr/bin/env python3
"""
Parallel (multi-process) PNG compression script.
Compresses all PNG files found under the 'input' directory tree into a mirrored
'output' tree, using one worker process per CPU core.
"""
import os
import sys
2025-12-11 14:56:14 -07:00
import argparse
2025-12-11 12:12:10 -07:00
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from PIL import Image
import multiprocessing
import time
2025-12-11 14:56:14 -07:00
def _savings_pct(original_size, new_size):
    """Return the percentage saved going from original_size to new_size (0 for empty input)."""
    if original_size <= 0:
        return 0
    return (original_size - new_size) / original_size * 100


def _prepare_image(img, force_bitdepth):
    """Return img in the mode it should be saved with.

    Only grayscale can keep 16 bits: Pillow's 'RGB', 'RGBA' and 'LA' modes are
    8 bits per channel, so colour images are already 8-bit once opened.
    """
    mode = img.mode

    # Pillow reports 16-bit grayscale as 'I' or as one of the 'I;16*' modes,
    # depending on the Pillow release.
    if mode == 'I' or mode.startswith('I;16'):
        if force_bitdepth == '8':
            return img.convert('L')
        return img

    # Keep transparency: any alpha-carrying mode, or a palette image with a
    # transparency entry, must go to RGBA (converting to RGB would drop it).
    has_alpha = 'A' in mode or (mode == 'P' and 'transparency' in img.info)
    if has_alpha:
        return img if mode == 'RGBA' else img.convert('RGBA')
    return img if mode == 'RGB' else img.convert('RGB')


def compress_png(input_path, output_path, force_bitdepth=None):
    """Compress a single PNG file.

    Args:
        input_path: Path to input image
        output_path: Path to output image
        force_bitdepth: None (auto: keep 16-bit grayscale as is), '8' (reduce
            16-bit grayscale to 8-bit), or '16' (same as auto; Pillow cannot
            hold 16-bit colour, so there is nothing to widen)

    Returns:
        A 7-tuple ``(input_path, success, error, original_size, new_size,
        savings_pct, was_skipped)``. On failure ``success`` is False, ``error``
        is the exception message and the numeric fields are 0. The function
        never raises for ordinary errors, so it is safe to run in a worker.
    """
    try:
        input_path = Path(input_path)
        output_path = Path(output_path)

        # Skip if output file already exists
        if output_path.exists():
            original_size = os.path.getsize(input_path)
            new_size = os.path.getsize(output_path)
            return (str(input_path), True, None, original_size, new_size,
                    _savings_pct(original_size, new_size), True)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)
        original_size = os.path.getsize(input_path)

        # Write to a temporary name and rename afterwards, so an interrupted or
        # failed save never leaves a truncated file that a later run would
        # treat as "already compressed".
        temp_path = output_path.with_name(output_path.name + '.tmp')
        try:
            # The context manager closes the source file handle even on error.
            with Image.open(input_path) as source:
                img = _prepare_image(source, force_bitdepth)
                # Format is given explicitly because temp_path does not end in .png
                img.save(str(temp_path), 'PNG', optimize=True, compress_level=9)
            os.replace(temp_path, output_path)
        except BaseException:
            if temp_path.exists():
                temp_path.unlink()
            raise

        new_size = os.path.getsize(output_path)
        return (str(input_path), True, None, original_size, new_size,
                _savings_pct(original_size, new_size), False)
    except Exception as e:
        return (str(input_path), False, str(e), 0, 0, 0, False)
def find_image_files(input_dir):
    """Find all PNG and JPG files below input_dir (recursively).

    Args:
        input_dir: Directory to scan.

    Returns:
        A ``(png_files, jpg_files)`` pair of lists of ``Path`` objects. The
        suffix match is case-insensitive; JPG covers ``.jpg`` and ``.jpeg``.
        Only regular files are returned, so a directory that happens to be
        named like an image (e.g. ``frames.png/``) is not reported.
    """
    png_files = []
    jpg_files = []
    for candidate in Path(input_dir).rglob('*'):
        suffix = candidate.suffix.lower()
        if suffix not in ('.png', '.jpg', '.jpeg'):
            continue
        # rglob('*') yields directories too; check only after the cheap suffix test.
        if not candidate.is_file():
            continue
        if suffix == '.png':
            png_files.append(candidate)
        else:
            jpg_files.append(candidate)
    return png_files, jpg_files
def get_output_path(input_path, input_dir, output_dir):
    """Convert input path to output path preserving directory structure.

    Args:
        input_path: Path of the source file.
        input_dir: Root directory the source files were found under.
        output_dir: Root directory the results are written to.

    Returns:
        ``output_dir`` joined with the path of ``input_path`` relative to
        ``input_dir``. If ``input_path`` is not located under ``input_dir``,
        its full path (minus any drive/root) is recreated below ``output_dir``.
    """
    input_path = Path(input_path)
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)
    try:
        relative_path = input_path.relative_to(input_dir)
    except ValueError:
        # Not under input_dir: fall back to the full path. The anchor must be
        # stripped first, because joining onto an absolute path makes pathlib
        # discard output_dir and hand back the input path itself.
        relative_path = input_path
        if input_path.anchor:
            relative_path = input_path.relative_to(input_path.anchor)
    return output_dir / relative_path
def format_size(size_bytes):
    """Format file size in human readable format.

    Args:
        size_bytes: Size in bytes. May be negative (e.g. "savings" when a file
            grew); the sign is kept and the magnitude selects the unit.

    Returns:
        A string such as ``"1.50 KB"``, using 1024-based units from B up to TB.
    """
    for unit in ['B', 'KB', 'MB', 'GB']:
        # Compare on magnitude so negative values are scaled like positive ones.
        if abs(size_bytes) < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} TB"
2025-12-11 14:51:58 -07:00
def format_time(seconds):
    """Render a duration given in seconds as ``HH:MM:SS:CC``.

    The four fields are hours, minutes, seconds and centiseconds (hundredths
    of a second), each zero-padded to at least two digits. Hours are not
    wrapped, so long durations simply produce a wider first field.
    """
    fields = (
        int(seconds // 3600),         # whole hours
        int((seconds % 3600) // 60),  # minutes within the hour
        int(seconds % 60),            # seconds within the minute
        int((seconds % 1) * 100),     # hundredths from the fractional part
    )
    return ":".join(f"{value:02d}" for value in fields)
2025-12-11 12:12:10 -07:00
def main():
    """Command-line entry point: compress every PNG under ``input/`` into ``output/``.

    Parses the ``--8bit`` / ``--16bit`` flags, scans the input tree, fans the
    files out to one worker process per CPU core, prints a live progress line
    and finally a summary. JPG/JPEG files are counted but not processed.
    Problems (conflicting flags, missing input directory, no PNG files) are
    reported on stdout and the function returns without raising.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description='Multithreaded PNG compression script with maximum parallelism.'
    )
    parser.add_argument(
        '--8bit', '-8',
        action='store_true',
        dest='force_8bit',
        help='Force 8-bit color depth for all images'
    )
    parser.add_argument(
        '--16bit', '-16',
        action='store_true',
        dest='force_16bit',
        help='Force 16-bit color depth for all images'
    )
    args = parser.parse_args()

    # Determine bit depth setting
    if args.force_8bit and args.force_16bit:
        print("Error: Cannot specify both --8bit and --16bit. Choose one.")
        return
    elif args.force_8bit:
        force_bitdepth = '8'
        print("Mode: Forcing 8-bit color depth")
    elif args.force_16bit:
        force_bitdepth = '16'
        print("Mode: Forcing 16-bit color depth")
    else:
        force_bitdepth = None
        print("Mode: Auto-detect bit depth (preserve 16-bit if present)")

    input_dir = Path('input')
    output_dir = Path('output')

    # Check if input directory exists
    if not input_dir.exists():
        print(f"Error: Input directory '{input_dir}' does not exist.")
        print("Please create an 'input' folder and place your PNG files there.")
        return

    print(f"Input directory: {input_dir}")
    print(f"Output directory: {output_dir}")
    print("Scanning for image files...")
    png_files, jpg_files = find_image_files(input_dir)

    if jpg_files:
        print(f"Found {len(jpg_files)} JPG/JPEG files - ignoring (skipping)")
    if not png_files:
        print("No PNG files found in input directory.")
        return
    total_files = len(png_files)
    print(f"Found {total_files} PNG files to process.")

    # Create output directory
    output_dir.mkdir(exist_ok=True)

    # Use all available CPU cores
    max_workers = multiprocessing.cpu_count()
    print(f"Using {max_workers} worker processes for compression...")
    print("-" * 80)

    compressed = 0
    skipped = 0
    failed = 0
    total_original_size = 0
    total_new_size = 0
    start_time = time.time()
    last_update_time = start_time

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_file = {
            executor.submit(
                compress_png,
                str(f),
                str(get_output_path(f, input_dir, output_dir)),
                force_bitdepth,
            ): f
            for f in png_files
        }

        # Process results as they complete
        for future in as_completed(future_to_file):
            try:
                result = future.result()
            except Exception as exc:
                # compress_png reports ordinary errors in its result tuple; this
                # path is reached when the worker itself died (e.g. a broken
                # process pool). Count it as a failure and keep going rather
                # than aborting the whole run.
                failed += 1
                print(f"\n[ERROR] Failed to compress {future_to_file[future]}: {exc}")
                continue

            file_path, success, error, orig_size, new_size, savings_pct, was_skipped = result
            if not success:
                failed += 1
                print(f"\n[ERROR] Failed to compress {file_path}: {error}")
                continue

            if was_skipped:
                skipped += 1
            else:
                compressed += 1
            total_original_size += orig_size
            total_new_size += new_size

            current_time = time.time()
            elapsed = current_time - start_time
            time_since_update = current_time - last_update_time
            # Failed files are finished work too: include them so the counter
            # can reach the total and the ETA is not inflated by failures.
            finished = compressed + skipped + failed

            # Print on the first successful file, then at most every 0.5 seconds
            if compressed + skipped == 1 or time_since_update >= 0.5:
                rate = finished / elapsed if elapsed > 0 else 0
                remaining = total_files - finished
                eta_seconds = remaining / rate if rate > 0 and remaining > 0 else 0
                total_savings = total_original_size - total_new_size
                total_savings_pct = (total_savings / total_original_size * 100) if total_original_size > 0 else 0
                eta_str = format_time(eta_seconds) if eta_seconds > 0 else "calculating..."
                elapsed_str = format_time(elapsed)
                # end='\r' keeps the progress on a single, self-overwriting line
                print(f"[{finished:5d}/{total_files}] "
                      f"Compressed: {compressed} | Skipped: {skipped} | "
                      f"Speed: {rate:.1f} files/sec | "
                      f"Elapsed: {elapsed_str} | ETA: {eta_str} | "
                      f"Saved: {format_size(total_savings)} ({total_savings_pct:.1f}%)", end='\r')
                last_update_time = current_time

    total_time = time.time() - start_time
    total_savings = total_original_size - total_new_size
    total_savings_pct = (total_savings / total_original_size * 100) if total_original_size > 0 else 0
    processed = compressed + skipped
    avg_rate = processed / total_time if total_time > 0 else 0

    print("\n" + "=" * 80)
    print("Compression complete!")
    print(f"Successfully compressed: {compressed} files")
    print(f"Skipped (already exist): {skipped} files")
    if jpg_files:
        print(f"Ignored (JPG/JPEG): {len(jpg_files)} files")
    print(f"Failed: {failed} files")
    print(f"Total time: {format_time(total_time)}")
    print(f"Average speed: {avg_rate:.2f} files/second")
    print(f"Original size: {format_size(total_original_size)}")
    print(f"Compressed size: {format_size(total_new_size)}")
    print(f"Total savings: {format_size(total_savings)} ({total_savings_pct:.1f}%)")
# Run main() only when this file is executed as a script, not when it is
# imported. main() starts a ProcessPoolExecutor, and worker processes may
# import this module, so the guard also keeps them from re-running main().
if __name__ == '__main__':
    main()