#!/usr/bin/env python3
"""
Parallel PNG compression script.

Recompresses every PNG found under ``input/`` into ``output/`` (mirroring the
directory layout) using one worker process per CPU core. PNGs that cannot be
decoded, or that look like empty placeholders, are moved to ``corrupted/``.
"""

import argparse
import multiprocessing
import os
import shutil
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

try:
    from PIL import Image
except ImportError:
    # Pillow is the only third-party dependency. Defer the failure so main()
    # can report it with a readable message instead of an import traceback.
    Image = None


# An all-black image whose file is smaller than this many bytes per pixel is
# treated as a corrupted placeholder.
_BLANK_BYTES_PER_PIXEL = 0.1

# Minimum delay between two refreshes of the progress line.
_PROGRESS_INTERVAL_SECONDS = 0.5


def _savings_percent(original_size, new_size):
    """Return the size reduction as a percentage of the original size."""
    if original_size <= 0:
        return 0
    return (original_size - new_size) / original_size * 100


def _corrupted_result(input_path, reason, original_size):
    """Build the compress_png() result tuple for a file flagged as corrupted."""
    return (str(input_path), False, f"CORRUPTED: {reason}",
            original_size, 0, 0, False, True)


def _is_integer_grayscale(mode):
    """Return True for Pillow's >8-bit grayscale modes ('I', 'I;16', 'I;16B', ...)."""
    return mode == 'I' or mode.startswith('I;16')


def _has_transparency(img):
    """Return True if the image has an alpha band or a tRNS-style transparency entry."""
    return 'A' in img.mode or 'transparency' in img.info


def _looks_like_blank_placeholder(img, file_size):
    """Return True if every pixel is zero AND the file is suspiciously small.

    The whole image is inspected through ``getextrema()`` rather than a sample
    of the first pixels, so an image that merely starts with a black border is
    never reported.
    """
    extrema = img.getextrema()
    # Single-band images yield (min, max); multi-band images yield one
    # (min, max) pair per band.
    bands = extrema if isinstance(extrema[0], tuple) else (extrema,)
    if any(high > 0 for _low, high in bands):
        return False
    return file_size < img.width * img.height * _BLANK_BYTES_PER_PIXEL


def _scale_to_8bit_gray(img):
    """Convert integer grayscale to 8-bit 'L' by scaling instead of clamping.

    A plain ``convert('L')`` clamps every value above 255 to white, so the
    16-bit range is first divided by 256.
    """
    wide = img if img.mode == 'I' else img.convert('I')
    return wide.point(lambda value: value * (1 / 256)).convert('L')


def _prepare_for_save(img, force_bitdepth):
    """Return the image converted to the mode it should be written in.

    Pillow has no colour mode deeper than 8 bits per channel, so only integer
    grayscale images can actually carry 16-bit data:

    * integer grayscale is kept as is, unless 8-bit output is forced;
    * 'RGB' and 'RGBA' are kept as is;
    * every other mode becomes 'RGBA' when it carries transparency, else 'RGB'.
    """
    if _is_integer_grayscale(img.mode):
        if force_bitdepth == '8':
            return _scale_to_8bit_gray(img)
        return img
    if img.mode in ('RGB', 'RGBA'):
        return img
    return img.convert('RGBA' if _has_transparency(img) else 'RGB')


def compress_png(input_path, output_path, force_bitdepth=None):
    """Compress a single PNG file.

    Args:
        input_path: Path to input image
        output_path: Path to output image
        force_bitdepth: None (auto-detect), '8' (force 8-bit), or '16' (force 16-bit)

    Returns:
        An 8-tuple ``(input_path, success, error, original_size, new_size,
        savings_pct, was_skipped, is_corrupted)``. ``was_skipped`` is True when
        the output file already existed; ``is_corrupted`` is True when the
        input should be treated as a bad PNG. Exceptions are never propagated:
        they are reported through ``success=False`` and ``error``.
    """
    try:
        input_path = Path(input_path)
        output_path = Path(output_path)

        # Skip if output file already exists
        if output_path.exists():
            original_size = os.path.getsize(input_path)
            new_size = os.path.getsize(output_path)
            return (str(input_path), True, None, original_size, new_size,
                    _savings_percent(original_size, new_size), True, False)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        original_size = os.path.getsize(input_path)
        if original_size == 0:
            return _corrupted_result(
                input_path, "File is 0 bytes (empty/placeholder)", 0)

        if Image is None:
            # Not a problem with the file: report a plain failure so the
            # caller does not move it to the corrupted folder.
            return (str(input_path), False, "Pillow is not installed",
                    original_size, 0, 0, False, False)

        try:
            img = Image.open(input_path)
            # Force a full decode so truncated data is detected here.
            img.load()
        except Exception as e:
            return _corrupted_result(
                input_path, f"Cannot open/load image - {str(e)}", original_size)

        if img.width == 0 or img.height == 0:
            return _corrupted_result(
                input_path,
                f"Invalid dimensions ({img.width}x{img.height})",
                original_size)

        # Best-effort heuristic: if the pixels cannot be inspected, carry on
        # with the compression instead of failing the file.
        try:
            is_blank = _looks_like_blank_placeholder(img, original_size)
        except Exception:
            is_blank = False
        if is_blank:
            return _corrupted_result(
                input_path,
                "Image appears all black with suspiciously small file size",
                original_size)

        img = _prepare_for_save(img, force_bitdepth)

        # Write to a temporary sibling and rename it into place, so that an
        # interrupted save never leaves a truncated file which the next run
        # would mistake for a finished one.
        temp_path = output_path.with_name(output_path.name + '.part')
        try:
            img.save(str(temp_path), 'PNG', optimize=True, compress_level=9)
            os.replace(temp_path, output_path)
        finally:
            if temp_path.exists():
                try:
                    temp_path.unlink()
                except OSError:
                    pass

        new_size = os.path.getsize(output_path)
        return (str(input_path), True, None, original_size, new_size,
                _savings_percent(original_size, new_size), False, False)

    except Exception as e:
        # Some decoder errors only surface late; classify them by message.
        message = str(e).lower()
        is_corrupted = any(
            marker in message
            for marker in ("truncated", "cannot identify", "corrupt")
        )
        return (str(input_path), False, str(e), 0, 0, 0, False, is_corrupted)


def find_image_files(input_dir):
    """Find all PNG and JPG files in subdirectories.

    Returns:
        A ``(png_files, jpg_files)`` pair of lists of ``Path`` objects.
        Directories are ignored even if their name ends in an image suffix.
    """
    png_files = []
    jpg_files = []

    for candidate in Path(input_dir).rglob('*'):
        if not candidate.is_file():
            continue
        suffix = candidate.suffix.lower()
        if suffix == '.png':
            png_files.append(candidate)
        elif suffix in ('.jpg', '.jpeg'):
            jpg_files.append(candidate)

    return png_files, jpg_files


def get_output_path(input_path, input_dir, output_dir):
    """Convert input path to output path preserving directory structure.

    If ``input_path`` is not located under ``input_dir``, its full path is
    reproduced below ``output_dir``. The drive/root anchor is stripped first,
    because joining an absolute path onto ``output_dir`` would discard
    ``output_dir`` and return the input path itself.
    """
    input_path = Path(input_path)
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)

    try:
        relative_path = input_path.relative_to(input_dir)
    except ValueError:
        anchor = input_path.anchor
        relative_path = input_path.relative_to(anchor) if anchor else input_path

    return output_dir / relative_path


def move_to_corrupted(input_path, input_dir, corrupted_dir):
    """Move a corrupted file to the corrupted folder, preserving directory structure.

    Returns:
        ``(True, None)`` on success, ``(False, error_message)`` on failure.
    """
    try:
        input_path = Path(input_path)
        input_dir = Path(input_dir)
        corrupted_dir = Path(corrupted_dir)

        try:
            relative_path = input_path.relative_to(input_dir)
        except ValueError:
            relative_path = input_path.name

        dest_path = corrupted_dir / relative_path
        dest_path.parent.mkdir(parents=True, exist_ok=True)

        # shutil.move falls back to copy + delete when the destination is on
        # another filesystem, where a plain rename fails.
        shutil.move(str(input_path), str(dest_path))
        return True, None
    except Exception as e:
        return False, str(e)


def format_size(size_bytes):
    """Format file size in human readable format.

    Negative values (a file that grew) are scaled by magnitude as well, so
    ``-2048`` is rendered as ``-2.00 KB`` rather than ``-2048.00 B``.
    """
    value = size_bytes
    for unit in ('B', 'KB', 'MB', 'GB'):
        if abs(value) < 1024.0:
            return f"{value:.2f} {unit}"
        value /= 1024.0
    return f"{value:.2f} TB"


def format_time(seconds):
    """Format time as HH:MM:SS:MsMs (hours:minutes:seconds:centiseconds)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    centiseconds = int((seconds % 1) * 100)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}:{centiseconds:02d}"


def main():
    """Compress every PNG under ``input/`` into ``output/`` and print a summary.

    Returns:
        Process exit status: 0 when the run completed (even if individual
        files failed), 1 when it could not start (conflicting options,
        missing Pillow, or missing input directory).
    """
    parser = argparse.ArgumentParser(
        description='Multithreaded PNG compression script with maximum parallelism.'
    )
    parser.add_argument(
        '--8bit', '-8',
        action='store_true',
        dest='force_8bit',
        help='Force 8-bit color depth for all images'
    )
    parser.add_argument(
        '--16bit', '-16',
        action='store_true',
        dest='force_16bit',
        help='Force 16-bit color depth for all images'
    )
    args = parser.parse_args()

    if args.force_8bit and args.force_16bit:
        print("Error: Cannot specify both --8bit and --16bit. Choose one.")
        return 1
    if args.force_8bit:
        force_bitdepth = '8'
        print("Mode: Forcing 8-bit color depth")
    elif args.force_16bit:
        force_bitdepth = '16'
        print("Mode: Forcing 16-bit color depth")
    else:
        force_bitdepth = None
        print("Mode: Auto-detect bit depth (preserve 16-bit if present)")

    if Image is None:
        print("Error: Pillow is not installed. Install it with 'pip install Pillow'.")
        return 1

    input_dir = Path('input')
    output_dir = Path('output')

    if not input_dir.exists():
        print(f"Error: Input directory '{input_dir}' does not exist.")
        print("Please create an 'input' folder and place your PNG files there.")
        return 1

    print(f"Input directory: {input_dir}")
    print(f"Output directory: {output_dir}")
    print("Scanning for image files...")
    png_files, jpg_files = find_image_files(input_dir)

    if jpg_files:
        print(f"Found {len(jpg_files)} JPG/JPEG files - ignoring (skipping)")

    if not png_files:
        print("No PNG files found in input directory.")
        return 0

    total_files = len(png_files)
    print(f"Found {total_files} PNG files to process.")

    output_dir.mkdir(exist_ok=True)
    corrupted_dir = Path('corrupted')
    corrupted_dir.mkdir(exist_ok=True)
    print(f"Corrupted files will be moved to: {corrupted_dir}")

    max_workers = multiprocessing.cpu_count()
    print(f"Using {max_workers} worker processes for compression...")
    print("-" * 80)

    # PNG outcomes are counted separately from the ignored JPGs so that the
    # progress counter, rate and ETA only ever describe PNG work.
    compressed = 0
    skipped_existing = 0
    failed = 0
    corrupted = 0
    corrupted_files = []
    total_original_size = 0
    total_new_size = 0
    start_time = time.time()
    last_update_time = start_time

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {
            executor.submit(
                compress_png,
                str(png),
                str(get_output_path(png, input_dir, output_dir)),
                force_bitdepth,
            ): png
            for png in png_files
        }

        for future in as_completed(future_to_file):
            try:
                result = future.result()
            except Exception as exc:
                # compress_png() never raises, so this is a worker-level
                # failure (e.g. a killed process). Record it and keep going so
                # the summary is still printed.
                failed += 1
                print(f"\n[ERROR] Failed to compress {future_to_file[future]}: {exc}")
                continue

            (file_path, success, error, orig_size, new_size,
             _savings_pct, was_skipped, is_corrupted) = result

            if success:
                if was_skipped:
                    skipped_existing += 1
                else:
                    compressed += 1
                    total_original_size += orig_size
                    total_new_size += new_size

                current_time = time.time()
                elapsed = current_time - start_time
                finished = compressed + skipped_existing + failed + corrupted

                # Refresh on the first result, then at most once per interval.
                if (finished == 1
                        or current_time - last_update_time >= _PROGRESS_INTERVAL_SECONDS):
                    rate = finished / elapsed if elapsed > 0 else 0
                    remaining = total_files - finished
                    eta_seconds = remaining / rate if rate > 0 and remaining > 0 else 0

                    total_savings = total_original_size - total_new_size
                    total_savings_pct = _savings_percent(
                        total_original_size, total_new_size)

                    eta_str = format_time(eta_seconds) if eta_seconds > 0 else "calculating..."
                    elapsed_str = format_time(elapsed)
                    skipped_total = skipped_existing + len(jpg_files)

                    print(f"[{finished:5d}/{total_files}] "
                          f"Compressed: {compressed} | Skipped: {skipped_total} | Corrupted: {corrupted} | "
                          f"Speed: {rate:.1f} files/sec | "
                          f"Elapsed: {elapsed_str} | ETA: {eta_str} | "
                          f"Saved: {format_size(total_savings)} ({total_savings_pct:.1f}%)", end='\r')
                    last_update_time = current_time
            elif is_corrupted:
                corrupted += 1
                move_success, move_error = move_to_corrupted(file_path, input_dir, corrupted_dir)
                if move_success:
                    corrupted_files.append((file_path, error, f"Moved to {corrupted_dir / Path(file_path).relative_to(input_dir)}"))
                    print(f"\n[CORRUPTED] {file_path}: {error} -> Moved to corrupted folder")
                else:
                    corrupted_files.append((file_path, error, f"Failed to move: {move_error}"))
                    print(f"\n[CORRUPTED] {file_path}: {error} (Failed to move: {move_error})")
            else:
                failed += 1
                print(f"\n[ERROR] Failed to compress {file_path}: {error}")

    total_time = time.time() - start_time
    total_savings = total_original_size - total_new_size
    total_savings_pct = _savings_percent(total_original_size, total_new_size)
    finished = compressed + skipped_existing + failed + corrupted
    avg_rate = finished / total_time if total_time > 0 else 0

    print("\n" + "=" * 80)
    print(f"Compression complete!")
    print(f"Successfully compressed: {compressed} files")
    if jpg_files:
        skipped_total = skipped_existing + len(jpg_files)
        print(f"Skipped: {skipped_total} files ({skipped_existing} already exist, {len(jpg_files)} JPG/JPEG)")
    else:
        print(f"Skipped (already exist): {skipped_existing} files")
    if corrupted > 0:
        print(f"Corrupted (bad PNGs): {corrupted} files")
    print(f"Failed: {failed} files")
    print(f"Total time: {format_time(total_time)}")
    print(f"Average speed: {avg_rate:.2f} files/second")
    print(f"Original size: {format_size(total_original_size)}")
    print(f"Compressed size: {format_size(total_new_size)}")
    print(f"Total savings: {format_size(total_savings)} ({total_savings_pct:.1f}%)")

    if corrupted_files:
        print("\n" + "=" * 80)
        print("CORRUPTED FILES LIST:")
        print("=" * 80)
        for file_path, error, move_status in corrupted_files:
            print(f"  {file_path}")
            print(f"    Reason: {error}")
            print(f"    Status: {move_status}")

    return 0


if __name__ == '__main__':
    sys.exit(main())