#!/usr/bin/env python3
"""Maintain zipped render sequences for Git hooks.
Default mode scans `Renders/`, produces compressed archives (.7z or .zip) under
`Renders/_zipped/`, and refreshes their metadata so commits only need to track
compact files (stage them with `git add Renders/_zipped`). Switch to
`--mode expand` to inflate the tracked archives back into the ignored working
directories after checkouts or pulls.
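
Typical invocations (a usage sketch; flags as defined in parse_args below):

    python zip_sequences.py --mode zip --jobs 4 --verbose   # before committing
    python zip_sequences.py --mode expand                   # after checkout/pull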
"""
from __future__ import annotations
import argparse
import json
import os
import platform
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Iterator, Sequence
# Try to import psutil for cross-platform RAM detection
try:
import psutil
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False
# For Windows fallback
if platform.system() == "Windows":
try:
import ctypes
HAS_CTYPES = True
except ImportError:
HAS_CTYPES = False
else:
HAS_CTYPES = False
RENDER_ROOT = Path("Renders")
ARCHIVE_ROOT = RENDER_ROOT / "_zipped"
SEQUENCE_EXTENSIONS = {
".png",
".jpg",
".jpeg",
".tif",
".tiff",
".exr",
}
STATE_SUFFIX = ".meta.json"
DEFAULT_CONFIG = {
"zipper": "7z",
"compression": 9,
"compressionMethod": "LZMA2", # Compression method: LZMA2 (multi-threaded), PPMd (single-threaded), BZip2, Deflate
"dailyFormat": "daily_YYMMDD",
"Max7zInst": 0, # Maximum concurrent 7z instances (0 = auto-calculate)
}
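
# A minimal example `.config/config.json` (keys mirror DEFAULT_CONFIG above;
# the values shown are illustrative, not recommendations):
#
#     {
#         "zipper": "7z",
#         "compression": 9,
#         "compressionMethod": "LZMA2",
#         "dailyFormat": "daily_YYMMDD",
#         "Max7zInst": 2
#     }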
def log(mode: str, message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
if verbose_only and not verbose:
return
print(f"[{mode}] {message}", flush=True)
def load_config() -> dict:
# First try to load from project's .config folder (current working directory)
# Then fall back to ProjectStructure repo config (next to zip_sequences.py)
cwd = Path.cwd()
project_config = cwd / ".config" / "config.json"
repo_config = Path(__file__).resolve().with_name("config.json")
config_paths = [
("project", project_config),
("repo", repo_config),
]
log("init", "Loading configuration sources...")
for source, config_path in config_paths:
try:
if config_path.exists():
log("init", f"Reading {source} config at {config_path}")
text = config_path.read_text(encoding="utf-8")
try:
data = json.loads(text)
if isinstance(data, dict):
merged = DEFAULT_CONFIG.copy()
merged.update(data)
log("init", f"Configuration loaded from {source}")
return merged
except json.JSONDecodeError:
log("init", f"Config file at {config_path} is invalid JSON; skipping")
continue
except OSError:
log("init", f"Unable to read config at {config_path}; skipping")
continue
# If no config found, return defaults
log("init", "No config files found; using default settings")
return DEFAULT_CONFIG.copy()
CONFIG = load_config()
zipper_val = CONFIG.get("zipper", "7z")
# Handle both old boolean format and new string format
if isinstance(zipper_val, bool):
ZIPPER_TYPE = "7z" if zipper_val else "zip"
else:
ZIPPER_TYPE = str(zipper_val).lower()
COMPRESSION_LEVEL = CONFIG.get("compression", 9)
if isinstance(COMPRESSION_LEVEL, str):
try:
COMPRESSION_LEVEL = int(COMPRESSION_LEVEL)
except ValueError:
COMPRESSION_LEVEL = 9
if not isinstance(COMPRESSION_LEVEL, int):
COMPRESSION_LEVEL = 9
COMPRESSION_LEVEL = max(0, min(9, COMPRESSION_LEVEL))
COMPRESSION_METHOD = CONFIG.get("compressionMethod", "LZMA2")
# Validate compression method
valid_methods = {"LZMA2", "PPMd", "BZip2", "Deflate"}
if COMPRESSION_METHOD not in valid_methods:
COMPRESSION_METHOD = "LZMA2" # Default to LZMA2 for multi-threading support
MAX_7Z_INSTANCES = CONFIG.get("Max7zInst", 0)
if MAX_7Z_INSTANCES is not None:
if isinstance(MAX_7Z_INSTANCES, str):
try:
MAX_7Z_INSTANCES = int(MAX_7Z_INSTANCES)
except ValueError:
MAX_7Z_INSTANCES = 0
if not isinstance(MAX_7Z_INSTANCES, int) or MAX_7Z_INSTANCES < 1:
MAX_7Z_INSTANCES = 0
# Treat 0 as None (auto-calculate)
if MAX_7Z_INSTANCES == 0:
MAX_7Z_INSTANCES = None
SEVEN_Z_EXE: str | None = None
if ZIPPER_TYPE == "7z":
SEVEN_Z_EXE = shutil.which("7z") or shutil.which("7za")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Sync render sequences with zipped archives.")
parser.add_argument(
"--mode",
choices=("zip", "expand"),
default="zip",
help="zip sequences for commit (default) or expand tracked archives",
)
parser.add_argument("--jobs", type=int, help="max parallel workers")
parser.add_argument("--verbose", action="store_true", help="print extra progress details")
return parser.parse_args()
def get_available_ram() -> int | None:
"""Get available RAM in bytes, reserving 20% for system.
Returns:
Available RAM in bytes, or None if detection fails.
"""
try:
if HAS_PSUTIL:
# Use psutil for cross-platform RAM detection
mem = psutil.virtual_memory()
# Reserve 20% for system, use 80% for compression jobs
available = int(mem.total * 0.8)
return available
elif HAS_CTYPES and platform.system() == "Windows":
# Windows fallback using ctypes
class MEMORYSTATUSEX(ctypes.Structure):
_fields_ = [
("dwLength", ctypes.c_ulong),
("dwMemoryLoad", ctypes.c_ulong),
("ullTotalPhys", ctypes.c_ulonglong),
("ullAvailPhys", ctypes.c_ulonglong),
("ullTotalPageFile", ctypes.c_ulonglong),
("ullAvailPageFile", ctypes.c_ulonglong),
("ullTotalVirtual", ctypes.c_ulonglong),
("ullAvailVirtual", ctypes.c_ulonglong),
("ullAvailExtendedVirtual", ctypes.c_ulonglong),
]
kernel32 = ctypes.windll.kernel32
kernel32.GlobalMemoryStatusEx.argtypes = [ctypes.POINTER(MEMORYSTATUSEX)]
kernel32.GlobalMemoryStatusEx.restype = ctypes.c_bool
mem_status = MEMORYSTATUSEX()
mem_status.dwLength = ctypes.sizeof(MEMORYSTATUSEX)
if kernel32.GlobalMemoryStatusEx(ctypes.byref(mem_status)):
# Reserve 20% for system, use 80% for compression jobs
available = int(mem_status.ullTotalPhys * 0.8)
return available
except Exception:
pass
return None
def estimate_ram_per_job(seq_dir: Path, seq_state: dict) -> int:
"""Estimate RAM usage per compression job based on folder size.
Args:
seq_dir: Path to the sequence directory
seq_state: State dictionary containing file information
Returns:
Estimated RAM usage in bytes
"""
# Calculate total folder size from seq_state
total_bytes = sum(entry.get("size", 0) for entry in seq_state.get("files", []))
if ZIPPER_TYPE == "7z":
# Fixed dictionary size: 1GB (1024MB)
# Actual observed usage: ~2GB per process regardless of sequence size
# Use a tight estimate to allow maximum concurrency
# Estimate 3GB per job (50% safety margin over observed ~2GB)
base_ram = 3 * 1024 * 1024 * 1024 # 3GB base estimate
# For very large sequences (>50GB), add small buffer
if total_bytes > 50 * 1024 * 1024 * 1024: # >50GB
# Add 1GB for very large sequences
estimated_ram = base_ram + (1 * 1024 * 1024 * 1024)
else:
estimated_ram = base_ram
return estimated_ram
else:
# zip compression is more memory-efficient
# Conservative estimate: 1GB per job
return 1024 * 1024 * 1024 # 1 GB
def max_workers(
requested: int | None,
work_items: list[tuple[Path, Path, Path, dict]] | None = None,
*,
verbose: bool = False
) -> tuple[int, dict[Path, int]]:
    """Calculate maximum worker count based on CPU and RAM constraints.
    Args:
        requested: User-requested worker count (from --jobs)
        work_items: List of work items (seq_dir, zip_path, state_path, seq_state)
        verbose: Whether to log RAM-based calculations
    Returns:
        Tuple of (worker_count, ram_limits) where ram_limits maps each sequence
        directory to its estimated per-job RAM budget in bytes; the dict is empty
        when there are no work items or RAM detection fails.
    """
cpu = os.cpu_count() or 1
cpu_limit = max(1, min(8, cpu))
if requested and requested > 0:
cpu_limit = min(requested, max(1, cpu))
# If no work items provided, return CPU-based limit
if work_items is None or len(work_items) == 0:
return (cpu_limit, {})
# Try to calculate RAM-based limit
available_ram = get_available_ram()
if available_ram is None:
# RAM detection failed, fall back to CPU limit
if verbose:
log("zip", "RAM detection failed, using CPU-based worker limit", verbose_only=True, verbose=verbose)
return (cpu_limit, {})
# For 7z: use fixed dictionary size and calculate workers
if ZIPPER_TYPE == "7z":
# Fixed dictionary size: 1GB (1024MB)
FIXED_DICT_SIZE_MB = 1024
fixed_dict_size_bytes = FIXED_DICT_SIZE_MB * 1024 * 1024
# Check if Max7zInst is configured
if MAX_7Z_INSTANCES is not None:
# Use configured maximum instances, but still respect user's --jobs and work items
final_limit = MAX_7Z_INSTANCES
num_work_items = len(work_items) if work_items else 0
if num_work_items > 0:
final_limit = min(final_limit, num_work_items)
if requested and requested > 0:
final_limit = min(final_limit, requested)
# Create RAM limits dict (all use fixed dict size, but return as dict for consistency)
ram_limits_dict: dict[Path, int] = {}
if work_items:
for seq_dir, _, _, _ in work_items:
ram_limits_dict[seq_dir] = fixed_dict_size_bytes
if verbose:
log(
"zip",
f"Using Max7zInst={MAX_7Z_INSTANCES} from config → "
f"work items: {num_work_items}, requested: {requested}, final: {final_limit}",
verbose_only=True,
verbose=verbose
)
return (final_limit, ram_limits_dict)
# Auto-calculate based on RAM if Max7zInst not configured
# Prioritize maximum concurrency: one worker per sequence when possible
if available_ram is not None:
# available_ram is already 80% of total (20% reserved for system)
# Use 95% of available RAM for compression jobs
compression_ram = int(available_ram * 0.95)
# Estimate RAM for each work item
work_items_with_ram: list[tuple[Path, int, tuple[Path, Path, Path, dict]]] = []
ram_limits_dict: dict[Path, int] = {}
for seq_dir, zip_path, state_path, seq_state in work_items:
try:
estimated_ram = estimate_ram_per_job(seq_dir, seq_state)
work_items_with_ram.append((seq_dir, estimated_ram, (seq_dir, zip_path, state_path, seq_state)))
ram_limits_dict[seq_dir] = estimated_ram
except Exception:
# If estimation fails, use a safe default (3GB)
default_ram = 3 * 1024 * 1024 * 1024 # 3GB
work_items_with_ram.append((seq_dir, default_ram, (seq_dir, zip_path, state_path, seq_state)))
ram_limits_dict[seq_dir] = default_ram
num_work_items = len(work_items) if work_items else 0
# Calculate total estimated RAM
total_estimated_ram = sum(ram for _, ram, _ in work_items_with_ram)
# If all sequences fit in available RAM, use one worker per sequence (maximum concurrency)
if total_estimated_ram <= compression_ram:
worker_count = num_work_items
else:
# Not all fit - use bin-packing to minimize workers
# Sort by estimated RAM (largest first) for bin-packing
work_items_with_ram.sort(key=lambda x: x[1], reverse=True)
# Bin-packing algorithm: pack largest items first
bins: list[list[tuple[Path, int, tuple[Path, Path, Path, dict]]]] = []
bin_remaining: list[int] = []
for seq_dir, estimated_ram, work_item in work_items_with_ram:
# Try to fit in existing bin
placed = False
for i, remaining in enumerate(bin_remaining):
if remaining >= estimated_ram:
bins[i].append((seq_dir, estimated_ram, work_item))
bin_remaining[i] -= estimated_ram
placed = True
break
# If doesn't fit, create new bin
if not placed:
bins.append([(seq_dir, estimated_ram, work_item)])
bin_remaining.append(compression_ram - estimated_ram)
# Worker count is number of bins
worker_count = len(bins)
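                # Worked example (assumed numbers): with compression_ram = 8 GB
                # and estimates [3, 3, 3, 3] GB, first-fit-decreasing packs two
                # 3 GB jobs per bin (6 GB <= 8 GB), producing 2 bins -> 2 workers.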
# Cap at number of actual work items (can't have more workers than jobs)
if num_work_items > 0:
worker_count = min(worker_count, num_work_items)
# Respect user's --jobs if provided
if requested and requested > 0:
worker_count = min(worker_count, requested)
if verbose:
ram_gb = available_ram / (1024 ** 3)
compression_ram_gb = compression_ram / (1024 ** 3)
total_estimated_gb = total_estimated_ram / (1024 ** 3)
log(
"zip",
f"RAM: {ram_gb:.1f}GB available (80% of total), {compression_ram_gb:.1f}GB for compression (95%)",
verbose_only=True,
verbose=verbose
)
log(
"zip",
f"Estimated RAM per sequence: {total_estimated_gb:.1f}GB total across {num_work_items} sequences",
verbose_only=True,
verbose=verbose
)
if total_estimated_ram <= compression_ram:
log(
"zip",
f"All sequences fit in RAM → using {worker_count} workers (one per sequence)",
verbose_only=True,
verbose=verbose
)
else:
log(
"zip",
f"Using bin-packing: {worker_count} workers needed",
verbose_only=True,
verbose=verbose
)
log(
"zip",
f"Final worker count: {worker_count} (requested: {requested})",
verbose_only=True,
verbose=verbose
)
# Return worker count and RAM limits dict
return (worker_count, ram_limits_dict)
        # Defensive fallback: available_ram was already checked earlier in this
        # function, so this branch is unreachable in practice; keep a safe default
        default_limit = 4
num_work_items = len(work_items) if work_items else 0
if num_work_items > 0:
default_limit = min(default_limit, num_work_items)
if requested and requested > 0:
default_limit = min(default_limit, requested)
# Create RAM limits dict with safe defaults (12GB per job for 1GB dict)
ram_limits_dict: dict[Path, int] = {}
if work_items:
default_ram = 12 * 1024 * 1024 * 1024 # 12GB default
for seq_dir, _, _, _ in work_items:
ram_limits_dict[seq_dir] = default_ram
if verbose:
log(
"zip",
f"RAM detection failed and Max7zInst not set, using default worker limit → "
f"work items: {num_work_items}, requested: {requested}, final: {default_limit}",
verbose_only=True,
verbose=verbose
)
return (default_limit, ram_limits_dict)
# For zip compression, use existing estimation-based approach
# Estimate RAM per job for each work item
ram_estimates = []
ram_limits_dict: dict[Path, int] = {}
for seq_dir, zip_path, state_path, seq_state in work_items:
try:
estimated_ram = estimate_ram_per_job(seq_dir, seq_state)
ram_estimates.append(estimated_ram)
ram_limits_dict[seq_dir] = estimated_ram
except Exception:
# If estimation fails, use fallback estimate
fallback_ram = 1024 * 1024 * 1024 # 1GB fallback for zip
ram_estimates.append(fallback_ram)
ram_limits_dict[seq_dir] = fallback_ram
if not ram_estimates:
return (cpu_limit, {})
max_ram_per_job = max(ram_estimates)
ram_limit = max(1, available_ram // max_ram_per_job)
ram_limit = min(ram_limit, 6) # Conservative limit for zip
final_limit = min(cpu_limit, ram_limit)
if verbose:
ram_gb = available_ram / (1024 ** 3)
max_ram_gb = max_ram_per_job / (1024 ** 3)
log(
"zip",
f"RAM: {ram_gb:.1f}GB available, ~{max_ram_gb:.1f}GB per job → "
f"RAM limit: {ram_limit}, CPU limit: {cpu_limit}, final: {final_limit}",
verbose_only=True,
verbose=verbose
)
return (final_limit, ram_limits_dict)
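
# Usage sketch (mirrors the call made in run_zip below):
#     worker_count, ram_limits = max_workers(requested_workers, work_items, verbose=verbose)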
def is_archive_path(path: Path) -> bool:
return any(part in ("_archive", "_CURRENT") for part in path.parts)
def find_sequence_dirs(root: Path, *, verbose: bool = False) -> Iterator[Path]:
seen_dirs = set() # Track directories we've already yielded to avoid duplicates
for dirpath, dirnames, filenames in os.walk(root):
path = Path(dirpath)
dirnames[:] = [d for d in dirnames if d not in ("_archive", "_CURRENT")]
if is_archive_path(path):
if verbose:
rel = path.relative_to(root) if path.is_relative_to(root) else path
log("scan", f"Skipping archive path: {rel}", verbose_only=True, verbose=verbose)
continue
# Check if this directory has sequence files directly
has_frames = any(Path(dirpath, f).suffix.lower() in SEQUENCE_EXTENSIONS for f in filenames)
if has_frames:
path_resolved = path.resolve()
if path_resolved not in seen_dirs:
seen_dirs.add(path_resolved)
yield path
elif verbose:
# Log directories that don't have sequence files for debugging
rel = path.relative_to(root) if path.is_relative_to(root) else path
if "scab" in path.name.lower():
# Special logging for directories that might be sequences
frame_extensions = [f for f in filenames if Path(f).suffix.lower() in SEQUENCE_EXTENSIONS]
all_files = list(path.iterdir()) if path.exists() else []
log("scan", f"Directory {rel}: {len(filenames)} files in dir, {len(frame_extensions)} with sequence extensions, {len(all_files)} total items", verbose_only=True, verbose=verbose)
# Check subdirectories
for subdir in dirnames[:5]: # Check first 5 subdirs
subdir_path = path / subdir
try:
subdir_files = list(subdir_path.iterdir()) if subdir_path.exists() else []
subdir_frame_files = [f for f in subdir_files if f.is_file() and f.suffix.lower() in SEQUENCE_EXTENSIONS]
if subdir_frame_files:
log("scan", f" Subdirectory {subdir} has {len(subdir_frame_files)} sequence files", verbose_only=True, verbose=verbose)
except (OSError, PermissionError):
pass
def iter_sequence_files(seq_dir: Path) -> Iterator[Path]:
for dirpath, dirnames, filenames in os.walk(seq_dir):
path = Path(dirpath)
dirnames[:] = [d for d in dirnames if d not in ("_archive", "_CURRENT")]
if is_archive_path(path):
continue
for filename in filenames:
yield path / filename
def compute_state(seq_dir: Path) -> dict:
entries = []
files = sorted(
iter_sequence_files(seq_dir),
key=lambda p: p.relative_to(seq_dir).as_posix(),
)
for file_path in files:
stat = file_path.stat()
entries.append(
{
"path": file_path.relative_to(seq_dir).as_posix(),
"size": stat.st_size,
"mtime_ns": stat.st_mtime_ns,
}
)
return {"files": entries}
def current_state(seq_dir: Path) -> dict:
if not seq_dir.exists() or not seq_dir.is_dir():
return {"files": []}
return compute_state(seq_dir)
def load_state(state_path: Path) -> dict | None:
if not state_path.exists():
return None
try:
return json.loads(state_path.read_text())
except json.JSONDecodeError:
return None
def state_changed(seq_state: dict, stored_state: dict | None) -> bool:
if stored_state is None:
return True
return seq_state != stored_state
def archive_path_for(seq_dir: Path) -> Path:
rel = seq_dir.relative_to(RENDER_ROOT)
suffix = ".7z" if ZIPPER_TYPE == "7z" else ".zip"
# Append suffix instead of replacing, since directory names might have dots (e.g., "scab_v2.1")
return ARCHIVE_ROOT / f"{rel}{suffix}"
def sequence_dir_for(zip_path: Path) -> Path:
rel = zip_path.relative_to(ARCHIVE_ROOT)
# Remove the archive suffix (.7z or .zip) from the end
# Handle both .7z and .zip extensions
rel_str = str(rel)
if rel_str.endswith(".7z"):
rel_str = rel_str[:-3]
elif rel_str.endswith(".zip"):
rel_str = rel_str[:-4]
return RENDER_ROOT / rel_str
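# archive_path_for() and sequence_dir_for() are inverses of each other, e.g.
# with ZIPPER_TYPE "7z" (illustrative path):
#     Renders/shotA/beauty -> Renders/_zipped/shotA/beauty.7z -> Renders/shotA/beauty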
def state_path_for(zip_path: Path) -> Path:
return zip_path.with_suffix(zip_path.suffix + STATE_SUFFIX)
def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None = None, worker_count: int = 1, *, verbose: bool = False) -> None:
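    # Note: per_job_memory_limit is currently informational only; 7z memory use
    # is governed by the fixed dictionary size configured when the command is built.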
if ZIPPER_TYPE == "7z":
if SEVEN_Z_EXE is None:
raise RuntimeError(
"7z compression requested but 7z executable not found in PATH. "
"Please install 7z (e.g., via Chocolatey: choco install 7zip) "
"or set zipper to 'zip' in config.json"
)
zip_path.parent.mkdir(parents=True, exist_ok=True)
# If creating a .7z file, remove any existing .zip file for the same sequence
if zip_path.suffix == ".7z":
old_zip_path = zip_path.with_suffix(".zip")
if old_zip_path.exists():
old_zip_path.unlink(missing_ok=True)
old_state_path = state_path_for(old_zip_path)
if old_state_path.exists():
old_state_path.unlink(missing_ok=True)
# Build list of files to archive with relative paths
file_list = []
for file_path in iter_sequence_files(seq_dir):
rel_path = file_path.relative_to(seq_dir).as_posix()
file_list.append(rel_path)
if not file_list:
raise RuntimeError(f"No files found to archive in {seq_dir}")
# Create zip in temporary location first to avoid issues with corrupted existing files
temp_zip = None
list_file_path = None
try:
        # Reserve a unique temporary archive path (mktemp is normally race-prone,
        # but 7z must create the file itself, so only the name is needed here)
        temp_zip_path = tempfile.mktemp(suffix=".7z", dir=zip_path.parent)
temp_zip = Path(temp_zip_path)
# Create list file with absolute path
fd, temp_path = tempfile.mkstemp(suffix=".lst", text=True)
list_file_path = Path(temp_path)
with os.fdopen(fd, "w", encoding="utf-8") as list_file:
for rel_path in file_list:
list_file.write(rel_path + "\n")
list_file.flush()
os.fsync(list_file.fileno()) # Ensure data is written to disk
# File is closed here by context manager, small delay to ensure OS releases handle
time.sleep(0.1)
# Use absolute paths for both list file and temp zip
list_file_abs = list_file_path.resolve()
temp_zip_abs = temp_zip.resolve()
# Create archive in temp location first (7z will create it fresh)
cmd = [
SEVEN_Z_EXE,
"a",
"-y",
"-bb0", # Suppress progress output
f"-mx={COMPRESSION_LEVEL}",
"-t7z", # Use 7z format, not zip
]
# Set compression method and memory/dictionary size based on method
# At compression level 0, use Copy (store) method for maximum speed
FIXED_DICT_SIZE_MB = 1024
if COMPRESSION_LEVEL == 0:
# Level 0 = no compression, just store files (fastest)
cmd.append("-m0=Copy")
# Copy method doesn't need threading, but enable it anyway for consistency
cmd.append("-mmt=on")
else:
# Compression levels 1-9: use configured compression method
if COMPRESSION_METHOD == "PPMd":
# PPMd: specify memory as part of method string
cmd.append(f"-m0=PPMd:mem={FIXED_DICT_SIZE_MB}m")
elif COMPRESSION_METHOD == "LZMA2":
# LZMA2: use -md for dictionary size
cmd.append("-m0=LZMA2")
cmd.append(f"-md={FIXED_DICT_SIZE_MB}m")
elif COMPRESSION_METHOD == "BZip2":
# BZip2: use -md for dictionary size (smaller max: 900KB)
max_bzip2_dict = min(FIXED_DICT_SIZE_MB, 900) # BZip2 max is 900KB
cmd.append("-m0=BZip2")
cmd.append(f"-md={max_bzip2_dict}k")
elif COMPRESSION_METHOD == "Deflate":
# Deflate: doesn't use dictionary size parameter
cmd.append("-m0=Deflate")
else:
# Fallback: use LZMA2
cmd.append("-m0=LZMA2")
cmd.append(f"-md={FIXED_DICT_SIZE_MB}m")
# CPU thread allocation: when there's only 1 worker, use all CPU cores
# When there are multiple workers, use auto mode to let 7z decide
# Note: PPMd is single-threaded and won't benefit from -mmt
cpu_cores = os.cpu_count() or 1
if COMPRESSION_METHOD == "PPMd":
# PPMd is single-threaded, so -mmt won't help
# But we can still set it for consistency
cmd.append("-mmt=on")
elif worker_count == 1:
# Single worker: use all CPU cores for maximum speed (LZMA2, BZip2, Deflate support this)
cmd.append(f"-mmt={cpu_cores}")
else:
# Multiple workers: use auto mode (7z will manage threads)
cmd.append("-mmt=on")
cmd.extend([
str(temp_zip_abs),
f"@{list_file_abs}",
])
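        # With the defaults (level 9, LZMA2, multiple workers), the assembled
        # command looks like:
        #     7z a -y -bb0 -mx=9 -t7z -m0=LZMA2 -md=1024m -mmt=on <temp>.7z @<list>.lst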
# Log the command in verbose mode for debugging
if verbose:
cmd_str = " ".join(cmd)
log("zip", f"7z command: {cmd_str}", verbose_only=True, verbose=verbose)
result = subprocess.run(
cmd,
cwd=seq_dir,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
error_msg = result.stderr.strip() if result.stderr else "Unknown error"
if result.stdout:
error_msg += f"\nstdout: {result.stdout.strip()}"
raise RuntimeError(f"7z compression failed: {error_msg}")
# Move temp zip to final location, replacing any existing file
if zip_path.exists():
zip_path.unlink()
temp_zip.replace(zip_path)
temp_zip = None # Mark as moved so we don't delete it
finally:
# Clean up temp zip if it wasn't moved
if temp_zip and temp_zip.exists():
try:
temp_zip.unlink(missing_ok=True)
except OSError:
pass
# Clean up list file, with retry in case 7z still has it open
if list_file_path and list_file_path.exists():
for attempt in range(3):
try:
list_file_path.unlink(missing_ok=True)
break
except PermissionError:
if attempt < 2:
time.sleep(0.1) # Wait 100ms before retry
else:
# Last attempt failed, just log and continue
# The temp file will be cleaned up by the OS eventually
pass
return
# Use zipfile (only if ZIPPER_TYPE == "zip")
if ZIPPER_TYPE == "zip":
from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile
zip_path.parent.mkdir(parents=True, exist_ok=True)
if COMPRESSION_LEVEL <= 0:
compression = ZIP_STORED
zip_kwargs = {}
else:
compression = ZIP_DEFLATED
zip_kwargs = {"compresslevel": COMPRESSION_LEVEL}
with ZipFile(zip_path, "w", compression=compression, **zip_kwargs) as archive:
for file_path in iter_sequence_files(seq_dir):
archive.write(file_path, arcname=file_path.relative_to(seq_dir).as_posix())
return
# Unknown ZIPPER_TYPE - fail with clear error
raise RuntimeError(
f"Unsupported ZIPPER_TYPE: {ZIPPER_TYPE!r}. "
f"Expected '7z' or 'zip'. "
f"Config zipper value: {CONFIG.get('zipper', 'not set')!r}"
)
def expand_sequence(zip_path: Path, seq_state: dict) -> None:
target_dir = sequence_dir_for(zip_path)
if target_dir.exists():
shutil.rmtree(target_dir)
target_dir.mkdir(parents=True, exist_ok=True)
if ZIPPER_TYPE == "7z":
if SEVEN_Z_EXE is None:
raise RuntimeError(
"7z extraction requested but 7z executable not found in PATH. "
"Please install 7z or set zipper to 'zip' in config.json"
)
cmd = [
SEVEN_Z_EXE,
"x",
"-y",
str(zip_path),
f"-o{target_dir}",
]
result = subprocess.run(
cmd,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
error_msg = result.stderr.strip() if result.stderr else "Unknown error"
if result.stdout:
error_msg += f"\nstdout: {result.stdout.strip()}"
raise RuntimeError(f"7z extraction failed: {error_msg}")
elif ZIPPER_TYPE == "zip":
from zipfile import ZipFile
with ZipFile(zip_path, "r") as archive:
archive.extractall(target_dir)
else:
raise RuntimeError(
f"Unsupported ZIPPER_TYPE: {ZIPPER_TYPE!r}. "
f"Expected '7z' or 'zip'. "
f"Config zipper value: {CONFIG.get('zipper', 'not set')!r}"
)
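    # Restore recorded mtimes so current_state() comparisons in run_expand treat
    # freshly expanded files as matching the stored metadata.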
for entry in seq_state.get("files", []):
file_path = target_dir / entry["path"]
if file_path.exists():
os.utime(file_path, ns=(entry["mtime_ns"], entry["mtime_ns"]))
def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, per_job_memory_limit: int | None, worker_count: int, *, verbose: bool) -> Sequence[Path]:
log("zip", f"{seq_dir} -> {zip_path}", verbose_only=True, verbose=verbose)
zip_sequence(seq_dir, zip_path, per_job_memory_limit, worker_count, verbose=verbose)
state_path.write_text(json.dumps(seq_state, indent=2))
return (zip_path, state_path)
def process_expand(zip_path: Path, state: dict, *, verbose: bool) -> None:
log("expand", f"{zip_path} -> {sequence_dir_for(zip_path)}", verbose_only=True, verbose=verbose)
expand_sequence(zip_path, state)
def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
work_items: list[tuple[Path, Path, Path, dict]] = []
log("init", f"Scanning sequences under {RENDER_ROOT.resolve()}")
total_scanned = 0
quick_skipped = 0
state_skipped = 0
empty_dirs = 0
queued = 0
if RENDER_ROOT.exists():
for seq_dir in find_sequence_dirs(RENDER_ROOT, verbose=verbose):
total_scanned += 1
rel = seq_dir.relative_to(RENDER_ROOT)
if total_scanned <= 5 or total_scanned % 10 == 0:
log("scan", f"[{total_scanned}] Inspecting {rel}")
# Get the target archive path (will be .7z if ZIPPER_TYPE is "7z")
zip_path = archive_path_for(seq_dir)
state_path = state_path_for(zip_path)
# Quick check: if archive exists, load stored state first (fast)
stored_state = load_state(state_path)
# Check if we need to upgrade from .zip to .7z
old_zip_path = None
old_stored_state = None
if ZIPPER_TYPE == "7z":
old_zip_path = zip_path.with_suffix(".zip")
if old_zip_path.exists():
old_state_path = state_path_for(old_zip_path)
old_stored_state = load_state(old_state_path)
# If old .zip exists and .7z doesn't, use old .zip's state for comparison
if not zip_path.exists() and old_stored_state is not None:
stored_state = old_stored_state
# If .7z archive exists and we have stored state, do quick check before computing full state
if zip_path.exists() and stored_state is not None:
# Quick check: if directory mtime is older than archive, likely unchanged
try:
dir_mtime = seq_dir.stat().st_mtime_ns
archive_mtime = zip_path.stat().st_mtime_ns
# If directory wasn't modified since archive was created, skip state computation
if dir_mtime <= archive_mtime:
quick_skipped += 1
if quick_skipped <= 5:
log("scan", f"Skipping {rel} (unchanged since archive)")
# Still need to check for old .zip cleanup (we have .7z, so .zip is obsolete)
if old_zip_path and old_zip_path.exists():
old_zip_path.unlink(missing_ok=True)
old_state_path = state_path_for(old_zip_path)
if old_state_path.exists():
old_state_path.unlink(missing_ok=True)
continue
except OSError:
# If stat fails, fall through to full state computation
pass
# Compute current state only if we need to
seq_state = compute_state(seq_dir)
if not seq_state["files"]:
empty_dirs += 1
if empty_dirs <= 5:
log("scan", f"{rel} has no files; skipping")
continue
# Check if state changed
if stored_state is not None and not state_changed(seq_state, stored_state):
# Metadata matches stored state
state_skipped += 1
if state_skipped <= 5:
log("scan", f"{rel} metadata unchanged; archive up to date")
                if zip_path.exists():
                    # .7z exists and is up to date; clean up old .zip if it exists
                    if old_zip_path and old_zip_path.exists():
                        old_zip_path.unlink(missing_ok=True)
                        old_state_path = state_path_for(old_zip_path)
                        if old_state_path.exists():
                            old_state_path.unlink(missing_ok=True)
                    continue  # archive already current; nothing to queue
elif old_zip_path and old_zip_path.exists() and old_stored_state is not None:
# .7z doesn't exist, but .zip exists and metadata matches
# Keep the .zip file, don't create .7z
continue
else:
# No archive exists, but state matches (shouldn't happen, but be safe)
continue
work_items.append((seq_dir, zip_path, state_path, seq_state))
queued += 1
if queued <= 5 or queued % 5 == 0:
total_bytes = sum(entry.get("size", 0) for entry in seq_state.get("files", []))
size_gb = total_bytes / (1024 ** 3)
log("scan", f"Queued {rel} for compression (~{size_gb:.2f}GB) [{queued} total]")
if not work_items:
if not RENDER_ROOT.exists():
log("zip", "Render root 'Renders' not found; nothing to zip.")
else:
log("zip", "Archives already up to date; no sequences needed zipping.")
log(
"scan",
f"Summary: scanned {total_scanned}, quick-skipped {quick_skipped}, "
f"state-skipped {state_skipped}, empty {empty_dirs}, queued {queued}",
)
removed = cleanup_orphan_archives(verbose=verbose)
if removed:
log("zip", f"Removed {removed} orphan archive(s).", verbose=verbose)
return 0
# Calculate RAM-aware worker count based on work items
worker_count, ram_limits_dict = max_workers(requested_workers, work_items, verbose=verbose)
log(
"init",
f"Preparing to compress {len(work_items)} sequence(s) with {worker_count} worker(s)",
)
updated_paths: list[Path] = []
total = len(work_items)
completed = 0
with ThreadPoolExecutor(max_workers=worker_count) as executor:
future_map = {
executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, ram_limits_dict.get(seq_dir), worker_count, verbose=verbose): seq_dir
for seq_dir, zip_path, state_path, seq_state in work_items
}
for future in as_completed(future_map):
updated_paths.extend(future.result())
completed += 1
seq_dir = future_map[future]
rel = seq_dir.relative_to(RENDER_ROOT)
log("zip", f"{completed}/{total} {rel}")
updated_count = len(updated_paths) // 2
log("zip", f"Updated {updated_count} sequence archive(s).", verbose=verbose)
if updated_paths:
log(
"zip",
"Archives updated. Stage manually with `git add Renders/_zipped`, if desired.",
verbose_only=True,
verbose=verbose,
)
log(
"scan",
f"Summary: scanned {total_scanned}, quick-skipped {quick_skipped}, "
f"state-skipped {state_skipped}, empty {empty_dirs}, queued {queued}",
)
removed = cleanup_orphan_archives(verbose=verbose)
if removed:
log("zip", f"Removed {removed} orphan archive(s).", verbose=verbose)
return updated_count
def run_expand(worker_count: int, *, verbose: bool) -> int:
if not ARCHIVE_ROOT.exists():
log("expand", "No archives to expand (missing 'Renders/_zipped').")
return 0
work_items: list[tuple[Path, dict]] = []
# Look for both .zip and .7z archives
archive_patterns = ["*.zip", "*.7z"]
for pattern in archive_patterns:
for zip_path in ARCHIVE_ROOT.rglob(pattern):
state_path = state_path_for(zip_path)
seq_state = load_state(state_path)
if seq_state is None:
log("expand", f"Skipping {zip_path} (missing metadata)")
continue
target_dir = sequence_dir_for(zip_path)
if current_state(target_dir) == seq_state:
continue
work_items.append((zip_path, seq_state))
if not work_items:
log("expand", "Working folders already match archives; nothing to expand.")
return 0
total = len(work_items)
completed = 0
with ThreadPoolExecutor(max_workers=worker_count) as executor:
future_map = {
executor.submit(process_expand, zip_path, seq_state, verbose=verbose): zip_path
for zip_path, seq_state in work_items
}
for future in as_completed(future_map):
future.result()
completed += 1
zip_path = future_map[future]
rel = zip_path.relative_to(ARCHIVE_ROOT)
log("expand", f"{completed}/{total} {rel}")
log("expand", f"Refreshed {len(work_items)} sequence folder(s).", verbose=verbose)
return len(work_items)
def cleanup_orphan_archives(*, verbose: bool) -> int:
if not ARCHIVE_ROOT.exists():
log("zip", "Archive root does not exist; nothing to clean up.", verbose_only=True, verbose=verbose)
return 0
removed: list[Path] = []
log("zip", f"Scanning for orphan archives in {ARCHIVE_ROOT.resolve()}", verbose_only=True, verbose=verbose)
# Look for both .zip and .7z archives
archive_patterns = ["*.zip", "*.7z"]
for pattern in archive_patterns:
try:
for zip_path in ARCHIVE_ROOT.rglob(pattern):
try:
# Resolve to absolute paths for consistent checking
zip_path_abs = zip_path.resolve()
# Calculate state path BEFORE checking/removing archive
state_path = state_path_for(zip_path)
state_path_abs = state_path.resolve()
# Calculate sequence directory using sequence_dir_for
# This function works with paths relative to ARCHIVE_ROOT
seq_dir = sequence_dir_for(zip_path)
seq_dir_abs = seq_dir.resolve()
# Check if sequence directory exists and is actually a directory
if seq_dir_abs.exists() and seq_dir_abs.is_dir():
log("zip", f"Archive {zip_path.relative_to(ARCHIVE_ROOT)} has matching sequence directory; keeping", verbose_only=True, verbose=verbose)
continue
# Sequence directory doesn't exist - this is an orphan archive
rel = zip_path.relative_to(ARCHIVE_ROOT)
log("zip", f"Removing orphan archive {rel}", verbose_only=False, verbose=verbose)
# Remove archive file
if zip_path_abs.exists():
zip_path_abs.unlink()
log("zip", f"Deleted archive: {rel}", verbose_only=True, verbose=verbose)
# Remove state file if it exists
if state_path_abs.exists():
state_path_abs.unlink()
state_rel = state_path.relative_to(ARCHIVE_ROOT)
log("zip", f"Removed orphan metadata {state_rel}", verbose_only=False, verbose=verbose)
removed.append(zip_path_abs)
except Exception as e:
# Log error but continue processing other archives
try:
rel = zip_path.relative_to(ARCHIVE_ROOT)
                    except Exception:
rel = zip_path
log("zip", f"Error processing archive {rel}: {e}", verbose_only=True, verbose=verbose)
log("zip", f"Traceback: {traceback.format_exc()}", verbose_only=True, verbose=verbose)
continue
except Exception as e:
log("zip", f"Error scanning for {pattern} archives: {e}", verbose_only=True, verbose=verbose)
log("zip", f"Traceback: {traceback.format_exc()}", verbose_only=True, verbose=verbose)
continue
if not removed:
log("zip", "No orphan archives found.", verbose_only=True, verbose=verbose)
return 0
# Clean up empty parent directories
archive_root_abs = ARCHIVE_ROOT.resolve()
for parent in sorted({p.resolve().parent for p in removed}, key=lambda p: len(p.parts), reverse=True):
parent_resolved = parent.resolve()
if not parent_resolved.exists():
continue
try:
while parent_resolved != archive_root_abs and not any(parent_resolved.iterdir()):
parent_resolved.rmdir()
parent_resolved = parent_resolved.parent.resolve()
except OSError:
# Ignore errors when removing directories (might be in use)
pass
return len(removed)
def main() -> int:
args = parse_args()
log("init", "zip_sequences starting up...")
log("init", f"Working directory: {Path.cwd()}")
log("init", f"Mode: {args.mode}, zipper: {ZIPPER_TYPE}, jobs arg: {args.jobs or 'auto'}")
if ZIPPER_TYPE == "7z":
exe = SEVEN_Z_EXE or "not found"
max_inst = MAX_7Z_INSTANCES if MAX_7Z_INSTANCES is not None else "auto"
log("init", f"7z executable: {exe}, Max7zInst: {max_inst}, method: {COMPRESSION_METHOD}, level: {COMPRESSION_LEVEL}")
if args.mode == "expand":
# For expand mode, use simple CPU-based worker calculation
workers, _ = max_workers(args.jobs, work_items=None, verbose=args.verbose)
run_expand(workers, verbose=args.verbose)
return 0
# For zip mode, work items will be calculated in run_zip
updated = run_zip(args.jobs, verbose=args.verbose)
return 0 if updated >= 0 else 1
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc: # broad to surface unexpected errors cleanly
print(f"Sequence sync failed: {exc}", file=sys.stderr)
raise