Files
SequenceCompressor/compress_pngs.py
2025-12-11 15:32:18 -07:00

429 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Multithreaded PNG compression script.
Compresses all PNG files in subdirectories with maximum parallelism.
"""
import argparse
import multiprocessing
import os
import shutil
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from itertools import islice
from pathlib import Path

from PIL import Image
def _is_gray16(mode):
    """Return True for Pillow's integer grayscale modes ('I' and the 'I;16' family)."""
    return mode == 'I' or mode.startswith('I;16')


def _leading_pixels_all_black(img, sample_size=100):
    """Return True when the first ``sample_size`` pixels of ``img`` are all zero.

    Only the leading pixels returned by ``getdata()`` are inspected, so this
    is a cheap hint, not proof that the whole image is black.  An image that
    yields no pixels is reported as not black.
    """
    saw_pixel = False
    # The object returned by getdata() cannot be sliced, so take the leading
    # pixels by iterating instead of using ``[:sample_size]``.
    for pixel in islice(img.getdata(), sample_size):
        saw_pixel = True
        channels = pixel if isinstance(pixel, (tuple, list)) else (pixel,)
        if any(value > 0 for value in channels):
            return False
    return saw_pixel


def _detect_16bit(img, sample_size=1000):
    """Guess whether ``img`` holds more than 8 bits per channel.

    Integer grayscale modes are always treated as 16-bit.  For 'RGB', 'RGBA'
    and 'LA' the first ``sample_size`` pixels are checked for a value above
    255.  Any failure while sampling falls back to 8-bit.
    """
    if _is_gray16(img.mode):
        return True
    if img.mode not in ('RGB', 'RGBA', 'LA'):
        return False
    try:
        peak = 0
        for pixel in islice(img.getdata(), sample_size):
            value = max(pixel) if isinstance(pixel, (tuple, list)) else pixel
            peak = max(peak, value)
        return peak > 255
    except Exception:
        # If we can't determine, assume 8-bit to be safe
        return False


def _convert_for_target(img, is_16bit):
    """Return ``img`` in the mode it should be saved in.

    * Integer grayscale is kept as-is for 16-bit output and reduced to 'L'
      for 8-bit output.
    * 'RGB' and 'RGBA' are kept unchanged.
    * Every other mode becomes 'RGBA' when its mode name contains an alpha
      band ('A'), otherwise 'RGB', so transparency is not silently dropped.
    """
    if _is_gray16(img.mode):
        if is_16bit:
            return img
        # Only reachable when 8-bit output was explicitly forced.
        # NOTE(review): confirm how convert('L') maps 16-bit values to 8 bits
        # (scaling vs. clipping) for the images this tool is used on.
        return img.convert('L')
    if img.mode in ('RGB', 'RGBA'):
        return img
    return img.convert('RGBA' if 'A' in img.mode else 'RGB')


def compress_png(input_path, output_path, force_bitdepth=None):
    """Compress a single PNG file.

    Args:
        input_path: Path to input image
        output_path: Path to output image
        force_bitdepth: None (auto-detect), '8' (force 8-bit), or '16' (force 16-bit)

    Returns:
        An 8-tuple ``(input_path, success, error, original_size, new_size,
        savings_pct, was_skipped, is_corrupted)``.  ``input_path`` is a str,
        sizes are in bytes, ``error`` is None on success.  The function never
        raises for ordinary errors; they are reported through the tuple.
    """
    try:
        input_path = Path(input_path)
        output_path = Path(output_path)

        # Skip if output file already exists
        if output_path.exists():
            original_size = os.path.getsize(input_path)
            new_size = os.path.getsize(output_path)
            savings = original_size - new_size
            savings_pct = (savings / original_size * 100) if original_size > 0 else 0
            return (str(input_path), True, None, original_size, new_size, savings_pct, True, False)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)
        original_size = os.path.getsize(input_path)

        # Check for corrupted/empty files
        if original_size == 0:
            return (str(input_path), False, "CORRUPTED: File is 0 bytes (empty/placeholder)", 0, 0, 0, False, True)

        # Try to open and validate the image; load() forces a full decode so
        # that corruption is detected here rather than at save time.
        try:
            img = Image.open(input_path)
            img.load()
        except Exception as e:
            return (str(input_path), False, f"CORRUPTED: Cannot open/load image - {str(e)}", original_size, 0, 0, False, True)

        # Validate image dimensions
        if img.width == 0 or img.height == 0:
            return (str(input_path), False, f"CORRUPTED: Invalid dimensions ({img.width}x{img.height})", original_size, 0, 0, False, True)

        # A legitimately black image is possible, so only flag it when the
        # file is also suspiciously small: less than 0.1 bytes per pixel.
        try:
            suspiciously_small = original_size < (img.width * img.height * 0.1)
            if suspiciously_small and _leading_pixels_all_black(img):
                return (str(input_path), False, "CORRUPTED: Image appears all black with suspiciously small file size", original_size, 0, 0, False, True)
        except Exception:
            # If we can't check pixels, continue anyway
            pass

        # Determine target bit depth
        if force_bitdepth == '8':
            is_16bit = False
        elif force_bitdepth == '16':
            is_16bit = True
        else:
            is_16bit = _detect_16bit(img)

        img = _convert_for_target(img, is_16bit)

        # Save with maximum compression
        img.save(str(output_path), 'PNG', optimize=True, compress_level=9)
        new_size = os.path.getsize(output_path)
        savings = original_size - new_size
        savings_pct = (savings / original_size * 100) if original_size > 0 else 0
        return (str(input_path), True, None, original_size, new_size, savings_pct, False, False)
    except Exception as e:
        # Check if error might indicate corruption
        message = str(e).lower()
        is_corrupted = "truncated" in message or "cannot identify" in message or "corrupt" in message
        return (str(input_path), False, str(e), 0, 0, 0, False, is_corrupted)
def find_image_files(input_dir):
    """Find all PNG and JPG files below ``input_dir`` (recursively).

    Matching is by file extension, case-insensitively.  Only regular files
    are returned.

    Args:
        input_dir: Directory to scan.

    Returns:
        A ``(png_files, jpg_files)`` tuple of lists of ``Path`` objects.
    """
    png_files = []
    jpg_files = []
    for candidate in Path(input_dir).rglob('*'):
        suffix = candidate.suffix.lower()
        if suffix not in ('.png', '.jpg', '.jpeg'):
            continue
        # rglob('*') also yields directories; a folder that happens to be
        # named like an image must not be queued for compression.
        if not candidate.is_file():
            continue
        if suffix == '.png':
            png_files.append(candidate)
        else:
            jpg_files.append(candidate)
    return png_files, jpg_files
def get_output_path(input_path, input_dir, output_dir):
    """Convert input path to output path preserving directory structure.

    Args:
        input_path: Path of the source file.
        input_dir: Root directory the source file is expected to live under.
        output_dir: Root directory for the result.

    Returns:
        ``output_dir`` joined with ``input_path``'s location relative to
        ``input_dir``.  If ``input_path`` is not inside ``input_dir`` its own
        path is mirrored under ``output_dir`` instead.
    """
    input_path = Path(input_path)
    output_dir = Path(output_dir)
    try:
        relative_path = input_path.relative_to(Path(input_dir))
    except ValueError:
        relative_path = input_path
        # Joining an absolute path onto output_dir would discard output_dir
        # and return the input file itself, so drop the root/drive first.
        if relative_path.anchor:
            relative_path = Path(*relative_path.parts[1:])
    return output_dir / relative_path
def move_to_corrupted(input_path, input_dir, corrupted_dir):
    """Move a corrupted file to the corrupted folder, preserving directory structure.

    Args:
        input_path: File to move.
        input_dir: Root the file's relative location is computed against.
        corrupted_dir: Root of the quarantine folder.

    Returns:
        ``(True, None)`` on success, ``(False, error_message)`` on failure.
        Errors are reported through the return value, not raised.
    """
    try:
        input_path = Path(input_path)
        try:
            relative_path = input_path.relative_to(Path(input_dir))
        except ValueError:
            # Not under input_dir: fall back to just the file name.
            relative_path = input_path.name
        dest_path = Path(corrupted_dir) / relative_path
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        # shutil.move falls back to copy-and-delete when the destination is
        # on a different filesystem, where a plain rename would fail.
        shutil.move(str(input_path), str(dest_path))
        return True, None
    except Exception as e:
        return False, str(e)
def format_size(size_bytes):
    """Format file size in human readable format.

    Args:
        size_bytes: Size in bytes.  May be negative (e.g. a "savings" figure
            when the output grew); the sign is kept and the magnitude decides
            the unit.

    Returns:
        A string such as ``"1.50 KB"``, using 1024-based units up to TB.
    """
    for unit in ('B', 'KB', 'MB', 'GB'):
        # Compare on magnitude so negative values are scaled like positive ones.
        if abs(size_bytes) < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} TB"
def format_time(seconds):
    """Render a duration in seconds as ``HH:MM:SS:CC``.

    The four fields are hours, minutes, seconds and centiseconds, each
    zero-padded to at least two digits.
    """
    whole_hours, within_hour = divmod(seconds, 3600)
    fields = (
        whole_hours,
        within_hour // 60,
        seconds % 60,
        (seconds % 1) * 100,
    )
    return ":".join(f"{int(field):02d}" for field in fields)
def main():
    """Command-line entry point.

    Scans the ``input`` directory for PNG files, compresses each one into the
    mirrored location under ``output`` using one worker process per CPU core,
    moves files reported as corrupted into ``corrupted``, and prints live
    progress followed by a summary.  JPG/JPEG files are counted as skipped.

    Progress, rate and ETA are based on the number of PNG files that have
    finished in any way (compressed, skipped, corrupted or failed).

    Returns:
        None.  Usage and setup errors are printed and the function returns
        early.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description='Multithreaded PNG compression script with maximum parallelism.'
    )
    parser.add_argument(
        '--8bit', '-8',
        action='store_true',
        dest='force_8bit',
        help='Force 8-bit color depth for all images'
    )
    parser.add_argument(
        '--16bit', '-16',
        action='store_true',
        dest='force_16bit',
        help='Force 16-bit color depth for all images'
    )
    args = parser.parse_args()

    # Determine bit depth setting
    if args.force_8bit and args.force_16bit:
        print("Error: Cannot specify both --8bit and --16bit. Choose one.")
        return
    elif args.force_8bit:
        force_bitdepth = '8'
        print("Mode: Forcing 8-bit color depth")
    elif args.force_16bit:
        force_bitdepth = '16'
        print("Mode: Forcing 16-bit color depth")
    else:
        force_bitdepth = None
        print("Mode: Auto-detect bit depth (preserve 16-bit if present)")

    input_dir = Path('input')
    output_dir = Path('output')

    # Check if input directory exists
    if not input_dir.exists():
        print(f"Error: Input directory '{input_dir}' does not exist.")
        print("Please create an 'input' folder and place your PNG files there.")
        return

    print(f"Input directory: {input_dir}")
    print(f"Output directory: {output_dir}")
    print("Scanning for image files...")
    png_files, jpg_files = find_image_files(input_dir)
    if jpg_files:
        print(f"Found {len(jpg_files)} JPG/JPEG files - ignoring (skipping)")
    if not png_files:
        print("No PNG files found in input directory.")
        return
    print(f"Found {len(png_files)} PNG files to process.")

    # Create output and corrupted directories
    output_dir.mkdir(exist_ok=True)
    corrupted_dir = Path('corrupted')
    corrupted_dir.mkdir(exist_ok=True)
    print(f"Corrupted files will be moved to: {corrupted_dir}")

    # Use all available CPU cores
    max_workers = multiprocessing.cpu_count()
    print(f"Using {max_workers} worker processes for compression...")
    print("-" * 80)

    total_png = len(png_files)
    jpg_count = len(jpg_files)
    compressed = 0
    # PNGs whose output already existed.  JPGs are tracked separately so they
    # do not distort the PNG progress counter, rate or ETA.
    skipped_existing = 0
    failed = 0
    corrupted = 0
    corrupted_files = []
    total_original_size = 0
    total_new_size = 0
    start_time = time.time()
    last_update_time = start_time

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_file = {
            executor.submit(
                compress_png,
                str(f),
                str(get_output_path(f, input_dir, output_dir)),
                force_bitdepth,
            ): f
            for f in png_files
        }

        # Process results as they complete
        for future in as_completed(future_to_file):
            try:
                result = future.result()
            except Exception as exc:
                # The worker itself died or the result could not be returned:
                # record it as an ordinary failure instead of aborting the run.
                result = (str(future_to_file[future]), False, str(exc), 0, 0, 0, False, False)
            file_path, success, error, orig_size, new_size, _savings_pct, was_skipped, is_corrupted = result

            if success:
                if was_skipped:
                    skipped_existing += 1
                else:
                    compressed += 1
                total_original_size += orig_size
                total_new_size += new_size
            elif is_corrupted:
                corrupted += 1
                # Move corrupted file to corrupted folder
                move_success, move_error = move_to_corrupted(file_path, input_dir, corrupted_dir)
                if move_success:
                    corrupted_files.append((file_path, error, f"Moved to {corrupted_dir / Path(file_path).relative_to(input_dir)}"))
                    print(f"\n[CORRUPTED] {file_path}: {error} -> Moved to corrupted folder")
                else:
                    corrupted_files.append((file_path, error, f"Failed to move: {move_error}"))
                    print(f"\n[CORRUPTED] {file_path}: {error} (Failed to move: {move_error})")
            else:
                failed += 1
                print(f"\n[ERROR] Failed to compress {file_path}: {error}")

            # Refresh the status line on the first result and then at most
            # every 0.5 seconds.
            finished = compressed + skipped_existing + corrupted + failed
            current_time = time.time()
            if finished == 1 or current_time - last_update_time >= 0.5:
                elapsed = current_time - start_time
                rate = finished / elapsed if elapsed > 0 else 0
                remaining = total_png - finished
                eta_seconds = remaining / rate if rate > 0 and remaining > 0 else 0
                total_savings = total_original_size - total_new_size
                total_savings_pct = (total_savings / total_original_size * 100) if total_original_size > 0 else 0
                eta_str = format_time(eta_seconds) if eta_seconds > 0 else "calculating..."
                elapsed_str = format_time(elapsed)
                print(f"[{finished:5d}/{total_png}] "
                      f"Compressed: {compressed} | Skipped: {skipped_existing + jpg_count} | Corrupted: {corrupted} | "
                      f"Speed: {rate:.1f} files/sec | "
                      f"Elapsed: {elapsed_str} | ETA: {eta_str} | "
                      f"Saved: {format_size(total_savings)} ({total_savings_pct:.1f}%)", end='\r')
                last_update_time = current_time

    total_time = time.time() - start_time
    total_savings = total_original_size - total_new_size
    total_savings_pct = (total_savings / total_original_size * 100) if total_original_size > 0 else 0
    finished = compressed + skipped_existing + corrupted + failed
    avg_rate = finished / total_time if total_time > 0 else 0
    skipped = skipped_existing + jpg_count

    print("\n" + "=" * 80)
    print(f"Compression complete!")
    print(f"Successfully compressed: {compressed} files")
    if jpg_files:
        print(f"Skipped: {skipped} files ({skipped_existing} already exist, {jpg_count} JPG/JPEG)")
    else:
        print(f"Skipped (already exist): {skipped} files")
    if corrupted > 0:
        print(f"Corrupted (bad PNGs): {corrupted} files")
    print(f"Failed: {failed} files")
    print(f"Total time: {format_time(total_time)}")
    print(f"Average speed: {avg_rate:.2f} files/second")
    print(f"Original size: {format_size(total_original_size)}")
    print(f"Compressed size: {format_size(total_new_size)}")
    print(f"Total savings: {format_size(total_savings)} ({total_savings_pct:.1f}%)")

    # Print list of corrupted files if any
    if corrupted_files:
        print("\n" + "=" * 80)
        print("CORRUPTED FILES LIST:")
        print("=" * 80)
        for file_path, error, move_status in corrupted_files:
            print(f" {file_path}")
            print(f" Reason: {error}")
            print(f" Status: {move_status}")
# Script entry point: run the command-line interface only when this file is
# executed directly, not when it is imported as a module.
if __name__ == '__main__':
    main()