#!/usr/bin/env python3
"""
Parallel PNG compression script.

Recursively finds every PNG file under ``input/``, re-encodes it with maximum
compression in a pool of worker processes, and writes the result to the same
relative location under ``output/``. Files whose output already exists are
skipped, so an interrupted run can simply be started again.
"""

import multiprocessing
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

try:
    from PIL import Image
except ImportError:
    # main() reports the missing dependency with an install hint instead of
    # letting the script die with a traceback.
    Image = None

# Pillow modes that are written to PNG as-is by compress_png. Anything else
# is converted to RGB/RGBA before saving.
_PNG_NATIVE_MODES = frozenset({'1', 'L', 'LA', 'P', 'RGB', 'RGBA', 'I', 'I;16'})

# ProcessPoolExecutor rejects more than this many workers on Windows.
_WINDOWS_MAX_WORKERS = 61


def _savings_percent(original_size, new_size):
    """Return the size reduction as a percentage of the original size."""
    if original_size <= 0:
        return 0
    return (original_size - new_size) / original_size * 100


def _prepare_for_png(img):
    """Return an image whose mode is one of ``_PNG_NATIVE_MODES``.

    Native modes are returned untouched so that grayscale, palette, alpha and
    16-bit grayscale data are not expanded or truncated. Any other mode is
    converted to RGBA when it carries an alpha band, otherwise to RGB.
    """
    if img.mode in _PNG_NATIVE_MODES:
        return img
    has_alpha = img.mode.endswith(('A', 'a'))
    return img.convert('RGBA' if has_alpha else 'RGB')


def compress_png(input_path, output_path):
    """Re-encode a single PNG file with maximum compression.

    Args:
        input_path: Path of the PNG to read.
        output_path: Path to write. If it already exists the file is
            treated as done and nothing is written.

    Returns:
        A 7-tuple ``(input_path, success, error, original_size, new_size,
        savings_pct, was_skipped)``. On failure ``success`` is False,
        ``error`` holds the exception text and the numeric fields are 0.
        The function never raises, so the tuple can always be sent back
        from a worker process.
    """
    temp_path = None
    try:
        input_path = Path(input_path)
        output_path = Path(output_path)

        # Stat the input first: a missing input must fail before any output
        # directory is created.
        original_size = input_path.stat().st_size

        # Skip if output file already exists
        if output_path.exists():
            new_size = output_path.stat().st_size
            return (str(input_path), True, None, original_size, new_size,
                    _savings_percent(original_size, new_size), True)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Write to a temporary sibling and rename at the end. An interrupted
        # save then never leaves a truncated file that a later run would
        # mistake for a finished one.
        temp_path = output_path.with_name(output_path.name + '.tmp')

        # NOTE(review): Pillow is understood to decode 16-bit-per-channel
        # colour PNGs as 8-bit 'RGB'/'RGBA', so such files would be written
        # back as 8-bit; confirm against the installed Pillow version.
        with Image.open(input_path) as img:
            prepared = _prepare_for_png(img)
            prepared.save(str(temp_path), 'PNG', optimize=True, compress_level=9)

        os.replace(str(temp_path), str(output_path))
        temp_path = None

        new_size = output_path.stat().st_size
        return (str(input_path), True, None, original_size, new_size,
                _savings_percent(original_size, new_size), False)

    except Exception as e:
        # Deliberately broad: this is the worker boundary and every failure
        # is reported to the parent through the result tuple.
        if temp_path is not None:
            try:
                temp_path.unlink()
            except OSError:
                # Best-effort cleanup; the temp file may never have been created.
                pass
        return (str(input_path), False, str(e), 0, 0, 0, False)


def find_image_files(input_dir):
    """Find all PNG and JPG files below ``input_dir``.

    Only regular files are returned; a directory whose name ends in an
    image extension is ignored.

    Returns:
        A tuple ``(png_files, jpg_files)`` of lists of ``Path`` objects.
    """
    png_files = []
    jpg_files = []

    for candidate in Path(input_dir).rglob('*'):
        if not candidate.is_file():
            continue
        suffix = candidate.suffix.lower()
        if suffix == '.png':
            png_files.append(candidate)
        elif suffix in ('.jpg', '.jpeg'):
            jpg_files.append(candidate)

    return png_files, jpg_files


def get_output_path(input_path, input_dir, output_dir):
    """Map ``input_path`` into ``output_dir``, preserving directory structure.

    When ``input_path`` is not located under ``input_dir``, its own path is
    used as the relative part, with the drive/root and any ``..`` segments
    removed so the result always stays inside ``output_dir``.
    """
    input_path = Path(input_path)
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)

    try:
        relative_path = input_path.relative_to(input_dir)
    except ValueError:
        # Joining an absolute path onto output_dir would discard output_dir
        # and point straight back at the input file.
        anchor = input_path.anchor
        kept_parts = [
            part for part in input_path.parts
            if part != anchor and part != '..'
        ]
        relative_path = Path(*kept_parts)

    return output_dir / relative_path


def format_size(size_bytes):
    """Format a byte count as a human readable string (B, KB, MB, GB, TB).

    Negative values (for example a "saving" when a file grew) are scaled by
    magnitude and keep their sign.
    """
    for unit in ['B', 'KB', 'MB', 'GB']:
        if abs(size_bytes) < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} TB"


def _worker_count(task_count):
    """Return how many worker processes to start for ``task_count`` files."""
    try:
        cores = multiprocessing.cpu_count()
    except NotImplementedError:
        cores = 1
    # More workers than files would only cost start-up time.
    workers = min(cores, task_count)
    if sys.platform == 'win32':
        workers = min(workers, _WINDOWS_MAX_WORKERS)
    return max(1, workers)


def main():
    """Compress every PNG under ``input/`` into ``output/`` and print a summary."""
    input_dir = Path('input')
    output_dir = Path('output')

    if Image is None:
        print("Error: the Pillow package is required (pip install Pillow).")
        return

    # Check if input directory exists
    if not input_dir.exists():
        print(f"Error: Input directory '{input_dir}' does not exist.")
        print("Please create an 'input' folder and place your PNG files there.")
        return

    print(f"Input directory: {input_dir}")
    print(f"Output directory: {output_dir}")
    print("Scanning for image files...")

    png_files, jpg_files = find_image_files(input_dir)

    if jpg_files:
        print(f"Found {len(jpg_files)} JPG/JPEG files - ignoring (skipping)")

    if not png_files:
        print("No PNG files found in input directory.")
        return

    total_files = len(png_files)
    print(f"Found {total_files} PNG files to process.")

    # Create output directory
    output_dir.mkdir(parents=True, exist_ok=True)

    max_workers = _worker_count(total_files)
    print(f"Using {max_workers} worker processes for compression...")
    print("-" * 80)

    compressed = 0
    skipped = 0
    failed = 0
    total_original_size = 0
    total_new_size = 0
    start_time = time.time()
    last_update_time = start_time

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_file = {
            executor.submit(
                compress_png,
                str(f),
                str(get_output_path(f, input_dir, output_dir)),
            ): f
            for f in png_files
        }

        # Process results as they complete
        for future in as_completed(future_to_file):
            try:
                result = future.result()
            except Exception as exc:
                # A crashed worker (e.g. a broken pool) surfaces here; count
                # the file as failed instead of aborting the whole run.
                result = (str(future_to_file[future]), False, str(exc),
                          0, 0, 0, False)

            (file_path, success, error, orig_size, new_size,
             savings_pct, was_skipped) = result

            if not success:
                failed += 1
                print(f"\n[ERROR] Failed to compress {file_path}: {error}")
                continue

            if was_skipped:
                skipped += 1
            else:
                compressed += 1

            total_original_size += orig_size
            total_new_size += new_size

            current_time = time.time()
            elapsed = current_time - start_time
            time_since_update = current_time - last_update_time
            processed = compressed + skipped

            # Refresh on the first file, then at most every 0.5 seconds
            if processed == 1 or time_since_update >= 0.5:
                rate = processed / elapsed if elapsed > 0 else 0
                # Failed files are finished too and must not inflate the ETA.
                remaining = total_files - processed - failed
                eta = remaining / rate if rate > 0 else 0
                total_savings = total_original_size - total_new_size
                total_savings_pct = _savings_percent(total_original_size,
                                                     total_new_size)

                print(f"[{processed:5d}/{len(png_files)}] "
                      f"Compressed: {compressed} | Skipped: {skipped} | "
                      f"Speed: {rate:.1f} files/sec | "
                      f"ETA: {eta:.1f}s | "
                      f"Saved: {format_size(total_savings)} ({total_savings_pct:.1f}%) | "
                      f"Elapsed: {elapsed:.1f}s", end='\r')
                last_update_time = current_time

    total_time = time.time() - start_time
    total_savings = total_original_size - total_new_size
    total_savings_pct = _savings_percent(total_original_size, total_new_size)
    processed = compressed + skipped
    avg_rate = processed / total_time if total_time > 0 else 0

    print("\n" + "=" * 80)
    print("Compression complete!")
    print(f"Successfully compressed: {compressed} files")
    print(f"Skipped (already exist): {skipped} files")
    if jpg_files:
        print(f"Ignored (JPG/JPEG): {len(jpg_files)} files")
    print(f"Failed: {failed} files")
    print(f"Total time: {total_time:.2f} seconds")
    print(f"Average speed: {avg_rate:.2f} files/second")
    print(f"Original size: {format_size(total_original_size)}")
    print(f"Compressed size: {format_size(total_new_size)}")
    print(f"Total savings: {format_size(total_savings)} ({total_savings_pct:.1f}%)")


if __name__ == '__main__':
    main()