#!/usr/bin/env python3 """Maintain zipped render sequences for Git hooks. Default mode scans `Renders/`, produces ZIP archives under `Renders/_zipped/`, and stages any updated archives so commits only track compact files. Switch to `--mode expand` to inflate the tracked archives back into the ignored working directories after checkouts or pulls. """ from __future__ import annotations import argparse import json import os import platform import shutil import subprocess import sys import tempfile import time import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Iterator, Sequence # Try to import psutil for cross-platform RAM detection try: import psutil HAS_PSUTIL = True except ImportError: HAS_PSUTIL = False # For Windows fallback if platform.system() == "Windows": try: import ctypes HAS_CTYPES = True except ImportError: HAS_CTYPES = False else: HAS_CTYPES = False RENDER_ROOT = Path("Renders") ARCHIVE_ROOT = RENDER_ROOT / "_zipped" SEQUENCE_EXTENSIONS = { ".png", ".jpg", ".jpeg", ".tif", ".tiff", ".exr", } STATE_SUFFIX = ".meta.json" DEFAULT_CONFIG = { "zipper": "7z", "compression": 9, "compressionMethod": "LZMA2", # Compression method: LZMA2 (multi-threaded), PPMd (single-threaded), BZip2, Deflate "dailyFormat": "daily_YYMMDD", "Max7zInst": 0, # Maximum concurrent 7z instances (0 = auto-calculate) } def log(mode: str, message: str, *, verbose_only: bool = False, verbose: bool = False) -> None: if verbose_only and not verbose: return print(f"[{mode}] {message}", flush=True) def load_config() -> dict: # First try to load from project's .config folder (current working directory) # Then fall back to ProjectStructure repo config (next to zip_sequences.py) cwd = Path.cwd() project_config = cwd / ".config" / "config.json" repo_config = Path(__file__).resolve().with_name("config.json") config_paths = [ ("project", project_config), ("repo", repo_config), ] log("init", "Loading configuration sources...") for source, config_path in config_paths: try: if config_path.exists(): log("init", f"Reading {source} config at {config_path}") text = config_path.read_text(encoding="utf-8") try: data = json.loads(text) if isinstance(data, dict): merged = DEFAULT_CONFIG.copy() merged.update(data) log("init", f"Configuration loaded from {source}") return merged except json.JSONDecodeError: log("init", f"Config file at {config_path} is invalid JSON; skipping") continue except OSError: log("init", f"Unable to read config at {config_path}; skipping") continue # If no config found, return defaults log("init", "No config files found; using default settings") return DEFAULT_CONFIG.copy() CONFIG = load_config() zipper_val = CONFIG.get("zipper", "7z") # Handle both old boolean format and new string format if isinstance(zipper_val, bool): ZIPPER_TYPE = "7z" if zipper_val else "zip" else: ZIPPER_TYPE = str(zipper_val).lower() COMPRESSION_LEVEL = CONFIG.get("compression", 9) if isinstance(COMPRESSION_LEVEL, str): try: COMPRESSION_LEVEL = int(COMPRESSION_LEVEL) except ValueError: COMPRESSION_LEVEL = 9 if not isinstance(COMPRESSION_LEVEL, int): COMPRESSION_LEVEL = 9 COMPRESSION_LEVEL = max(0, min(9, COMPRESSION_LEVEL)) COMPRESSION_METHOD = CONFIG.get("compressionMethod", "LZMA2") # Validate compression method valid_methods = {"LZMA2", "PPMd", "BZip2", "Deflate"} if COMPRESSION_METHOD not in valid_methods: COMPRESSION_METHOD = "LZMA2" # Default to LZMA2 for multi-threading support MAX_7Z_INSTANCES = CONFIG.get("Max7zInst", 0) if MAX_7Z_INSTANCES is not None: if isinstance(MAX_7Z_INSTANCES, str): try: MAX_7Z_INSTANCES = int(MAX_7Z_INSTANCES) except ValueError: MAX_7Z_INSTANCES = 0 if not isinstance(MAX_7Z_INSTANCES, int) or MAX_7Z_INSTANCES < 1: MAX_7Z_INSTANCES = 0 # Treat 0 as None (auto-calculate) if MAX_7Z_INSTANCES == 0: MAX_7Z_INSTANCES = None SEVEN_Z_EXE: str | None = None if ZIPPER_TYPE == "7z": SEVEN_Z_EXE = shutil.which("7z") or shutil.which("7za") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Sync render sequences with zipped archives.") parser.add_argument( "--mode", choices=("zip", "expand"), default="zip", help="zip sequences for commit (default) or expand tracked archives", ) parser.add_argument("--jobs", type=int, help="max parallel workers") parser.add_argument("--verbose", action="store_true", help="print extra progress details") return parser.parse_args() def get_available_ram() -> int | None: """Get available RAM in bytes, reserving 20% for system. Returns: Available RAM in bytes, or None if detection fails. """ try: if HAS_PSUTIL: # Use psutil for cross-platform RAM detection mem = psutil.virtual_memory() # Reserve 20% for system, use 80% for compression jobs available = int(mem.total * 0.8) return available elif HAS_CTYPES and platform.system() == "Windows": # Windows fallback using ctypes class MEMORYSTATUSEX(ctypes.Structure): _fields_ = [ ("dwLength", ctypes.c_ulong), ("dwMemoryLoad", ctypes.c_ulong), ("ullTotalPhys", ctypes.c_ulonglong), ("ullAvailPhys", ctypes.c_ulonglong), ("ullTotalPageFile", ctypes.c_ulonglong), ("ullAvailPageFile", ctypes.c_ulonglong), ("ullTotalVirtual", ctypes.c_ulonglong), ("ullAvailVirtual", ctypes.c_ulonglong), ("ullAvailExtendedVirtual", ctypes.c_ulonglong), ] kernel32 = ctypes.windll.kernel32 kernel32.GlobalMemoryStatusEx.argtypes = [ctypes.POINTER(MEMORYSTATUSEX)] kernel32.GlobalMemoryStatusEx.restype = ctypes.c_bool mem_status = MEMORYSTATUSEX() mem_status.dwLength = ctypes.sizeof(MEMORYSTATUSEX) if kernel32.GlobalMemoryStatusEx(ctypes.byref(mem_status)): # Reserve 20% for system, use 80% for compression jobs available = int(mem_status.ullTotalPhys * 0.8) return available except Exception: pass return None def estimate_ram_per_job(seq_dir: Path, seq_state: dict) -> int: """Estimate RAM usage per compression job based on folder size. Args: seq_dir: Path to the sequence directory seq_state: State dictionary containing file information Returns: Estimated RAM usage in bytes """ # Calculate total folder size from seq_state total_bytes = sum(entry.get("size", 0) for entry in seq_state.get("files", [])) if ZIPPER_TYPE == "7z": # Fixed dictionary size: 1GB (1024MB) # Actual observed usage: ~2GB per process regardless of sequence size # Use a tight estimate to allow maximum concurrency # Estimate 3GB per job (50% safety margin over observed ~2GB) base_ram = 3 * 1024 * 1024 * 1024 # 3GB base estimate # For very large sequences (>50GB), add small buffer if total_bytes > 50 * 1024 * 1024 * 1024: # >50GB # Add 1GB for very large sequences estimated_ram = base_ram + (1 * 1024 * 1024 * 1024) else: estimated_ram = base_ram return estimated_ram else: # zip compression is more memory-efficient # Conservative estimate: 1GB per job return 1024 * 1024 * 1024 # 1 GB def max_workers( requested: int | None, work_items: list[tuple[Path, Path, Path, dict]] | None = None, *, verbose: bool = False ) -> tuple[int, int | None]: """Calculate maximum worker count based on CPU and RAM constraints. Args: requested: User-requested worker count (from --jobs) work_items: List of work items (seq_dir, zip_path, state_path, seq_state) verbose: Whether to log RAM-based calculations Returns: Tuple of (worker_count, per_job_memory_limit_bytes) per_job_memory_limit_bytes is None if not using 7z or RAM detection failed """ cpu = os.cpu_count() or 1 cpu_limit = max(1, min(8, cpu)) if requested and requested > 0: cpu_limit = min(requested, max(1, cpu)) # If no work items provided, return CPU-based limit if work_items is None or len(work_items) == 0: return (cpu_limit, {}) # Try to calculate RAM-based limit available_ram = get_available_ram() if available_ram is None: # RAM detection failed, fall back to CPU limit if verbose: log("zip", "RAM detection failed, using CPU-based worker limit", verbose_only=True, verbose=verbose) return (cpu_limit, {}) # For 7z: use fixed dictionary size and calculate workers if ZIPPER_TYPE == "7z": # Fixed dictionary size: 1GB (1024MB) FIXED_DICT_SIZE_MB = 1024 fixed_dict_size_bytes = FIXED_DICT_SIZE_MB * 1024 * 1024 # Check if Max7zInst is configured if MAX_7Z_INSTANCES is not None: # Use configured maximum instances, but still respect user's --jobs and work items final_limit = MAX_7Z_INSTANCES num_work_items = len(work_items) if work_items else 0 if num_work_items > 0: final_limit = min(final_limit, num_work_items) if requested and requested > 0: final_limit = min(final_limit, requested) # Create RAM limits dict (all use fixed dict size, but return as dict for consistency) ram_limits_dict: dict[Path, int] = {} if work_items: for seq_dir, _, _, _ in work_items: ram_limits_dict[seq_dir] = fixed_dict_size_bytes if verbose: log( "zip", f"Using Max7zInst={MAX_7Z_INSTANCES} from config → " f"work items: {num_work_items}, requested: {requested}, final: {final_limit}", verbose_only=True, verbose=verbose ) return (final_limit, ram_limits_dict) # Auto-calculate based on RAM if Max7zInst not configured # Prioritize maximum concurrency: one worker per sequence when possible if available_ram is not None: # available_ram is already 80% of total (20% reserved for system) # Use 95% of available RAM for compression jobs compression_ram = int(available_ram * 0.95) # Estimate RAM for each work item work_items_with_ram: list[tuple[Path, int, tuple[Path, Path, Path, dict]]] = [] ram_limits_dict: dict[Path, int] = {} for seq_dir, zip_path, state_path, seq_state in work_items: try: estimated_ram = estimate_ram_per_job(seq_dir, seq_state) work_items_with_ram.append((seq_dir, estimated_ram, (seq_dir, zip_path, state_path, seq_state))) ram_limits_dict[seq_dir] = estimated_ram except Exception: # If estimation fails, use a safe default (3GB) default_ram = 3 * 1024 * 1024 * 1024 # 3GB work_items_with_ram.append((seq_dir, default_ram, (seq_dir, zip_path, state_path, seq_state))) ram_limits_dict[seq_dir] = default_ram num_work_items = len(work_items) if work_items else 0 # Calculate total estimated RAM total_estimated_ram = sum(ram for _, ram, _ in work_items_with_ram) # If all sequences fit in available RAM, use one worker per sequence (maximum concurrency) if total_estimated_ram <= compression_ram: worker_count = num_work_items else: # Not all fit - use bin-packing to minimize workers # Sort by estimated RAM (largest first) for bin-packing work_items_with_ram.sort(key=lambda x: x[1], reverse=True) # Bin-packing algorithm: pack largest items first bins: list[list[tuple[Path, int, tuple[Path, Path, Path, dict]]]] = [] bin_remaining: list[int] = [] for seq_dir, estimated_ram, work_item in work_items_with_ram: # Try to fit in existing bin placed = False for i, remaining in enumerate(bin_remaining): if remaining >= estimated_ram: bins[i].append((seq_dir, estimated_ram, work_item)) bin_remaining[i] -= estimated_ram placed = True break # If doesn't fit, create new bin if not placed: bins.append([(seq_dir, estimated_ram, work_item)]) bin_remaining.append(compression_ram - estimated_ram) # Worker count is number of bins worker_count = len(bins) # Cap at number of actual work items (can't have more workers than jobs) if num_work_items > 0: worker_count = min(worker_count, num_work_items) # Respect user's --jobs if provided if requested and requested > 0: worker_count = min(worker_count, requested) if verbose: ram_gb = available_ram / (1024 ** 3) compression_ram_gb = compression_ram / (1024 ** 3) total_estimated_gb = total_estimated_ram / (1024 ** 3) log( "zip", f"RAM: {ram_gb:.1f}GB available (80% of total), {compression_ram_gb:.1f}GB for compression (95%)", verbose_only=True, verbose=verbose ) log( "zip", f"Estimated RAM per sequence: {total_estimated_gb:.1f}GB total across {num_work_items} sequences", verbose_only=True, verbose=verbose ) if total_estimated_ram <= compression_ram: log( "zip", f"All sequences fit in RAM → using {worker_count} workers (one per sequence)", verbose_only=True, verbose=verbose ) else: log( "zip", f"Using bin-packing: {worker_count} workers needed", verbose_only=True, verbose=verbose ) log( "zip", f"Final worker count: {worker_count} (requested: {requested})", verbose_only=True, verbose=verbose ) # Return worker count and RAM limits dict return (worker_count, ram_limits_dict) # RAM detection failed, use a safe default (no CPU limit) default_limit = 4 num_work_items = len(work_items) if work_items else 0 if num_work_items > 0: default_limit = min(default_limit, num_work_items) if requested and requested > 0: default_limit = min(default_limit, requested) # Create RAM limits dict with safe defaults (12GB per job for 1GB dict) ram_limits_dict: dict[Path, int] = {} if work_items: default_ram = 12 * 1024 * 1024 * 1024 # 12GB default for seq_dir, _, _, _ in work_items: ram_limits_dict[seq_dir] = default_ram if verbose: log( "zip", f"RAM detection failed and Max7zInst not set, using default worker limit → " f"work items: {num_work_items}, requested: {requested}, final: {default_limit}", verbose_only=True, verbose=verbose ) return (default_limit, ram_limits_dict) # For zip compression, use existing estimation-based approach # Estimate RAM per job for each work item ram_estimates = [] ram_limits_dict: dict[Path, int] = {} for seq_dir, zip_path, state_path, seq_state in work_items: try: estimated_ram = estimate_ram_per_job(seq_dir, seq_state) ram_estimates.append(estimated_ram) ram_limits_dict[seq_dir] = estimated_ram except Exception: # If estimation fails, use fallback estimate fallback_ram = 1024 * 1024 * 1024 # 1GB fallback for zip ram_estimates.append(fallback_ram) ram_limits_dict[seq_dir] = fallback_ram if not ram_estimates: return (cpu_limit, {}) max_ram_per_job = max(ram_estimates) ram_limit = max(1, available_ram // max_ram_per_job) ram_limit = min(ram_limit, 6) # Conservative limit for zip final_limit = min(cpu_limit, ram_limit) if verbose: ram_gb = available_ram / (1024 ** 3) max_ram_gb = max_ram_per_job / (1024 ** 3) log( "zip", f"RAM: {ram_gb:.1f}GB available, ~{max_ram_gb:.1f}GB per job → " f"RAM limit: {ram_limit}, CPU limit: {cpu_limit}, final: {final_limit}", verbose_only=True, verbose=verbose ) return (final_limit, ram_limits_dict) def is_archive_path(path: Path) -> bool: return any(part in ("_archive", "_CURRENT") for part in path.parts) def find_sequence_dirs(root: Path, *, verbose: bool = False) -> Iterator[Path]: seen_dirs = set() # Track directories we've already yielded to avoid duplicates for dirpath, dirnames, filenames in os.walk(root): path = Path(dirpath) dirnames[:] = [d for d in dirnames if d not in ("_archive", "_CURRENT")] if is_archive_path(path): if verbose: rel = path.relative_to(root) if path.is_relative_to(root) else path log("scan", f"Skipping archive path: {rel}", verbose_only=True, verbose=verbose) continue # Check if this directory has sequence files directly has_frames = any(Path(dirpath, f).suffix.lower() in SEQUENCE_EXTENSIONS for f in filenames) if has_frames: path_resolved = path.resolve() if path_resolved not in seen_dirs: seen_dirs.add(path_resolved) yield path elif verbose: # Log directories that don't have sequence files for debugging rel = path.relative_to(root) if path.is_relative_to(root) else path if "scab" in path.name.lower(): # Special logging for directories that might be sequences frame_extensions = [f for f in filenames if Path(f).suffix.lower() in SEQUENCE_EXTENSIONS] all_files = list(path.iterdir()) if path.exists() else [] log("scan", f"Directory {rel}: {len(filenames)} files in dir, {len(frame_extensions)} with sequence extensions, {len(all_files)} total items", verbose_only=True, verbose=verbose) # Check subdirectories for subdir in dirnames[:5]: # Check first 5 subdirs subdir_path = path / subdir try: subdir_files = list(subdir_path.iterdir()) if subdir_path.exists() else [] subdir_frame_files = [f for f in subdir_files if f.is_file() and f.suffix.lower() in SEQUENCE_EXTENSIONS] if subdir_frame_files: log("scan", f" Subdirectory {subdir} has {len(subdir_frame_files)} sequence files", verbose_only=True, verbose=verbose) except (OSError, PermissionError): pass def iter_sequence_files(seq_dir: Path) -> Iterator[Path]: for dirpath, dirnames, filenames in os.walk(seq_dir): path = Path(dirpath) dirnames[:] = [d for d in dirnames if d not in ("_archive", "_CURRENT")] if is_archive_path(path): continue for filename in filenames: yield path / filename def compute_state(seq_dir: Path) -> dict: entries = [] files = sorted( iter_sequence_files(seq_dir), key=lambda p: p.relative_to(seq_dir).as_posix(), ) for file_path in files: stat = file_path.stat() entries.append( { "path": file_path.relative_to(seq_dir).as_posix(), "size": stat.st_size, "mtime_ns": stat.st_mtime_ns, } ) return {"files": entries} def current_state(seq_dir: Path) -> dict: if not seq_dir.exists() or not seq_dir.is_dir(): return {"files": []} return compute_state(seq_dir) def load_state(state_path: Path) -> dict | None: if not state_path.exists(): return None try: return json.loads(state_path.read_text()) except json.JSONDecodeError: return None def state_changed(seq_state: dict, stored_state: dict | None) -> bool: if stored_state is None: return True return seq_state != stored_state def archive_path_for(seq_dir: Path) -> Path: rel = seq_dir.relative_to(RENDER_ROOT) suffix = ".7z" if ZIPPER_TYPE == "7z" else ".zip" # Append suffix instead of replacing, since directory names might have dots (e.g., "scab_v2.1") return ARCHIVE_ROOT / f"{rel}{suffix}" def sequence_dir_for(zip_path: Path) -> Path: rel = zip_path.relative_to(ARCHIVE_ROOT) # Remove the archive suffix (.7z or .zip) from the end # Handle both .7z and .zip extensions rel_str = str(rel) if rel_str.endswith(".7z"): rel_str = rel_str[:-3] elif rel_str.endswith(".zip"): rel_str = rel_str[:-4] return RENDER_ROOT / rel_str def state_path_for(zip_path: Path) -> Path: return zip_path.with_suffix(zip_path.suffix + STATE_SUFFIX) def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None = None, worker_count: int = 1, *, verbose: bool = False) -> None: if ZIPPER_TYPE == "7z": if SEVEN_Z_EXE is None: raise RuntimeError( "7z compression requested but 7z executable not found in PATH. " "Please install 7z (e.g., via Chocolatey: choco install 7zip) " "or set zipper to 'zip' in config.json" ) zip_path.parent.mkdir(parents=True, exist_ok=True) # If creating a .7z file, remove any existing .zip file for the same sequence if zip_path.suffix == ".7z": old_zip_path = zip_path.with_suffix(".zip") if old_zip_path.exists(): old_zip_path.unlink(missing_ok=True) old_state_path = state_path_for(old_zip_path) if old_state_path.exists(): old_state_path.unlink(missing_ok=True) # Build list of files to archive with relative paths file_list = [] for file_path in iter_sequence_files(seq_dir): rel_path = file_path.relative_to(seq_dir).as_posix() file_list.append(rel_path) if not file_list: raise RuntimeError(f"No files found to archive in {seq_dir}") # Create zip in temporary location first to avoid issues with corrupted existing files temp_zip = None list_file_path = None try: # Create temporary archive file path (but don't create the file - let 7z create it) temp_zip_path = tempfile.mktemp(suffix=".7z", dir=zip_path.parent) temp_zip = Path(temp_zip_path) # Create list file with absolute path fd, temp_path = tempfile.mkstemp(suffix=".lst", text=True) list_file_path = Path(temp_path) with os.fdopen(fd, "w", encoding="utf-8") as list_file: for rel_path in file_list: list_file.write(rel_path + "\n") list_file.flush() os.fsync(list_file.fileno()) # Ensure data is written to disk # File is closed here by context manager, small delay to ensure OS releases handle time.sleep(0.1) # Use absolute paths for both list file and temp zip list_file_abs = list_file_path.resolve() temp_zip_abs = temp_zip.resolve() # Create archive in temp location first (7z will create it fresh) cmd = [ SEVEN_Z_EXE, "a", "-y", "-bb0", # Suppress progress output f"-mx={COMPRESSION_LEVEL}", "-t7z", # Use 7z format, not zip ] # Set compression method and memory/dictionary size based on method # At compression level 0, use Copy (store) method for maximum speed FIXED_DICT_SIZE_MB = 1024 if COMPRESSION_LEVEL == 0: # Level 0 = no compression, just store files (fastest) cmd.append("-m0=Copy") # Copy method doesn't need threading, but enable it anyway for consistency cmd.append("-mmt=on") else: # Compression levels 1-9: use configured compression method if COMPRESSION_METHOD == "PPMd": # PPMd: specify memory as part of method string cmd.append(f"-m0=PPMd:mem={FIXED_DICT_SIZE_MB}m") elif COMPRESSION_METHOD == "LZMA2": # LZMA2: use -md for dictionary size cmd.append("-m0=LZMA2") cmd.append(f"-md={FIXED_DICT_SIZE_MB}m") elif COMPRESSION_METHOD == "BZip2": # BZip2: use -md for dictionary size (smaller max: 900KB) max_bzip2_dict = min(FIXED_DICT_SIZE_MB, 900) # BZip2 max is 900KB cmd.append("-m0=BZip2") cmd.append(f"-md={max_bzip2_dict}k") elif COMPRESSION_METHOD == "Deflate": # Deflate: doesn't use dictionary size parameter cmd.append("-m0=Deflate") else: # Fallback: use LZMA2 cmd.append("-m0=LZMA2") cmd.append(f"-md={FIXED_DICT_SIZE_MB}m") # CPU thread allocation: when there's only 1 worker, use all CPU cores # When there are multiple workers, use auto mode to let 7z decide # Note: PPMd is single-threaded and won't benefit from -mmt cpu_cores = os.cpu_count() or 1 if COMPRESSION_METHOD == "PPMd": # PPMd is single-threaded, so -mmt won't help # But we can still set it for consistency cmd.append("-mmt=on") elif worker_count == 1: # Single worker: use all CPU cores for maximum speed (LZMA2, BZip2, Deflate support this) cmd.append(f"-mmt={cpu_cores}") else: # Multiple workers: use auto mode (7z will manage threads) cmd.append("-mmt=on") cmd.extend([ str(temp_zip_abs), f"@{list_file_abs}", ]) # Log the command in verbose mode for debugging if verbose: cmd_str = " ".join(cmd) log("zip", f"7z command: {cmd_str}", verbose_only=True, verbose=verbose) result = subprocess.run( cmd, cwd=seq_dir, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) if result.returncode != 0: error_msg = result.stderr.strip() if result.stderr else "Unknown error" if result.stdout: error_msg += f"\nstdout: {result.stdout.strip()}" raise RuntimeError(f"7z compression failed: {error_msg}") # Move temp zip to final location, replacing any existing file if zip_path.exists(): zip_path.unlink() temp_zip.replace(zip_path) temp_zip = None # Mark as moved so we don't delete it finally: # Clean up temp zip if it wasn't moved if temp_zip and temp_zip.exists(): try: temp_zip.unlink(missing_ok=True) except OSError: pass # Clean up list file, with retry in case 7z still has it open if list_file_path and list_file_path.exists(): for attempt in range(3): try: list_file_path.unlink(missing_ok=True) break except PermissionError: if attempt < 2: time.sleep(0.1) # Wait 100ms before retry else: # Last attempt failed, just log and continue # The temp file will be cleaned up by the OS eventually pass return # Use zipfile (only if ZIPPER_TYPE == "zip") if ZIPPER_TYPE == "zip": from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile zip_path.parent.mkdir(parents=True, exist_ok=True) if COMPRESSION_LEVEL <= 0: compression = ZIP_STORED zip_kwargs = {} else: compression = ZIP_DEFLATED zip_kwargs = {"compresslevel": COMPRESSION_LEVEL} with ZipFile(zip_path, "w", compression=compression, **zip_kwargs) as archive: for file_path in iter_sequence_files(seq_dir): archive.write(file_path, arcname=file_path.relative_to(seq_dir).as_posix()) return # Unknown ZIPPER_TYPE - fail with clear error raise RuntimeError( f"Unsupported ZIPPER_TYPE: {ZIPPER_TYPE!r}. " f"Expected '7z' or 'zip'. " f"Config zipper value: {CONFIG.get('zipper', 'not set')!r}" ) def expand_sequence(zip_path: Path, seq_state: dict) -> None: target_dir = sequence_dir_for(zip_path) if target_dir.exists(): shutil.rmtree(target_dir) target_dir.mkdir(parents=True, exist_ok=True) if ZIPPER_TYPE == "7z": if SEVEN_Z_EXE is None: raise RuntimeError( "7z extraction requested but 7z executable not found in PATH. " "Please install 7z or set zipper to 'zip' in config.json" ) cmd = [ SEVEN_Z_EXE, "x", "-y", str(zip_path), f"-o{target_dir}", ] result = subprocess.run( cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) if result.returncode != 0: error_msg = result.stderr.strip() if result.stderr else "Unknown error" if result.stdout: error_msg += f"\nstdout: {result.stdout.strip()}" raise RuntimeError(f"7z extraction failed: {error_msg}") elif ZIPPER_TYPE == "zip": from zipfile import ZipFile with ZipFile(zip_path, "r") as archive: archive.extractall(target_dir) else: raise RuntimeError( f"Unsupported ZIPPER_TYPE: {ZIPPER_TYPE!r}. " f"Expected '7z' or 'zip'. " f"Config zipper value: {CONFIG.get('zipper', 'not set')!r}" ) for entry in seq_state.get("files", []): file_path = target_dir / entry["path"] if file_path.exists(): os.utime(file_path, ns=(entry["mtime_ns"], entry["mtime_ns"])) def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, per_job_memory_limit: int | None, worker_count: int, *, verbose: bool) -> Sequence[Path]: log("zip", f"{seq_dir} -> {zip_path}", verbose_only=True, verbose=verbose) zip_sequence(seq_dir, zip_path, per_job_memory_limit, worker_count, verbose=verbose) state_path.write_text(json.dumps(seq_state, indent=2)) return (zip_path, state_path) def process_expand(zip_path: Path, state: dict, *, verbose: bool) -> None: log("expand", f"{zip_path} -> {sequence_dir_for(zip_path)}", verbose_only=True, verbose=verbose) expand_sequence(zip_path, state) def run_zip(requested_workers: int | None, *, verbose: bool) -> int: work_items: list[tuple[Path, Path, Path, dict]] = [] log("init", f"Scanning sequences under {RENDER_ROOT.resolve()}") total_scanned = 0 quick_skipped = 0 state_skipped = 0 empty_dirs = 0 queued = 0 if RENDER_ROOT.exists(): for seq_dir in find_sequence_dirs(RENDER_ROOT, verbose=verbose): total_scanned += 1 rel = seq_dir.relative_to(RENDER_ROOT) if total_scanned <= 5 or total_scanned % 10 == 0: log("scan", f"[{total_scanned}] Inspecting {rel}") # Get the target archive path (will be .7z if ZIPPER_TYPE is "7z") zip_path = archive_path_for(seq_dir) state_path = state_path_for(zip_path) # Quick check: if archive exists, load stored state first (fast) stored_state = load_state(state_path) # Check if we need to upgrade from .zip to .7z old_zip_path = None old_stored_state = None if ZIPPER_TYPE == "7z": old_zip_path = zip_path.with_suffix(".zip") if old_zip_path.exists(): old_state_path = state_path_for(old_zip_path) old_stored_state = load_state(old_state_path) # If old .zip exists and .7z doesn't, use old .zip's state for comparison if not zip_path.exists() and old_stored_state is not None: stored_state = old_stored_state # If .7z archive exists and we have stored state, do quick check before computing full state if zip_path.exists() and stored_state is not None: # Quick check: if directory mtime is older than archive, likely unchanged try: dir_mtime = seq_dir.stat().st_mtime_ns archive_mtime = zip_path.stat().st_mtime_ns # If directory wasn't modified since archive was created, skip state computation if dir_mtime <= archive_mtime: quick_skipped += 1 if quick_skipped <= 5: log("scan", f"Skipping {rel} (unchanged since archive)") # Still need to check for old .zip cleanup (we have .7z, so .zip is obsolete) if old_zip_path and old_zip_path.exists(): old_zip_path.unlink(missing_ok=True) old_state_path = state_path_for(old_zip_path) if old_state_path.exists(): old_state_path.unlink(missing_ok=True) continue except OSError: # If stat fails, fall through to full state computation pass # Compute current state only if we need to seq_state = compute_state(seq_dir) if not seq_state["files"]: empty_dirs += 1 if empty_dirs <= 5: log("scan", f"{rel} has no files; skipping") continue # Check if state changed if stored_state is not None and not state_changed(seq_state, stored_state): # Metadata matches stored state state_skipped += 1 if state_skipped <= 5: log("scan", f"{rel} metadata unchanged; archive up to date") if zip_path.exists(): # .7z exists and is up to date, clean up old .zip if it exists if old_zip_path and old_zip_path.exists(): old_zip_path.unlink(missing_ok=True) old_state_path = state_path_for(old_zip_path) if old_state_path.exists(): old_state_path.unlink(missing_ok=True) elif old_zip_path and old_zip_path.exists() and old_stored_state is not None: # .7z doesn't exist, but .zip exists and metadata matches # Keep the .zip file, don't create .7z continue else: # No archive exists, but state matches (shouldn't happen, but be safe) continue work_items.append((seq_dir, zip_path, state_path, seq_state)) queued += 1 if queued <= 5 or queued % 5 == 0: total_bytes = sum(entry.get("size", 0) for entry in seq_state.get("files", [])) size_gb = total_bytes / (1024 ** 3) log("scan", f"Queued {rel} for compression (~{size_gb:.2f}GB) [{queued} total]") if not work_items: if not RENDER_ROOT.exists(): log("zip", "Render root 'Renders' not found; nothing to zip.") else: log("zip", "Archives already up to date; no sequences needed zipping.") log( "scan", f"Summary: scanned {total_scanned}, quick-skipped {quick_skipped}, " f"state-skipped {state_skipped}, empty {empty_dirs}, queued {queued}", ) removed = cleanup_orphan_archives(verbose=verbose) if removed: log("zip", f"Removed {removed} orphan archive(s).", verbose=verbose) return 0 # Calculate RAM-aware worker count based on work items worker_count, ram_limits_dict = max_workers(requested_workers, work_items, verbose=verbose) log( "init", f"Preparing to compress {len(work_items)} sequence(s) with {worker_count} worker(s)", ) updated_paths: list[Path] = [] total = len(work_items) completed = 0 with ThreadPoolExecutor(max_workers=worker_count) as executor: future_map = { executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, ram_limits_dict.get(seq_dir), worker_count, verbose=verbose): seq_dir for seq_dir, zip_path, state_path, seq_state in work_items } for future in as_completed(future_map): updated_paths.extend(future.result()) completed += 1 seq_dir = future_map[future] rel = seq_dir.relative_to(RENDER_ROOT) log("zip", f"{completed}/{total} {rel}") updated_count = len(updated_paths) // 2 log("zip", f"Updated {updated_count} sequence archive(s).", verbose=verbose) if updated_paths: log( "zip", "Archives updated. Stage manually with `git add Renders/_zipped`, if desired.", verbose_only=True, verbose=verbose, ) log( "scan", f"Summary: scanned {total_scanned}, quick-skipped {quick_skipped}, " f"state-skipped {state_skipped}, empty {empty_dirs}, queued {queued}", ) removed = cleanup_orphan_archives(verbose=verbose) if removed: log("zip", f"Removed {removed} orphan archive(s).", verbose=verbose) return updated_count def run_expand(worker_count: int, *, verbose: bool) -> int: if not ARCHIVE_ROOT.exists(): log("expand", "No archives to expand (missing 'Renders/_zipped').") return 0 work_items: list[tuple[Path, dict]] = [] # Look for both .zip and .7z archives archive_patterns = ["*.zip", "*.7z"] for pattern in archive_patterns: for zip_path in ARCHIVE_ROOT.rglob(pattern): state_path = state_path_for(zip_path) seq_state = load_state(state_path) if seq_state is None: log("expand", f"Skipping {zip_path} (missing metadata)") continue target_dir = sequence_dir_for(zip_path) if current_state(target_dir) == seq_state: continue work_items.append((zip_path, seq_state)) if not work_items: log("expand", "Working folders already match archives; nothing to expand.") return 0 total = len(work_items) completed = 0 with ThreadPoolExecutor(max_workers=worker_count) as executor: future_map = { executor.submit(process_expand, zip_path, seq_state, verbose=verbose): zip_path for zip_path, seq_state in work_items } for future in as_completed(future_map): future.result() completed += 1 zip_path = future_map[future] rel = zip_path.relative_to(ARCHIVE_ROOT) log("expand", f"{completed}/{total} {rel}") log("expand", f"Refreshed {len(work_items)} sequence folder(s).", verbose=verbose) return len(work_items) def cleanup_orphan_archives(*, verbose: bool) -> int: if not ARCHIVE_ROOT.exists(): log("zip", "Archive root does not exist; nothing to clean up.", verbose_only=True, verbose=verbose) return 0 removed: list[Path] = [] log("zip", f"Scanning for orphan archives in {ARCHIVE_ROOT.resolve()}", verbose_only=True, verbose=verbose) # Look for both .zip and .7z archives archive_patterns = ["*.zip", "*.7z"] for pattern in archive_patterns: try: for zip_path in ARCHIVE_ROOT.rglob(pattern): try: # Resolve to absolute paths for consistent checking zip_path_abs = zip_path.resolve() # Calculate state path BEFORE checking/removing archive state_path = state_path_for(zip_path) state_path_abs = state_path.resolve() # Calculate sequence directory using sequence_dir_for # This function works with paths relative to ARCHIVE_ROOT seq_dir = sequence_dir_for(zip_path) seq_dir_abs = seq_dir.resolve() # Check if sequence directory exists and is actually a directory if seq_dir_abs.exists() and seq_dir_abs.is_dir(): log("zip", f"Archive {zip_path.relative_to(ARCHIVE_ROOT)} has matching sequence directory; keeping", verbose_only=True, verbose=verbose) continue # Sequence directory doesn't exist - this is an orphan archive rel = zip_path.relative_to(ARCHIVE_ROOT) log("zip", f"Removing orphan archive {rel}", verbose_only=False, verbose=verbose) # Remove archive file if zip_path_abs.exists(): zip_path_abs.unlink() log("zip", f"Deleted archive: {rel}", verbose_only=True, verbose=verbose) # Remove state file if it exists if state_path_abs.exists(): state_path_abs.unlink() state_rel = state_path.relative_to(ARCHIVE_ROOT) log("zip", f"Removed orphan metadata {state_rel}", verbose_only=False, verbose=verbose) removed.append(zip_path_abs) except Exception as e: # Log error but continue processing other archives try: rel = zip_path.relative_to(ARCHIVE_ROOT) except: rel = zip_path log("zip", f"Error processing archive {rel}: {e}", verbose_only=True, verbose=verbose) log("zip", f"Traceback: {traceback.format_exc()}", verbose_only=True, verbose=verbose) continue except Exception as e: log("zip", f"Error scanning for {pattern} archives: {e}", verbose_only=True, verbose=verbose) log("zip", f"Traceback: {traceback.format_exc()}", verbose_only=True, verbose=verbose) continue if not removed: log("zip", "No orphan archives found.", verbose_only=True, verbose=verbose) return 0 # Clean up empty parent directories archive_root_abs = ARCHIVE_ROOT.resolve() for parent in sorted({p.resolve().parent for p in removed}, key=lambda p: len(p.parts), reverse=True): parent_resolved = parent.resolve() if not parent_resolved.exists(): continue try: while parent_resolved != archive_root_abs and not any(parent_resolved.iterdir()): parent_resolved.rmdir() parent_resolved = parent_resolved.parent.resolve() except OSError: # Ignore errors when removing directories (might be in use) pass return len(removed) def main() -> int: args = parse_args() log("init", "zip_sequences starting up...") log("init", f"Working directory: {Path.cwd()}") log("init", f"Mode: {args.mode}, zipper: {ZIPPER_TYPE}, jobs arg: {args.jobs or 'auto'}") if ZIPPER_TYPE == "7z": exe = SEVEN_Z_EXE or "not found" max_inst = MAX_7Z_INSTANCES if MAX_7Z_INSTANCES is not None else "auto" log("init", f"7z executable: {exe}, Max7zInst: {max_inst}, method: {COMPRESSION_METHOD}, level: {COMPRESSION_LEVEL}") if args.mode == "expand": # For expand mode, use simple CPU-based worker calculation workers, _ = max_workers(args.jobs, work_items=None, verbose=args.verbose) run_expand(workers, verbose=args.verbose) return 0 # For zip mode, work items will be calculated in run_zip updated = run_zip(args.jobs, verbose=args.verbose) return 0 if updated >= 0 else 1 if __name__ == "__main__": try: raise SystemExit(main()) except Exception as exc: # broad to surface unexpected errors cleanly print(f"Sequence sync failed: {exc}", file=sys.stderr) raise