#!/usr/bin/env python3 """Compress PNGs in unchanged render sequences. Writes to staging, prompts for Y/N, then overwrites originals.""" from __future__ import annotations import argparse import json import multiprocessing import os import platform import shutil import sys import time from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path try: from PIL import Image except ImportError: print("Error: Pillow (PIL) is required. Install with: pip install Pillow", file=sys.stderr) sys.exit(1) try: import unlock_processpool unlock_processpool.please() UNLOCKED = True except ImportError: UNLOCKED = False RENDER_ROOT = Path("Renders") ARCHIVE_ROOT = RENDER_ROOT / "_zipped" STAGING_DIR = "_compressed_staging" CORRUPTED_DIR = "corrupted" STATE_SUFFIX = ".meta.json" PNG_EXT = ".png" def is_archive_path(path: Path) -> bool: return any(part in ("_archive", "_CURRENT", "_zipped", STAGING_DIR, CORRUPTED_DIR) for part in path.parts) def find_sequence_dirs(root: Path) -> list[Path]: seen = set() out = [] for dirpath, dirnames, filenames in os.walk(root): path = Path(dirpath) dirnames[:] = [d for d in dirnames if d not in ("_archive", "_CURRENT", STAGING_DIR, CORRUPTED_DIR, "_zipped")] if is_archive_path(path): continue if any(Path(dirpath, f).suffix.lower() == PNG_EXT for f in filenames): resolved = path.resolve() if resolved not in seen: seen.add(resolved) out.append(path) return out def iter_png_files(seq_dir: Path): for dirpath, dirnames, filenames in os.walk(seq_dir): path = Path(dirpath) dirnames[:] = [d for d in dirnames if d not in ("_archive", "_CURRENT", "_zipped", STAGING_DIR, CORRUPTED_DIR)] if is_archive_path(path): continue for f in filenames: if Path(f).suffix.lower() == PNG_EXT: yield path / f def load_state(state_path: Path) -> dict | None: if not state_path.exists(): return None try: state = json.loads(state_path.read_text()) if "files" in state: state["files"] = [e for e in state.get("files", []) if Path(e.get("path", "")).name.lower() != "thumbs.db"] if platform.system() == "Windows" and "files" in state: for e in state.get("files", []): if "mtime_ns" in e: e["mtime_ns"] = (e["mtime_ns"] // 100) * 100 return state except json.JSONDecodeError: return None def state_changed(seq_state: dict, stored_state: dict | None) -> bool: if stored_state is None: return True is_windows = platform.system() == "Windows" def png_only(files): return [e for e in files if Path(e.get("path", "")).suffix.lower() == ".png"] def norm(s): out = [] for e in png_only(s.get("files", [])): if Path(e.get("path", "")).name.lower() == "thumbs.db": continue m = e.get("mtime_ns", 0) if is_windows: m = (m // 100) * 100 out.append({"path": e["path"], "size": e["size"], "mtime_ns": m}) return {"files": out} return norm(seq_state) != norm(stored_state) def compute_state(seq_dir: Path) -> dict: entries = [] is_windows = platform.system() == "Windows" for p in sorted(iter_png_files(seq_dir), key=lambda x: x.relative_to(seq_dir).as_posix()): stat = p.stat() mtime_ns = stat.st_mtime_ns if is_windows: mtime_ns = (mtime_ns // 100) * 100 entries.append({"path": p.relative_to(seq_dir).as_posix(), "size": stat.st_size, "mtime_ns": mtime_ns}) return {"files": entries} def find_archive_and_state(seq_dir: Path) -> tuple[Path | None, Path | None]: """Return (archive_path, state_path) if sequence is archived and unchanged; else (None, None).""" try: rel = seq_dir.relative_to(RENDER_ROOT) except ValueError: return None, None for suffix in (".7z", ".zip"): archive = ARCHIVE_ROOT / f"{rel}{suffix}" if archive.exists(): state = archive.with_suffix(archive.suffix + STATE_SUFFIX) return archive, state return None, None def compress_png(input_path: Path, output_path: Path, force_bitdepth: str | None) -> tuple[str, bool, str | None, int, int, float, bool, bool]: """Returns (path, success, error, orig_size, new_size, savings_pct, was_skipped, is_corrupted).""" try: output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) orig_size = input_path.stat().st_size if orig_size == 0: return (str(input_path), False, "CORRUPTED: File is 0 bytes (empty/placeholder)", 0, 0, 0, False, True) try: img = Image.open(input_path) img.load() except Exception as e: return (str(input_path), False, f"CORRUPTED: Cannot open/load image - {e}", orig_size, 0, 0, False, True) if img.width == 0 or img.height == 0: return (str(input_path), False, f"CORRUPTED: Invalid dimensions ({img.width}x{img.height})", orig_size, 0, 0, False, True) # All-black + suspiciously small (corruption indicator) try: sample_size = min(100, img.width * img.height) pixels = list(img.getdata()[:sample_size]) if pixels: all_black = True for pixel in pixels: if isinstance(pixel, (tuple, list)): if any(p > 0 for p in pixel): all_black = False break else: if pixel > 0: all_black = False break if all_black and orig_size < (img.width * img.height * 0.1): return (str(input_path), False, "CORRUPTED: Image appears all black with suspiciously small file size", orig_size, 0, 0, False, True) except Exception: pass # Determine target bit depth if force_bitdepth == "8": is_16bit = False elif force_bitdepth == "16": is_16bit = True else: is_16bit = img.mode == "I" if not is_16bit and img.mode in ("RGB", "RGBA", "LA"): try: pixels = list(img.getdata()) if pixels: if isinstance(pixels[0], (tuple, list)): max_val = max(max(p) for p in pixels[:1000]) else: max_val = max(pixels[:1000]) if max_val > 255: is_16bit = True except Exception: pass # Handle bit depth conversion (8-bit to 8-bit, 16-bit to 16-bit) if is_16bit: if force_bitdepth == "16": if img.mode != "I": img = img.convert("RGBA" if ("A" in img.mode or img.mode == "LA") else "RGB") else: if img.mode == "I": pass elif img.mode in ("RGB", "RGBA"): pass else: img = img.convert("RGBA" if "A" in img.mode else "RGB") else: if force_bitdepth == "8": if img.mode == "I": img = img.convert("L") elif img.mode == "RGBA": pass elif img.mode == "RGB": pass else: img = img.convert("RGBA" if "A" in img.mode else "RGB") else: if img.mode == "RGBA": pass elif img.mode != "RGB": img = img.convert("RGB") img.save(str(output_path), "PNG", optimize=True, compress_level=9) new_size = output_path.stat().st_size savings = (orig_size - new_size) / orig_size * 100 if orig_size > 0 else 0 return (str(input_path), True, None, orig_size, new_size, savings, False, False) except Exception as e: corrupt = "truncated" in str(e).lower() or "cannot identify" in str(e).lower() or "corrupt" in str(e).lower() return (str(input_path), False, str(e), 0, 0, 0, False, corrupt) def move_to_corrupted(src: Path, base: Path, corrupted_root: Path) -> bool: try: rel = src.relative_to(base) except ValueError: rel = src.name dest = corrupted_root / rel dest.parent.mkdir(parents=True, exist_ok=True) src.rename(dest) return True def format_size(size_bytes: float) -> str: for unit in ("B", "KB", "MB", "GB"): if size_bytes < 1024.0: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.2f} TB" def format_time(seconds: float) -> str: hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) centiseconds = int((seconds % 1) * 100) return f"{hours:02d}:{minutes:02d}:{secs:02d}:{centiseconds:02d}" def main() -> int: parser = argparse.ArgumentParser(description="Compress PNGs in unchanged sequences; staging + Y/N overwrite.") parser.add_argument("--8bit", "-8", action="store_true", dest="force_8bit") parser.add_argument("--16bit", "-16", action="store_true", dest="force_16bit") parser.add_argument("--verbose", action="store_true") args = parser.parse_args() force_bitdepth = "8" if args.force_8bit else ("16" if args.force_16bit else None) if args.force_8bit and args.force_16bit: print("Error: Cannot specify both --8bit and --16bit. Choose one.") return 1 if force_bitdepth == "8": print("Mode: Forcing 8-bit color depth") elif force_bitdepth == "16": print("Mode: Forcing 16-bit color depth") else: print("Mode: Auto-detect bit depth (preserve 16-bit if present)") if not RENDER_ROOT.exists(): print("[ERROR] Renders directory not found.") return 1 staging_root = RENDER_ROOT / STAGING_DIR corrupted_root = RENDER_ROOT / CORRUPTED_DIR # Find changed sequences (no archive, or state differs from stored) changed: list[Path] = [] for seq_dir in find_sequence_dirs(RENDER_ROOT): current = compute_state(seq_dir) if not current["files"]: continue _, state_path = find_archive_and_state(seq_dir) if state_path is None or not state_path.exists(): changed.append(seq_dir) # New sequence, never zipped continue stored = load_state(state_path) if state_changed(current, stored): changed.append(seq_dir) if not changed: print("No changed sequences with PNGs found. All sequences are up to date.") return 0 print(f"Found {len(changed)} changed sequence(s) to compress.") staging_root.mkdir(parents=True, exist_ok=True) corrupted_root.mkdir(parents=True, exist_ok=True) # Collect PNGs to compress work: list[tuple[Path, Path]] = [] for seq_dir in changed: rel = seq_dir.relative_to(RENDER_ROOT) staging_seq = staging_root / rel for png in iter_png_files(seq_dir): out = staging_seq / png.relative_to(seq_dir) work.append((png, out)) if not work: print("No PNG files in changed sequences.") return 0 max_workers = multiprocessing.cpu_count() if platform.system() == "Windows" and not UNLOCKED: max_workers = min(max_workers, 61) if max_workers < multiprocessing.cpu_count(): print(f"Detected {multiprocessing.cpu_count()} CPU threads, but Windows limits ProcessPoolExecutor to 61 workers.") print("Install 'unlock-processpool-win' package to use all cores: pip install unlock-processpool-win") else: if UNLOCKED: print("Using unlock-processpool-win to bypass Windows 61-worker limit") print(f"Compressing {len(work)} PNGs with {max_workers} workers...") print("-" * 80) compressed = 0 corrupted_count = 0 corrupted_paths: list[tuple[str, str]] = [] total_orig = 0 total_new = 0 failed = 0 start_time = time.time() last_update_time = start_time with ProcessPoolExecutor(max_workers=max_workers) as ex: futures = {ex.submit(compress_png, inp, out, force_bitdepth): (inp, out) for inp, out in work} for future in as_completed(futures): inp, out = futures[future] path, ok, err, o_sz, n_sz, _, _, is_corrupt = future.result() if ok: compressed += 1 total_orig += o_sz total_new += n_sz elif is_corrupt: corrupted_count += 1 try: if move_to_corrupted(Path(path), RENDER_ROOT, corrupted_root): corrupted_paths.append((path, f"Moved to {corrupted_root / Path(path).relative_to(RENDER_ROOT)}")) else: corrupted_paths.append((path, str(err))) except Exception as e: corrupted_paths.append((path, str(e))) print(f"\n[CORRUPTED] {path} -> {err}") else: failed += 1 print(f"\n[ERROR] {path}: {err}") # Live progress (update every 0.5s or on first completion) current_time = time.time() time_since_update = current_time - last_update_time processed = compressed + corrupted_count + failed if processed == 1 or time_since_update >= 0.5: elapsed = current_time - start_time rate = processed / elapsed if elapsed > 0 else 0 remaining = len(work) - processed eta_seconds = remaining / rate if rate > 0 and remaining > 0 else 0 total_savings = total_orig - total_new total_savings_pct = (total_savings / total_orig * 100) if total_orig > 0 else 0 eta_str = format_time(eta_seconds) if eta_seconds > 0 else "calculating..." elapsed_str = format_time(elapsed) print( f"[{processed:5d}/{len(work)}] " f"Compressed: {compressed} | Corrupted: {corrupted_count} | Failed: {failed} | " f"Speed: {rate:.1f} files/sec | " f"Elapsed: {elapsed_str} | ETA: {eta_str} | " f"Saved: {format_size(total_savings)} ({total_savings_pct:.1f}%)", end="\r", ) last_update_time = current_time print() # Clear progress line total_time = time.time() - start_time total_savings = total_orig - total_new total_savings_pct = (total_savings / total_orig * 100) if total_orig > 0 else 0 avg_rate = (compressed + corrupted_count + failed) / total_time if total_time > 0 else 0 print("=" * 80) print("Compression complete!") print(f"Successfully compressed: {compressed} files") if corrupted_count > 0: print(f"Corrupted (bad PNGs): {corrupted_count} files") if failed > 0: print(f"Failed: {failed} files") print(f"Total time: {format_time(total_time)}") print(f"Average speed: {avg_rate:.2f} files/second") print(f"Original size: {format_size(total_orig)}") print(f"Compressed size: {format_size(total_new)}") print(f"Total savings: {format_size(total_savings)} ({total_savings_pct:.1f}%)") if corrupted_paths: print(f"\nCorrupted files moved to {corrupted_root}") print("\n" + "=" * 80) print("CORRUPTED FILES LIST:") print("=" * 80) for item in corrupted_paths: file_path, move_status = item print(f" {file_path}") print(f" Status: {move_status}") check_dirs = [f"Renders\\{STAGING_DIR}"] if corrupted_count > 0: check_dirs.append(f"Renders\\{CORRUPTED_DIR}") resp = input(f"\nCheck files in {' and '.join(check_dirs)}. Ready to overwrite originals? (Y/N): ").strip().upper() if resp != "Y": print("Exiting. Staging and corrupted folders left for inspection.") return 0 # Overwrite originals with staged files for seq_dir in changed: rel = seq_dir.relative_to(RENDER_ROOT) staging_seq = staging_root / rel if not staging_seq.exists(): continue for staged in staging_seq.rglob("*.png"): rel_file = staged.relative_to(staging_seq) orig = seq_dir / rel_file if orig.exists(): shutil.copy2(staged, orig) # Cleanup staging and corrupted shutil.rmtree(staging_root, ignore_errors=True) if corrupted_root.exists(): shutil.rmtree(corrupted_root, ignore_errors=True) print("Overwrite complete. Staging and corrupted folders cleaned up.") return 0 if __name__ == "__main__": sys.exit(main())