#!/usr/bin/env python3 """Maintain zipped render sequences for Git hooks. Default mode scans `Renders/`, produces ZIP archives under `Renders/_zipped/`, and stages any updated archives so commits only track compact files. Switch to `--mode expand` to inflate the tracked archives back into the ignored working directories after checkouts or pulls. """ from __future__ import annotations import argparse import json import subprocess import os import shutil import sys from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Iterator, Sequence RENDER_ROOT = Path("Renders") ARCHIVE_ROOT = RENDER_ROOT / "_zipped" SEQUENCE_EXTENSIONS = { ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".exr", } STATE_SUFFIX = ".meta.json" CONFIG_PATH = Path(__file__).resolve().with_name("config.json") DEFAULT_CONFIG = { "zipper": "7z", "compression": 9, "dailyFormat": "daily_YYMMDD", } def load_config() -> dict: try: text = CONFIG_PATH.read_text(encoding="utf-8") except FileNotFoundError: return DEFAULT_CONFIG.copy() except OSError: return DEFAULT_CONFIG.copy() try: data = json.loads(text) except json.JSONDecodeError: return DEFAULT_CONFIG.copy() if not isinstance(data, dict): return DEFAULT_CONFIG.copy() merged = DEFAULT_CONFIG.copy() merged.update(data) return merged CONFIG = load_config() zipper_val = CONFIG.get("zipper", "7z") # Handle both old boolean format and new string format if isinstance(zipper_val, bool): ZIPPER_TYPE = "7z" if zipper_val else "zip" else: ZIPPER_TYPE = str(zipper_val).lower() USE_7Z = ZIPPER_TYPE == "7z" COMPRESSION_LEVEL = CONFIG.get("compression", 9) if isinstance(COMPRESSION_LEVEL, str): try: COMPRESSION_LEVEL = int(COMPRESSION_LEVEL) except ValueError: COMPRESSION_LEVEL = 9 if not isinstance(COMPRESSION_LEVEL, int): COMPRESSION_LEVEL = 9 COMPRESSION_LEVEL = max(0, min(9, COMPRESSION_LEVEL)) SEVEN_Z_EXE: str | None = None if USE_7Z: SEVEN_Z_EXE = shutil.which("7z") or shutil.which("7za") if SEVEN_Z_EXE is None: print("[zip] Requested 7z compression but no 7z executable was found; falling back to zipfile.", file=sys.stderr) USE_7Z = False def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Sync render sequences with zipped archives.") parser.add_argument( "--mode", choices=("zip", "expand"), default="zip", help="zip sequences for commit (default) or expand tracked archives", ) parser.add_argument("--jobs", type=int, help="max parallel workers") parser.add_argument("--verbose", action="store_true", help="print extra progress details") return parser.parse_args() def max_workers(requested: int | None) -> int: cpu = os.cpu_count() or 1 limit = max(1, min(8, cpu)) if requested and requested > 0: return min(requested, max(1, cpu)) return limit def log(mode: str, message: str, *, verbose_only: bool = False, verbose: bool = False) -> None: if verbose_only and not verbose: return print(f"[{mode}] {message}") def is_archive_path(path: Path) -> bool: return any(part == "_archive" for part in path.parts) def find_sequence_dirs(root: Path) -> Iterator[Path]: for dirpath, dirnames, filenames in os.walk(root): path = Path(dirpath) dirnames[:] = [d for d in dirnames if d != "_archive"] if is_archive_path(path): continue has_frames = any(Path(dirpath, f).suffix.lower() in SEQUENCE_EXTENSIONS for f in filenames) if has_frames: yield path def iter_sequence_files(seq_dir: Path) -> Iterator[Path]: for dirpath, dirnames, filenames in os.walk(seq_dir): path = Path(dirpath) dirnames[:] = [d for d in dirnames if d != "_archive"] if is_archive_path(path): continue for filename in filenames: yield path / filename def compute_state(seq_dir: Path) -> dict: entries = [] files = sorted( iter_sequence_files(seq_dir), key=lambda p: p.relative_to(seq_dir).as_posix(), ) for file_path in files: stat = file_path.stat() entries.append( { "path": file_path.relative_to(seq_dir).as_posix(), "size": stat.st_size, "mtime_ns": stat.st_mtime_ns, } ) return {"files": entries} def current_state(seq_dir: Path) -> dict: if not seq_dir.exists() or not seq_dir.is_dir(): return {"files": []} return compute_state(seq_dir) def load_state(state_path: Path) -> dict | None: if not state_path.exists(): return None try: return json.loads(state_path.read_text()) except json.JSONDecodeError: return None def state_changed(seq_state: dict, stored_state: dict | None) -> bool: if stored_state is None: return True return seq_state != stored_state def archive_path_for(seq_dir: Path) -> Path: rel = seq_dir.relative_to(RENDER_ROOT) return (ARCHIVE_ROOT / rel).with_suffix(".zip") def sequence_dir_for(zip_path: Path) -> Path: rel = zip_path.relative_to(ARCHIVE_ROOT) return (RENDER_ROOT / rel).with_suffix("") def state_path_for(zip_path: Path) -> Path: return zip_path.with_suffix(zip_path.suffix + STATE_SUFFIX) def zip_sequence(seq_dir: Path, zip_path: Path) -> None: if USE_7Z and SEVEN_Z_EXE: zip_path.parent.mkdir(parents=True, exist_ok=True) cmd = [ SEVEN_Z_EXE, "a", "-y", f"-mx={COMPRESSION_LEVEL}", "-tzip", str(zip_path), ".\\*", ] subprocess.run(cmd, cwd=seq_dir, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile zip_path.parent.mkdir(parents=True, exist_ok=True) if COMPRESSION_LEVEL <= 0: compression = ZIP_STORED zip_kwargs = {} else: compression = ZIP_DEFLATED zip_kwargs = {"compresslevel": COMPRESSION_LEVEL} with ZipFile(zip_path, "w", compression=compression, **zip_kwargs) as archive: for file_path in iter_sequence_files(seq_dir): archive.write(file_path, arcname=file_path.relative_to(seq_dir).as_posix()) def expand_sequence(zip_path: Path, seq_state: dict) -> None: target_dir = sequence_dir_for(zip_path) if target_dir.exists(): shutil.rmtree(target_dir) target_dir.mkdir(parents=True, exist_ok=True) if USE_7Z and SEVEN_Z_EXE: cmd = [ SEVEN_Z_EXE, "x", "-y", str(zip_path), f"-o{target_dir}", ] subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) else: from zipfile import ZipFile with ZipFile(zip_path, "r") as archive: archive.extractall(target_dir) for entry in seq_state.get("files", []): file_path = target_dir / entry["path"] if file_path.exists(): os.utime(file_path, ns=(entry["mtime_ns"], entry["mtime_ns"])) def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, *, verbose: bool) -> Sequence[Path]: log("zip", f"{seq_dir} -> {zip_path}", verbose_only=True, verbose=verbose) zip_sequence(seq_dir, zip_path) state_path.write_text(json.dumps(seq_state, indent=2)) return (zip_path, state_path) def process_expand(zip_path: Path, state: dict, *, verbose: bool) -> None: log("expand", f"{zip_path} -> {sequence_dir_for(zip_path)}", verbose_only=True, verbose=verbose) expand_sequence(zip_path, state) def run_zip(worker_count: int, *, verbose: bool) -> int: work_items: list[tuple[Path, Path, Path, dict]] = [] if RENDER_ROOT.exists(): for seq_dir in find_sequence_dirs(RENDER_ROOT): seq_state = compute_state(seq_dir) if not seq_state["files"]: continue zip_path = archive_path_for(seq_dir) state_path = state_path_for(zip_path) stored_state = load_state(state_path) if not state_changed(seq_state, stored_state): continue work_items.append((seq_dir, zip_path, state_path, seq_state)) if not work_items: if not RENDER_ROOT.exists(): log("zip", "Render root 'Renders' not found; nothing to zip.") else: log("zip", "Archives already up to date; no sequences needed zipping.") return 0 updated_paths: list[Path] = [] total = len(work_items) completed = 0 with ThreadPoolExecutor(max_workers=worker_count) as executor: future_map = { executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, verbose=verbose): seq_dir for seq_dir, zip_path, state_path, seq_state in work_items } for future in as_completed(future_map): updated_paths.extend(future.result()) completed += 1 seq_dir = future_map[future] rel = seq_dir.relative_to(RENDER_ROOT) log("zip", f"{completed}/{total} {rel}") updated_count = len(updated_paths) // 2 log("zip", f"Updated {updated_count} sequence archive(s).", verbose=verbose) if updated_paths: log( "zip", "Archives updated. Stage manually with `git add Renders/_zipped`, if desired.", verbose_only=True, verbose=verbose, ) removed = cleanup_orphan_archives(verbose=verbose) if removed: log("zip", f"Removed {removed} orphan archive(s).", verbose=verbose) return updated_count def run_expand(worker_count: int, *, verbose: bool) -> int: if not ARCHIVE_ROOT.exists(): log("expand", "No archives to expand (missing 'Renders/_zipped').") return 0 work_items: list[tuple[Path, dict]] = [] for zip_path in ARCHIVE_ROOT.rglob("*.zip"): state_path = state_path_for(zip_path) seq_state = load_state(state_path) if seq_state is None: log("expand", f"Skipping {zip_path} (missing metadata)") continue target_dir = sequence_dir_for(zip_path) if current_state(target_dir) == seq_state: continue work_items.append((zip_path, seq_state)) if not work_items: log("expand", "Working folders already match archives; nothing to expand.") return 0 total = len(work_items) completed = 0 with ThreadPoolExecutor(max_workers=worker_count) as executor: future_map = { executor.submit(process_expand, zip_path, seq_state, verbose=verbose): zip_path for zip_path, seq_state in work_items } for future in as_completed(future_map): future.result() completed += 1 zip_path = future_map[future] rel = zip_path.relative_to(ARCHIVE_ROOT) log("expand", f"{completed}/{total} {rel}") log("expand", f"Refreshed {len(work_items)} sequence folder(s).", verbose=verbose) return len(work_items) def cleanup_orphan_archives(*, verbose: bool) -> int: if not ARCHIVE_ROOT.exists(): return 0 removed: list[Path] = [] for zip_path in ARCHIVE_ROOT.rglob("*.zip"): seq_dir = sequence_dir_for(zip_path) if seq_dir.exists(): continue rel = zip_path.relative_to(ARCHIVE_ROOT) log("zip", f"Removing orphan archive {rel}", verbose_only=True, verbose=verbose) zip_path.unlink(missing_ok=True) state_path = state_path_for(zip_path) if state_path.exists(): state_path.unlink() removed.append(zip_path) if not removed: return 0 for parent in sorted({p.parent for p in removed}, key=lambda p: len(p.parts), reverse=True): if not parent.exists(): continue while parent != ARCHIVE_ROOT and not any(parent.iterdir()): parent.rmdir() parent = parent.parent return len(removed) def main() -> int: args = parse_args() workers = max_workers(args.jobs) if args.mode == "expand": run_expand(workers, verbose=args.verbose) return 0 updated = run_zip(workers, verbose=args.verbose) return 0 if updated >= 0 else 1 if __name__ == "__main__": try: raise SystemExit(main()) except Exception as exc: # broad to surface unexpected errors cleanly print(f"Sequence sync failed: {exc}", file=sys.stderr) raise