# NOTE(review): the six lines below are file-browser residue captured when this
# file was pasted ("Files / path / 486 lines / 16 KiB / Python"); they are not
# Python and broke parsing. Kept as comments to preserve the record — delete
# them outright so the shebang below returns to line 1.
# Files
# ProjectStructure_HOME/zip_sequences.py
# 486 lines
# 16 KiB
# Python
#!/usr/bin/env python3
"""Maintain zipped render sequences for Git hooks.
Default mode scans `Renders/`, produces ZIP archives under `Renders/_zipped/`,
and stages any updated archives so commits only track compact files. Switch to
`--mode expand` to inflate the tracked archives back into the ignored working
directories after checkouts or pulls.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Iterator, Sequence
# Directory that holds the raw render sequences to be archived.
RENDER_ROOT = Path("Renders")
# Directory (inside the render root) where the ZIP archives live.
ARCHIVE_ROOT = RENDER_ROOT / "_zipped"
# File extensions that mark a directory as containing a frame sequence.
SEQUENCE_EXTENSIONS = {
    ".png",
    ".jpg",
    ".jpeg",
    ".tif",
    ".tiff",
    ".exr",
}
# Suffix appended to an archive's path for its JSON state sidecar file.
STATE_SUFFIX = ".meta.json"
# Optional user configuration placed next to this script.
CONFIG_PATH = Path(__file__).resolve().with_name("config.json")
# Fallback settings used when config.json is missing or unreadable.
DEFAULT_CONFIG = {
    "zipper": "7z",
    "compression": 9,
    "dailyFormat": "daily_YYMMDD",
}
def load_config() -> dict:
    """Read config.json beside this script, overlaid on DEFAULT_CONFIG.

    A missing, unreadable, or malformed file — or JSON that is not an
    object — yields a plain copy of the defaults instead of raising.
    """
    merged = DEFAULT_CONFIG.copy()
    try:
        raw = CONFIG_PATH.read_text(encoding="utf-8")
    except OSError:
        # Covers FileNotFoundError and every other read failure.
        return merged
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return merged
    if isinstance(parsed, dict):
        merged.update(parsed)
    return merged
# Effective configuration for this process, loaded once at import time.
CONFIG = load_config()
zipper_val = CONFIG.get("zipper", "7z")
# Handle both old boolean format and new string format
if isinstance(zipper_val, bool):
    ZIPPER_TYPE = "7z" if zipper_val else "zip"
else:
    ZIPPER_TYPE = str(zipper_val).lower()
# Normalize the compression level: coerce numeric strings, fall back to 9
# for anything non-numeric, then clamp into ZIP's valid 0-9 range.
COMPRESSION_LEVEL = CONFIG.get("compression", 9)
if isinstance(COMPRESSION_LEVEL, str):
    try:
        COMPRESSION_LEVEL = int(COMPRESSION_LEVEL)
    except ValueError:
        COMPRESSION_LEVEL = 9
if not isinstance(COMPRESSION_LEVEL, int):
    COMPRESSION_LEVEL = 9
COMPRESSION_LEVEL = max(0, min(9, COMPRESSION_LEVEL))
# Resolved 7z executable path, or None when unavailable / not requested.
SEVEN_Z_EXE: str | None = None
if ZIPPER_TYPE == "7z":
    SEVEN_Z_EXE = shutil.which("7z") or shutil.which("7za")
    if SEVEN_Z_EXE is None:
        # Warn at import so every hook invocation surfaces the misconfiguration.
        print("[zip] Warning: 7z compression requested but no 7z executable was found in PATH.", file=sys.stderr)
def parse_args() -> argparse.Namespace:
    """Parse the CLI: --mode zip|expand (default zip), --jobs N, --verbose."""
    cli = argparse.ArgumentParser(
        description="Sync render sequences with zipped archives."
    )
    cli.add_argument(
        "--mode",
        default="zip",
        choices=("zip", "expand"),
        help="zip sequences for commit (default) or expand tracked archives",
    )
    cli.add_argument("--jobs", type=int, help="max parallel workers")
    cli.add_argument(
        "--verbose", action="store_true", help="print extra progress details"
    )
    return cli.parse_args()
def max_workers(requested: int | None) -> int:
cpu = os.cpu_count() or 1
limit = max(1, min(8, cpu))
if requested and requested > 0:
return min(requested, max(1, cpu))
return limit
def log(mode: str, message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
    """Print '[mode] message'; verbose-only lines are dropped unless verbose."""
    if verbose or not verbose_only:
        print(f"[{mode}] {message}")
def is_archive_path(path: Path) -> bool:
    """Return True when any component of *path* is named '_archive'."""
    return "_archive" in path.parts
def find_sequence_dirs(root: Path) -> Iterator[Path]:
    """Yield directories under *root* that directly contain frame files.

    Skips '_archive' subtrees and the ZIP output tree (ARCHIVE_ROOT) so
    files accidentally dropped under `Renders/_zipped` can never be
    re-archived into a nested `_zipped/_zipped/...` hierarchy.
    """
    for dirpath, dirnames, filenames in os.walk(root):
        path = Path(dirpath)
        # Prune in place so os.walk never descends into ignored subtrees.
        dirnames[:] = [
            d for d in dirnames
            if d != "_archive" and path / d != ARCHIVE_ROOT
        ]
        if is_archive_path(path):
            continue
        # A "sequence dir" is any directory holding at least one frame file.
        has_frames = any(
            Path(dirpath, f).suffix.lower() in SEQUENCE_EXTENSIONS for f in filenames
        )
        if has_frames:
            yield path
def iter_sequence_files(seq_dir: Path) -> Iterator[Path]:
    """Yield every file beneath *seq_dir*, skipping '_archive' subtrees."""
    for dirpath, dirnames, filenames in os.walk(seq_dir):
        current = Path(dirpath)
        # Prune in place so os.walk never enters archived folders.
        dirnames[:] = [name for name in dirnames if name != "_archive"]
        if is_archive_path(current):
            continue
        for name in filenames:
            yield current / name
def compute_state(seq_dir: Path) -> dict:
    """Build a manifest of *seq_dir*: relative path, size, and mtime per file.

    Entries are sorted by POSIX relative path so two scans of identical
    content always compare equal.
    """
    ordered = sorted(
        iter_sequence_files(seq_dir),
        key=lambda p: p.relative_to(seq_dir).as_posix(),
    )
    manifest = []
    for item in ordered:
        info = item.stat()
        manifest.append(
            {
                "path": item.relative_to(seq_dir).as_posix(),
                "size": info.st_size,
                "mtime_ns": info.st_mtime_ns,
            }
        )
    return {"files": manifest}
def current_state(seq_dir: Path) -> dict:
    """Return the live manifest of *seq_dir*, or an empty one if it is not a directory."""
    if seq_dir.is_dir():
        return compute_state(seq_dir)
    # Missing or non-directory paths behave like an empty sequence.
    return {"files": []}
def load_state(state_path: Path) -> dict | None:
if not state_path.exists():
return None
try:
return json.loads(state_path.read_text())
except json.JSONDecodeError:
return None
def state_changed(seq_state: dict, stored_state: dict | None) -> bool:
if stored_state is None:
return True
return seq_state != stored_state
def archive_path_for(seq_dir: Path) -> Path:
    """Map a sequence directory to its ZIP path under ARCHIVE_ROOT.

    Appends '.zip' to the final component rather than using
    `with_suffix(".zip")`, which treats anything after a dot in the
    directory name as a suffix to replace — so 'shot.v1' and 'shot.v2'
    would both collapse to 'shot.zip' and silently overwrite each other.
    For dot-free names the result is unchanged, and `sequence_dir_for`
    (which strips exactly the trailing '.zip') still inverts it.
    """
    rel = seq_dir.relative_to(RENDER_ROOT)
    target = ARCHIVE_ROOT / rel
    return target.with_name(target.name + ".zip")
def sequence_dir_for(zip_path: Path) -> Path:
    """Map a ZIP under ARCHIVE_ROOT back to its working directory under
    RENDER_ROOT by dropping the trailing '.zip' suffix."""
    rel = zip_path.relative_to(ARCHIVE_ROOT)
    return RENDER_ROOT / rel.with_suffix("")
def state_path_for(zip_path: Path) -> Path:
    """Return the JSON metadata sidecar path for *zip_path* ('….zip' + '.meta.json')."""
    return zip_path.parent / (zip_path.name + STATE_SUFFIX)
def zip_sequence(seq_dir: Path, zip_path: Path) -> None:
    """Compress every file in *seq_dir* into *zip_path* (ZIP container).

    Uses the external 7z executable when ZIPPER_TYPE is "7z", otherwise
    Python's zipfile module. Archive entries are stored relative to
    *seq_dir* in both paths.

    Raises:
        RuntimeError: if 7z is requested but SEVEN_Z_EXE is unset, if
            *seq_dir* contains no files (7z path only), or if the 7z
            subprocess exits non-zero.
    """
    if ZIPPER_TYPE == "7z":
        if SEVEN_Z_EXE is None:
            raise RuntimeError("7z compression requested but 7z executable not found in PATH")
        zip_path.parent.mkdir(parents=True, exist_ok=True)
        # Remove existing zip file if it exists to avoid corruption issues
        if zip_path.exists():
            try:
                zip_path.unlink()
            except OSError:
                # If deletion fails, try to overwrite with -aoa flag
                pass
        # Build list of files to archive with relative paths
        file_list = []
        for file_path in iter_sequence_files(seq_dir):
            rel_path = file_path.relative_to(seq_dir).as_posix()
            file_list.append(rel_path)
        if not file_list:
            raise RuntimeError(f"No files found to archive in {seq_dir}")
        # Use a temporary list file to avoid Windows command line length limits
        list_file_path = None
        try:
            # Create list file with absolute path
            fd, temp_path = tempfile.mkstemp(suffix=".lst", text=True)
            list_file_path = Path(temp_path)
            with os.fdopen(fd, "w", encoding="utf-8") as list_file:
                for rel_path in file_list:
                    list_file.write(rel_path + "\n")
                list_file.flush()
                os.fsync(list_file.fileno())  # Ensure data is written to disk
            # File is closed here by context manager, small delay to ensure OS releases handle
            # (works around Windows handle-release lag before 7z reads the list file)
            time.sleep(0.1)
            # Use absolute path for list file in 7z command
            list_file_abs = list_file_path.resolve()
            # Use -aoa to overwrite all files, -bb0 to suppress progress output
            cmd = [
                SEVEN_Z_EXE,
                "a",
                "-y",
                "-aoa",  # Overwrite all existing files
                "-bb0",  # Suppress progress output
                f"-mx={COMPRESSION_LEVEL}",
                "-tzip",
                str(zip_path),
                f"@{list_file_abs}",
            ]
            # cwd=seq_dir makes the relative paths in the list file resolve,
            # and gives the archive sequence-relative entry names.
            result = subprocess.run(
                cmd,
                cwd=seq_dir,
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            if result.returncode != 0:
                error_msg = result.stderr.strip() if result.stderr else "Unknown error"
                if result.stdout:
                    error_msg += f"\nstdout: {result.stdout.strip()}"
                raise RuntimeError(f"7z compression failed: {error_msg}")
        finally:
            # Clean up list file, with retry in case 7z still has it open
            if list_file_path and list_file_path.exists():
                for attempt in range(3):
                    try:
                        list_file_path.unlink(missing_ok=True)
                        break
                    except PermissionError:
                        if attempt < 2:
                            time.sleep(0.1)  # Wait 100ms before retry
                        else:
                            # Last attempt failed, just log and continue
                            # The temp file will be cleaned up by the OS eventually
                            pass
        return
    # Use zipfile (ZIPPER_TYPE == "zip" or fallback)
    from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile
    zip_path.parent.mkdir(parents=True, exist_ok=True)
    # Level 0 means "store uncompressed"; ZIP_STORED accepts no compresslevel.
    if COMPRESSION_LEVEL <= 0:
        compression = ZIP_STORED
        zip_kwargs = {}
    else:
        compression = ZIP_DEFLATED
        zip_kwargs = {"compresslevel": COMPRESSION_LEVEL}
    with ZipFile(zip_path, "w", compression=compression, **zip_kwargs) as archive:
        for file_path in iter_sequence_files(seq_dir):
            archive.write(file_path, arcname=file_path.relative_to(seq_dir).as_posix())
def expand_sequence(zip_path: Path, seq_state: dict) -> None:
    """Extract *zip_path* into its working directory and restore mtimes.

    The target directory is recreated from scratch so stale frames never
    survive an expand; afterwards each file listed in *seq_state* gets its
    recorded mtime reapplied so the folder matches the stored manifest.
    """
    target_dir = sequence_dir_for(zip_path)
    if target_dir.exists():
        shutil.rmtree(target_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    if ZIPPER_TYPE == "7z":
        if SEVEN_Z_EXE is None:
            raise RuntimeError("7z extraction requested but 7z executable not found in PATH")
        subprocess.run(
            [SEVEN_Z_EXE, "x", "-y", str(zip_path), f"-o{target_dir}"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    else:
        from zipfile import ZipFile
        with ZipFile(zip_path, "r") as archive:
            archive.extractall(target_dir)
    # ZIP entries only keep 2-second timestamps; restore nanosecond mtimes
    # from the manifest so state comparisons line up.
    for entry in seq_state.get("files", []):
        restored = target_dir / entry["path"]
        if restored.exists():
            os.utime(restored, ns=(entry["mtime_ns"], entry["mtime_ns"]))
def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, *, verbose: bool) -> Sequence[Path]:
    """Archive one sequence directory, then persist its state sidecar.

    Returns the pair (zip_path, state_path) so the caller can account for
    both updated files.
    """
    log("zip", f"{seq_dir} -> {zip_path}", verbose_only=True, verbose=verbose)
    zip_sequence(seq_dir, zip_path)
    serialized = json.dumps(seq_state, indent=2)
    state_path.write_text(serialized)
    return (zip_path, state_path)
def process_expand(zip_path: Path, state: dict, *, verbose: bool) -> None:
    """Expand a single archive back into its working directory."""
    destination = sequence_dir_for(zip_path)
    log("expand", f"{zip_path} -> {destination}", verbose_only=True, verbose=verbose)
    expand_sequence(zip_path, state)
def run_zip(worker_count: int, *, verbose: bool) -> int:
    """Zip every changed sequence under RENDER_ROOT using a thread pool.

    Each sequence's live manifest is compared against its stored sidecar;
    only sequences whose contents changed are re-archived. Orphan archives
    (source directory gone) are removed afterwards.

    Returns the number of sequence archives that were (re)written.
    """
    work_items: list[tuple[Path, Path, Path, dict]] = []
    if RENDER_ROOT.exists():
        for seq_dir in find_sequence_dirs(RENDER_ROOT):
            seq_state = compute_state(seq_dir)
            if not seq_state["files"]:
                continue
            zip_path = archive_path_for(seq_dir)
            state_path = state_path_for(zip_path)
            stored_state = load_state(state_path)
            # Skip sequences whose archive already matches the current files.
            if not state_changed(seq_state, stored_state):
                continue
            work_items.append((seq_dir, zip_path, state_path, seq_state))
    if not work_items:
        if not RENDER_ROOT.exists():
            log("zip", "Render root 'Renders' not found; nothing to zip.")
        else:
            log("zip", "Archives already up to date; no sequences needed zipping.")
        return 0
    updated_paths: list[Path] = []
    total = len(work_items)
    completed = 0
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        future_map = {
            executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, verbose=verbose): seq_dir
            for seq_dir, zip_path, state_path, seq_state in work_items
        }
        for future in as_completed(future_map):
            # .result() re-raises any exception from the worker thread here.
            updated_paths.extend(future.result())
            completed += 1
            seq_dir = future_map[future]
            rel = seq_dir.relative_to(RENDER_ROOT)
            log("zip", f"{completed}/{total} {rel}")
    # Each job contributed a (zip, sidecar) pair, hence the division by two.
    updated_count = len(updated_paths) // 2
    log("zip", f"Updated {updated_count} sequence archive(s).", verbose=verbose)
    if updated_paths:
        log(
            "zip",
            "Archives updated. Stage manually with `git add Renders/_zipped`, if desired.",
            verbose_only=True,
            verbose=verbose,
        )
    removed = cleanup_orphan_archives(verbose=verbose)
    if removed:
        log("zip", f"Removed {removed} orphan archive(s).", verbose=verbose)
    return updated_count
def run_expand(worker_count: int, *, verbose: bool) -> int:
    """Inflate every tracked archive whose working folder is out of date.

    Archives without a metadata sidecar are skipped (their target state is
    unknown). Returns the number of sequence folders refreshed.
    """
    if not ARCHIVE_ROOT.exists():
        log("expand", "No archives to expand (missing 'Renders/_zipped').")
        return 0
    pending: list[tuple[Path, dict]] = []
    for zip_path in ARCHIVE_ROOT.rglob("*.zip"):
        stored = load_state(state_path_for(zip_path))
        if stored is None:
            log("expand", f"Skipping {zip_path} (missing metadata)")
            continue
        # Folders that already match the stored manifest need no work.
        if current_state(sequence_dir_for(zip_path)) == stored:
            continue
        pending.append((zip_path, stored))
    if not pending:
        log("expand", "Working folders already match archives; nothing to expand.")
        return 0
    total = len(pending)
    done = 0
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = {
            executor.submit(process_expand, archive, state, verbose=verbose): archive
            for archive, state in pending
        }
        for finished in as_completed(futures):
            finished.result()  # re-raise worker exceptions
            done += 1
            rel = futures[finished].relative_to(ARCHIVE_ROOT)
            log("expand", f"{done}/{total} {rel}")
    log("expand", f"Refreshed {total} sequence folder(s).", verbose=verbose)
    return total
def cleanup_orphan_archives(*, verbose: bool) -> int:
    """Delete archives whose source sequence directory no longer exists.

    Also removes the matching metadata sidecars and prunes any directories
    under ARCHIVE_ROOT left empty by the deletions. Returns the number of
    archives removed.
    """
    if not ARCHIVE_ROOT.exists():
        return 0
    removed: list[Path] = []
    for zip_path in ARCHIVE_ROOT.rglob("*.zip"):
        if sequence_dir_for(zip_path).exists():
            continue
        rel = zip_path.relative_to(ARCHIVE_ROOT)
        log("zip", f"Removing orphan archive {rel}", verbose_only=True, verbose=verbose)
        zip_path.unlink(missing_ok=True)
        sidecar = state_path_for(zip_path)
        if sidecar.exists():
            sidecar.unlink()
        removed.append(zip_path)
    if not removed:
        return 0
    # Prune now-empty directories deepest-first, walking upward until a
    # non-empty ancestor or ARCHIVE_ROOT itself stops the climb.
    parents = sorted({p.parent for p in removed}, key=lambda p: len(p.parts), reverse=True)
    for parent in parents:
        if not parent.exists():
            continue  # already pruned while climbing from a deeper sibling
        while parent != ARCHIVE_ROOT and not any(parent.iterdir()):
            parent.rmdir()
            parent = parent.parent
    return len(removed)
def main() -> int:
    """CLI entry point: dispatch to zip or expand mode and return an exit status."""
    args = parse_args()
    workers = max_workers(args.jobs)
    if args.mode == "expand":
        run_expand(workers, verbose=args.verbose)
        return 0
    # run_zip returns a non-negative count, so this normally exits 0;
    # failures inside workers surface as exceptions, not return codes.
    updated = run_zip(workers, verbose=args.verbose)
    return 0 if updated >= 0 else 1
if __name__ == "__main__":
    try:
        # SystemExit carries main()'s return value as the process exit code.
        raise SystemExit(main())
    except Exception as exc:  # broad to surface unexpected errors cleanly
        # Print a one-line summary for hook logs, then re-raise so the full
        # traceback (and a non-zero exit status) still reach the caller.
        # Note: SystemExit is a BaseException, so the success path above is
        # not intercepted here.
        print(f"Sequence sync failed: {exc}", file=sys.stderr)
        raise