448 lines
18 KiB
Python
448 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Multi-process PNG compression script.

Recompresses every PNG file found under the input directory tree, spreading the work across a pool of worker processes.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import platform
|
|
from pathlib import Path
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
from PIL import Image
|
|
import multiprocessing
|
|
import time
|
|
|
|
# Optional dependency: if the unlock_processpool package can be imported, ask it
# to lift the 61-worker ceiling that ProcessPoolExecutor has on Windows.
UNLOCKED = False  # flipped to True only after the unlock call has succeeded
try:
    import unlock_processpool
    unlock_processpool.please()
    UNLOCKED = True
except ImportError:
    # Package not available: keep the default so main() caps the worker count.
    pass
|
|
|
|
def _savings_percent(original_size, new_size):
    """Return the size reduction as a percentage of *original_size* (0 if it is 0)."""
    if original_size > 0:
        return (original_size - new_size) / original_size * 100
    return 0


def _max_band_value(img):
    """Return the largest pixel value found in any band of *img*."""
    extrema = img.getextrema()
    if isinstance(extrema[0], tuple):
        # Multi-band image: one (min, max) pair per band.
        return max(band_max for _band_min, band_max in extrema)
    # Single-band image: a bare (min, max) pair.
    return extrema[1]


def _is_16bit_grayscale(mode):
    """Return True for the Pillow modes this script treats as 16-bit grayscale."""
    # NOTE(review): depending on the Pillow release a 16-bit grayscale PNG is
    # reported as 'I' or as 'I;16'; both are accepted here - confirm against the
    # Pillow version in use.
    return mode == 'I' or mode.startswith('I;16')


def _detect_16bit(img):
    """Guess whether *img* holds samples that do not fit in 8 bits."""
    if _is_16bit_grayscale(img.mode):
        return True
    if img.mode not in ('RGB', 'RGBA', 'LA'):
        return False
    try:
        return _max_band_value(img) > 255
    except Exception:
        # Best effort only: if the pixels cannot be inspected, assume 8-bit.
        return False


def _prepare_for_save(img, is_16bit):
    """Return *img* converted (if necessary) to the mode it should be saved in."""
    mode = img.mode
    if is_16bit and _is_16bit_grayscale(mode):
        # 16-bit grayscale is written as-is so the bit depth survives.
        return img
    if not is_16bit and mode == 'I':
        # Only reachable when 8-bit output is forced.
        # NOTE(review): Pillow's I -> L conversion appears to clamp rather than
        # rescale high values - confirm this is acceptable for 16-bit sources.
        return img.convert('L')
    if mode in ('RGB', 'RGBA'):
        return img
    # Everything else becomes RGB, or RGBA when the mode carries an alpha band
    # so that transparency is not silently discarded.
    return img.convert('RGBA' if 'A' in mode else 'RGB')


def _save_png_atomically(img, output_path):
    """Write *img* as a PNG and expose it at *output_path* only once complete."""
    # compress_png() skips any file whose output already exists, so a partially
    # written output must never be left under the final name.
    partial_path = output_path.with_name(output_path.name + '.part')
    try:
        img.save(str(partial_path), 'PNG', optimize=True, compress_level=9)
        os.replace(str(partial_path), str(output_path))
    except BaseException:
        try:
            partial_path.unlink()
        except OSError:
            pass
        raise


def compress_png(input_path, output_path, force_bitdepth=None):
    """Recompress a single PNG file and report the outcome.

    Args:
        input_path: Path to input image
        output_path: Path to output image
        force_bitdepth: None (auto-detect), '8' (force 8-bit), or '16' (force 16-bit)

    Returns:
        An 8-tuple ``(input_path, success, error, original_size, new_size,
        savings_pct, was_skipped, is_corrupted)``:

        * output already exists -> ``success=True, was_skipped=True`` with the
          sizes of the existing files; the image is not opened;
        * file recompressed -> ``success=True, was_skipped=False``;
        * 0-byte file, unreadable image, zero dimensions, or an image whose
          every band is zero while the file is smaller than 0.1 bytes per
          pixel -> ``success=False, is_corrupted=True`` and an error message
          starting with ``"CORRUPTED:"``;
        * any other exception -> ``success=False`` with ``str(exception)`` as
          the error; ``is_corrupted`` is True only when that text mentions
          "truncated", "cannot identify" or "corrupt".

    No exception derived from ``Exception`` escapes this function.
    """
    opened = None  # the image returned by Image.open(), closed in ``finally``
    try:
        input_path = Path(input_path)
        output_path = Path(output_path)

        # Skip if output file already exists
        if output_path.exists():
            original_size = os.path.getsize(input_path)
            new_size = os.path.getsize(output_path)
            savings_pct = _savings_percent(original_size, new_size)
            return (str(input_path), True, None, original_size, new_size, savings_pct, True, False)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        original_size = os.path.getsize(input_path)

        # Check for corrupted/empty files
        if original_size == 0:
            return (str(input_path), False, "CORRUPTED: File is 0 bytes (empty/placeholder)", 0, 0, 0, False, True)

        # Open and fully decode the image so corruption surfaces here
        try:
            opened = Image.open(input_path)
            opened.load()
        except Exception as e:
            return (str(input_path), False, f"CORRUPTED: Cannot open/load image - {str(e)}", original_size, 0, 0, False, True)
        img = opened

        # Validate image dimensions
        if img.width == 0 or img.height == 0:
            return (str(input_path), False, f"CORRUPTED: Invalid dimensions ({img.width}x{img.height})", original_size, 0, 0, False, True)

        # An image with no non-zero sample at all AND a suspiciously small file
        # (less than 0.1 bytes per pixel) is reported as corrupted.
        try:
            all_black = _max_band_value(img) <= 0
        except Exception:
            # If the pixels cannot be inspected, continue anyway
            all_black = False
        if all_black and original_size < (img.width * img.height * 0.1):
            return (str(input_path), False, "CORRUPTED: Image appears all black with suspiciously small file size", original_size, 0, 0, False, True)

        # Determine target bit depth
        if force_bitdepth == '8':
            is_16bit = False
        elif force_bitdepth == '16':
            is_16bit = True
        else:
            is_16bit = _detect_16bit(img)

        img = _prepare_for_save(img, is_16bit)

        # Save with maximum compression
        _save_png_atomically(img, output_path)

        new_size = os.path.getsize(output_path)
        savings_pct = _savings_percent(original_size, new_size)
        return (str(input_path), True, None, original_size, new_size, savings_pct, False, False)
    except Exception as e:
        # Classify the failure as corruption when the message suggests it
        message = str(e)
        lowered = message.lower()
        is_corrupted = any(marker in lowered for marker in ("truncated", "cannot identify", "corrupt"))
        return (str(input_path), False, message, 0, 0, 0, False, is_corrupted)
    finally:
        # Release the source file handle so the caller can move or delete it.
        if opened is not None:
            opened.close()
|
|
|
|
def find_image_files(input_dir):
    """Find all PNG and JPG files under *input_dir*, searching recursively.

    Only regular files are returned; a directory whose name happens to end in
    an image extension is ignored (its contents are still searched). Extension
    matching is case-insensitive.

    Args:
        input_dir: Root directory to search.

    Returns:
        A ``(png_files, jpg_files)`` pair of lists of ``Path`` objects, where
        ``jpg_files`` holds both ``.jpg`` and ``.jpeg`` files.
    """
    png_files = []
    jpg_files = []
    for candidate in Path(input_dir).rglob('*'):
        # rglob('*') yields directories too; those must never be queued.
        if not candidate.is_file():
            continue
        suffix = candidate.suffix.lower()
        if suffix == '.png':
            png_files.append(candidate)
        elif suffix in ('.jpg', '.jpeg'):
            jpg_files.append(candidate)
    return png_files, jpg_files
|
|
|
|
def get_output_path(input_path, input_dir, output_dir):
    """Convert input path to output path preserving directory structure.

    Args:
        input_path: Path of the source file.
        input_dir: Root of the input tree.
        output_dir: Root of the output tree.

    Returns:
        ``output_dir`` joined with the part of ``input_path`` below
        ``input_dir``. When ``input_path`` is not located under ``input_dir``
        its full path is appended instead, with any drive/root removed so the
        result always stays inside ``output_dir``.
    """
    input_path = Path(input_path)
    output_dir = Path(output_dir)

    try:
        relative_path = input_path.relative_to(Path(input_dir))
    except ValueError:
        relative_path = input_path
        if input_path.anchor:
            # Joining an anchored path would discard output_dir and hand back
            # the input file itself, so strip the drive/root first.
            relative_path = input_path.relative_to(input_path.anchor)

    return output_dir / relative_path
|
|
|
|
def move_to_corrupted(input_path, input_dir, corrupted_dir):
    """Relocate a corrupted file into the quarantine tree.

    The file keeps its position relative to ``input_dir`` below
    ``corrupted_dir``; a file that is not under ``input_dir`` is placed
    directly in ``corrupted_dir`` under its own name. Missing destination
    directories are created.

    Returns:
        ``(True, None)`` when the file was moved, otherwise
        ``(False, message)`` where ``message`` is the text of the exception.
    """
    try:
        source = Path(input_path)
        source_root = Path(input_dir)
        quarantine_root = Path(corrupted_dir)

        try:
            tail = source.relative_to(source_root)
        except ValueError:
            # Outside the input tree: keep just the file name.
            tail = source.name

        target = quarantine_root / tail
        target.parent.mkdir(parents=True, exist_ok=True)
        source.rename(target)
    except Exception as exc:
        return False, str(exc)
    return True, None
|
|
|
|
def format_size(size_bytes):
    """Format a byte count in human readable form.

    The value is scaled by 1024 through B, KB, MB and GB (anything larger is
    shown in TB) and rendered with two decimals. Negative values - e.g. a
    "saving" when a file grew - are scaled by their magnitude and keep their
    sign.

    Args:
        size_bytes: Size in bytes (int or float, may be negative).

    Returns:
        A string such as ``"1.50 KB"`` or ``"-2.00 MB"``.
    """
    for unit in ('B', 'KB', 'MB', 'GB'):
        # Compare the magnitude so negative sizes are scaled as well.
        if abs(size_bytes) < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} TB"
|
|
|
|
def format_time(seconds):
    """Render a duration given in seconds as ``HH:MM:SS:CC``.

    ``CC`` is hundredths of a second. Every field is truncated (not rounded)
    and zero-padded to at least two digits.
    """
    fields = (
        seconds // 3600,         # whole hours
        (seconds % 3600) // 60,  # whole minutes within the hour
        seconds % 60,            # seconds within the minute
        (seconds % 1) * 100,     # hundredths of the current second
    )
    return ":".join(f"{int(value):02d}" for value in fields)
|
|
|
|
def main():
    """Command-line entry point: recompress every PNG under ``input`` into ``output``.

    Parses ``--8bit`` / ``--16bit`` (mutually exclusive), scans the ``input``
    directory, and hands each PNG to ``compress_png`` in a process pool. JPG
    files are only counted and reported as skipped. Files reported as
    corrupted are moved into ``corrupted``. A progress line is refreshed at
    most every 0.5 seconds and a summary is printed at the end.

    Progress, speed and ETA are based on PNG files only. Returns ``None`` in
    all cases, including the early exits for invalid options, a missing input
    directory, or no PNG files.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description='Multithreaded PNG compression script with maximum parallelism.'
    )
    parser.add_argument(
        '--8bit', '-8',
        action='store_true',
        dest='force_8bit',
        help='Force 8-bit color depth for all images'
    )
    parser.add_argument(
        '--16bit', '-16',
        action='store_true',
        dest='force_16bit',
        help='Force 16-bit color depth for all images'
    )
    args = parser.parse_args()

    # Determine bit depth setting
    if args.force_8bit and args.force_16bit:
        print("Error: Cannot specify both --8bit and --16bit. Choose one.")
        return
    elif args.force_8bit:
        force_bitdepth = '8'
        print("Mode: Forcing 8-bit color depth")
    elif args.force_16bit:
        force_bitdepth = '16'
        print("Mode: Forcing 16-bit color depth")
    else:
        force_bitdepth = None
        print("Mode: Auto-detect bit depth (preserve 16-bit if present)")

    input_dir = Path('input')
    output_dir = Path('output')

    # Check if input directory exists
    if not input_dir.exists():
        print(f"Error: Input directory '{input_dir}' does not exist.")
        print("Please create an 'input' folder and place your PNG files there.")
        return

    print(f"Input directory: {input_dir}")
    print(f"Output directory: {output_dir}")
    print("Scanning for image files...")
    png_files, jpg_files = find_image_files(input_dir)
    total_pngs = len(png_files)
    jpg_count = len(jpg_files)

    if jpg_files:
        print(f"Found {jpg_count} JPG/JPEG files - ignoring (skipping)")

    if not png_files:
        print("No PNG files found in input directory.")
        return

    print(f"Found {total_pngs} PNG files to process.")

    # Create output and corrupted directories
    output_dir.mkdir(exist_ok=True)
    corrupted_dir = Path('corrupted')
    corrupted_dir.mkdir(exist_ok=True)
    print(f"Corrupted files will be moved to: {corrupted_dir}")

    # Use all available CPU cores
    cpu_count = multiprocessing.cpu_count()
    if platform.system() == 'Windows' and not UNLOCKED:
        # Windows ProcessPoolExecutor has a maximum of 61 workers (unless unlocked)
        max_workers = min(cpu_count, 61)
        if cpu_count > 61:
            print(f"Detected {cpu_count} CPU threads, but Windows limits ProcessPoolExecutor to 61 workers.")
            print("Install 'unlock-processpool-win' package to use all cores: pip install unlock-processpool-win")
    else:
        max_workers = cpu_count
        if UNLOCKED:
            print("Using unlock-processpool-win to bypass Windows 61-worker limit")
    print(f"Using {max_workers} worker processes for compression...")
    print("-" * 80)

    compressed = 0
    skipped_existing = 0  # PNGs whose output already existed (JPGs are counted separately)
    failed = 0
    corrupted = 0
    corrupted_files = []  # (file_path, reason, move status) triples for the final report
    total_original_size = 0
    total_new_size = 0
    start_time = time.time()
    last_update_time = start_time

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_file = {
            executor.submit(compress_png, str(f), str(get_output_path(f, input_dir, output_dir)), force_bitdepth): f
            for f in png_files
        }

        # Process results as they complete
        for future in as_completed(future_to_file):
            try:
                result = future.result()
            except Exception as exc:
                # The worker itself failed (e.g. the process died); record the
                # file as failed instead of aborting the run without a summary.
                failed += 1
                print(f"\n[ERROR] Failed to compress {future_to_file[future]}: {exc}")
                continue

            file_path, success, error, orig_size, new_size, _savings_pct, was_skipped, is_corrupted = result

            if not success:
                if is_corrupted:
                    corrupted += 1
                    # Move corrupted file to corrupted folder
                    move_success, move_error = move_to_corrupted(file_path, input_dir, corrupted_dir)
                    if move_success:
                        corrupted_files.append((file_path, error, f"Moved to {corrupted_dir / Path(file_path).relative_to(input_dir)}"))
                        print(f"\n[CORRUPTED] {file_path}: {error} -> Moved to corrupted folder")
                    else:
                        corrupted_files.append((file_path, error, f"Failed to move: {move_error}"))
                        print(f"\n[CORRUPTED] {file_path}: {error} (Failed to move: {move_error})")
                else:
                    failed += 1
                    print(f"\n[ERROR] Failed to compress {file_path}: {error}")
                continue

            if was_skipped:
                skipped_existing += 1
            else:
                compressed += 1
            total_original_size += orig_size
            total_new_size += new_size

            current_time = time.time()
            elapsed = current_time - start_time
            time_since_update = current_time - last_update_time

            # Only PNGs count as processed; ignored JPGs must not inflate the
            # counter, the speed or the ETA.
            processed = compressed + skipped_existing

            # Update on the first file and then at most every 0.5 seconds
            if processed == 1 or time_since_update >= 0.5:
                rate = processed / elapsed if elapsed > 0 else 0
                remaining = total_pngs - processed - failed - corrupted
                eta_seconds = remaining / rate if rate > 0 and remaining > 0 else 0

                total_savings = total_original_size - total_new_size
                total_savings_pct = (total_savings / total_original_size * 100) if total_original_size > 0 else 0

                eta_str = format_time(eta_seconds) if eta_seconds > 0 else "calculating..."
                elapsed_str = format_time(elapsed)

                print(f"[{processed:5d}/{total_pngs}] "
                      f"Compressed: {compressed} | Skipped: {skipped_existing + jpg_count} | Corrupted: {corrupted} | "
                      f"Speed: {rate:.1f} files/sec | "
                      f"Elapsed: {elapsed_str} | ETA: {eta_str} | "
                      f"Saved: {format_size(total_savings)} ({total_savings_pct:.1f}%)", end='\r')
                last_update_time = current_time

    total_time = time.time() - start_time
    total_savings = total_original_size - total_new_size
    total_savings_pct = (total_savings / total_original_size * 100) if total_original_size > 0 else 0
    processed = compressed + skipped_existing
    avg_rate = processed / total_time if total_time > 0 else 0
    skipped = skipped_existing + jpg_count

    print("\n" + "=" * 80)
    print("Compression complete!")
    print(f"Successfully compressed: {compressed} files")
    if jpg_files:
        print(f"Skipped: {skipped} files ({skipped_existing} already exist, {jpg_count} JPG/JPEG)")
    else:
        print(f"Skipped (already exist): {skipped} files")
    if corrupted > 0:
        print(f"Corrupted (bad PNGs): {corrupted} files")
    print(f"Failed: {failed} files")
    print(f"Total time: {format_time(total_time)}")
    print(f"Average speed: {avg_rate:.2f} files/second")
    print(f"Original size: {format_size(total_original_size)}")
    print(f"Compressed size: {format_size(total_new_size)}")
    print(f"Total savings: {format_size(total_savings)} ({total_savings_pct:.1f}%)")

    # Print list of corrupted files if any
    if corrupted_files:
        print("\n" + "=" * 80)
        print("CORRUPTED FILES LIST:")
        print("=" * 80)
        for file_path, error, move_status in corrupted_files:
            print(f" {file_path}")
            print(f" Reason: {error}")
            print(f" Status: {move_status}")
|
|
|
|
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()
|
|
|