append zipseqarchv script and batches
.specstory/.gitignore (vendored, 1 line changed)
@@ -1,2 +1,3 @@
# SpecStory explanation file
/.what-is-this.md
.specstory/.project.json (new file, 8 lines added)
@@ -0,0 +1,8 @@
{
  "workspace_id": "94b7-1af5-44a2-c709",
  "workspace_id_at": "2025-11-06T17:01:19.380Z",
  "project_name": "ProjectStructure",
  "cloud_sync": false,
  "git_id": "002d-9d95-b84c-ef52",
  "git_id_at": "2025-11-06T17:01:19.414Z"
}
@@ -298,3 +298,13 @@ Lines added: 1, lines removed: 1

---

Updated to use the absolute work path:

```4:4:A:\1 Amazon_Active_Projects\3 ProjectStructure\UpdateSequences.bat
set "ps1=A:\1 Amazon_Active_Projects\3 ProjectStructure\UpdateSequences.ps1"
```

The script now points to the absolute path for the work configuration.

---
UnzipSeqArchv.bat (new file, 24 lines added)
@@ -0,0 +1,24 @@
@echo off
setlocal
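rem Expand the tracked render-sequence archives (Renders\_zipped) back into
rem the ignored working folders by delegating to zip_sequences.py --mode expand.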

set "REPO_ROOT=%~dp0"
set "PY_SCRIPT=A:\1 Amazon_Active_Projects\3 ProjectStructure\zip_sequences.py"

pushd "%REPO_ROOT%" >nul 2>&1

if not exist "%PY_SCRIPT%" (
    echo Missing %PY_SCRIPT%
    popd >nul 2>&1
    exit /b 1
)

python "%PY_SCRIPT%" --mode expand --verbose %*
set "ERR=%ERRORLEVEL%"

if not "%ERR%"=="0" (
    echo Failed to expand render sequence archives (exit code %ERR%^).
)

popd >nul 2>&1
exit /b %ERR%
ZipSeqArchv.bat (new file, 24 lines added)
@@ -0,0 +1,24 @@
@echo off
setlocal
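rem Zip changed render sequences under Renders\ into Renders\_zipped so that
rem commits only track the compact archives (delegates to zip_sequences.py).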

set "REPO_ROOT=%~dp0"
set "PY_SCRIPT=A:\1 Amazon_Active_Projects\3 ProjectStructure\zip_sequences.py"

pushd "%REPO_ROOT%" >nul 2>&1

if not exist "%PY_SCRIPT%" (
    echo Missing %PY_SCRIPT%
    popd >nul 2>&1
    exit /b 1
)

python "%PY_SCRIPT%" --verbose %*
set "ERR=%ERRORLEVEL%"

if not "%ERR%"=="0" (
    echo Failed to update render sequence archives (exit code %ERR%^).
)

popd >nul 2>&1
exit /b %ERR%
zip_sequences.py (new file, 311 lines added)
@@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""Maintain zipped render sequences for Git hooks.

Default mode scans `Renders/`, produces ZIP archives under `Renders/_zipped/`,
and refreshes any archive whose sequence changed so commits only track the
compact files (stage them with `git add Renders/_zipped`). Switch to
`--mode expand` to inflate the tracked archives back into the ignored working
directories after checkouts or pulls.
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Iterator, Sequence


RENDER_ROOT = Path("Renders")
ARCHIVE_ROOT = RENDER_ROOT / "_zipped"
SEQUENCE_EXTENSIONS = {
    ".png",
    ".jpg",
    ".jpeg",
    ".tif",
    ".tiff",
    ".exr",
}
STATE_SUFFIX = ".meta.json"
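
# Each archive gets a JSON sidecar, <name>.zip.meta.json, recording the frame
# list with sizes and mtimes; an entry looks like (hypothetical values):
#   {"files": [{"path": "frame_0001.png", "size": 1048576, "mtime_ns": 1699286479000000000}]}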


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Sync render sequences with zipped archives.")
    parser.add_argument(
        "--mode",
        choices=("zip", "expand"),
        default="zip",
        help="zip sequences for commit (default) or expand tracked archives",
    )
    parser.add_argument("--jobs", type=int, help="max parallel workers")
    parser.add_argument("--verbose", action="store_true", help="print extra progress details")
    return parser.parse_args()


def max_workers(requested: int | None) -> int:
    cpu = os.cpu_count() or 1
    limit = max(1, min(8, cpu))
    if requested and requested > 0:
        return min(requested, max(1, cpu))
    return limit


def log(mode: str, message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
    if verbose_only and not verbose:
        return
    print(f"[{mode}] {message}")


def find_sequence_dirs(root: Path) -> Iterator[Path]:
    for dirpath, dirnames, filenames in os.walk(root):
        path = Path(dirpath)
        if ARCHIVE_ROOT in path.parents or path == ARCHIVE_ROOT:
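            # prune the walk: never scan the archive tree itself for frames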
            dirnames.clear()
            continue
        has_frames = any(Path(dirpath, f).suffix.lower() in SEQUENCE_EXTENSIONS for f in filenames)
        if has_frames:
            yield path


def compute_state(seq_dir: Path) -> dict:
    entries = []
    for file_path in sorted(p for p in seq_dir.iterdir() if p.is_file()):
        stat = file_path.stat()
        entries.append(
            {
                "path": file_path.name,
                "size": stat.st_size,
                "mtime_ns": stat.st_mtime_ns,
            }
        )
    return {"files": entries}


def current_state(seq_dir: Path) -> dict:
    if not seq_dir.exists() or not seq_dir.is_dir():
        return {"files": []}
    return compute_state(seq_dir)


def load_state(state_path: Path) -> dict | None:
    if not state_path.exists():
        return None
    try:
        return json.loads(state_path.read_text())
    except json.JSONDecodeError:
        return None


def state_changed(seq_state: dict, stored_state: dict | None) -> bool:
    if stored_state is None:
        return True
    return seq_state != stored_state


def archive_path_for(seq_dir: Path) -> Path:
    rel = seq_dir.relative_to(RENDER_ROOT)
    return (ARCHIVE_ROOT / rel).with_suffix(".zip")


def sequence_dir_for(zip_path: Path) -> Path:
    rel = zip_path.relative_to(ARCHIVE_ROOT)
    return (RENDER_ROOT / rel).with_suffix("")


def state_path_for(zip_path: Path) -> Path:
    return zip_path.with_suffix(zip_path.suffix + STATE_SUFFIX)
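
# Path round trip, for a hypothetical sequence "ShotA/beauty":
#   Renders/ShotA/beauty              <->  Renders/_zipped/ShotA/beauty.zip
#   Renders/_zipped/ShotA/beauty.zip   ->  Renders/_zipped/ShotA/beauty.zip.meta.json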


def zip_sequence(seq_dir: Path, zip_path: Path) -> None:
    from zipfile import ZIP_STORED, ZipFile

    zip_path.parent.mkdir(parents=True, exist_ok=True)
    with ZipFile(zip_path, "w", compression=ZIP_STORED) as archive:
        for file_path in sorted(seq_dir.iterdir()):
            if not file_path.is_file():
                continue
            archive.write(file_path, arcname=file_path.name)


def expand_sequence(zip_path: Path, seq_state: dict) -> None:
    from zipfile import ZipFile

    target_dir = sequence_dir_for(zip_path)
    if target_dir.exists():
        shutil.rmtree(target_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    with ZipFile(zip_path, "r") as archive:
        archive.extractall(target_dir)
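
    # Restore recorded mtimes so the expanded folder matches its sidecar state
    # and a later zip pass sees it as unchanged.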
    for entry in seq_state.get("files", []):
        file_path = target_dir / entry["path"]
        if file_path.exists():
            os.utime(file_path, ns=(entry["mtime_ns"], entry["mtime_ns"]))


def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, *, verbose: bool) -> Sequence[Path]:
    log("zip", f"{seq_dir} -> {zip_path}", verbose_only=True, verbose=verbose)
    zip_sequence(seq_dir, zip_path)
    state_path.write_text(json.dumps(seq_state, indent=2))
    return (zip_path, state_path)


def process_expand(zip_path: Path, state: dict, *, verbose: bool) -> None:
    log("expand", f"{zip_path} -> {sequence_dir_for(zip_path)}", verbose_only=True, verbose=verbose)
    expand_sequence(zip_path, state)


def run_zip(worker_count: int, *, verbose: bool) -> int:
    work_items: list[tuple[Path, Path, Path, dict]] = []

    if RENDER_ROOT.exists():
        for seq_dir in find_sequence_dirs(RENDER_ROOT):
            seq_state = compute_state(seq_dir)
            if not seq_state["files"]:
                continue

            zip_path = archive_path_for(seq_dir)
            state_path = state_path_for(zip_path)
            stored_state = load_state(state_path)

            if not state_changed(seq_state, stored_state):
                continue

            work_items.append((seq_dir, zip_path, state_path, seq_state))

    if not work_items:
        return 0

    updated_paths: list[Path] = []

    total = len(work_items)
    completed = 0

    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        future_map = {
            executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, verbose=verbose): seq_dir
            for seq_dir, zip_path, state_path, seq_state in work_items
        }

        for future in as_completed(future_map):
            updated_paths.extend(future.result())
            completed += 1
            seq_dir = future_map[future]
            rel = seq_dir.relative_to(RENDER_ROOT)
            log("zip", f"{completed}/{total} {rel}")
    updated_count = len(updated_paths) // 2
    log("zip", f"Updated {updated_count} sequence archive(s).", verbose=verbose)
    if updated_paths:
        log(
            "zip",
            "Archives updated. Stage manually with `git add Renders/_zipped`, if desired.",
            verbose_only=True,
            verbose=verbose,
        )

    removed = cleanup_orphan_archives(verbose=verbose)
    if removed:
        log("zip", f"Removed {removed} orphan archive(s).", verbose=verbose)

    return updated_count


def run_expand(worker_count: int, *, verbose: bool) -> int:
    if not ARCHIVE_ROOT.exists():
        return 0

    work_items: list[tuple[Path, dict]] = []

    for zip_path in ARCHIVE_ROOT.rglob("*.zip"):
        state_path = state_path_for(zip_path)
        seq_state = load_state(state_path)
        if seq_state is None:
            log("expand", f"Skipping {zip_path} (missing metadata)")
            continue

        target_dir = sequence_dir_for(zip_path)
        if current_state(target_dir) == seq_state:
            continue

        work_items.append((zip_path, seq_state))

    if not work_items:
        return 0

    total = len(work_items)
    completed = 0

    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        future_map = {
            executor.submit(process_expand, zip_path, seq_state, verbose=verbose): zip_path
            for zip_path, seq_state in work_items
        }

        for future in as_completed(future_map):
            future.result()
            completed += 1
            zip_path = future_map[future]
            rel = zip_path.relative_to(ARCHIVE_ROOT)
            log("expand", f"{completed}/{total} {rel}")

    log("expand", f"Refreshed {len(work_items)} sequence folder(s).", verbose=verbose)
    return len(work_items)


def cleanup_orphan_archives(*, verbose: bool) -> int:
    if not ARCHIVE_ROOT.exists():
        return 0

    removed: list[Path] = []

    for zip_path in ARCHIVE_ROOT.rglob("*.zip"):
        seq_dir = sequence_dir_for(zip_path)
        if seq_dir.exists():
            continue

        rel = zip_path.relative_to(ARCHIVE_ROOT)
        log("zip", f"Removing orphan archive {rel}", verbose_only=True, verbose=verbose)

        zip_path.unlink(missing_ok=True)
        state_path = state_path_for(zip_path)
        if state_path.exists():
            state_path.unlink()
        removed.append(zip_path)

    if not removed:
        return 0

    for parent in sorted({p.parent for p in removed}, key=lambda p: len(p.parts), reverse=True):
        if not parent.exists():
            continue
        while parent != ARCHIVE_ROOT and not any(parent.iterdir()):
            parent.rmdir()
            parent = parent.parent

    return len(removed)


def main() -> int:
    args = parse_args()
    workers = max_workers(args.jobs)

    if args.mode == "expand":
        run_expand(workers, verbose=args.verbose)
        return 0

    updated = run_zip(workers, verbose=args.verbose)
    return 0 if updated >= 0 else 1


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except Exception as exc:  # broad to surface unexpected errors cleanly
        print(f"Sequence sync failed: {exc}", file=sys.stderr)
        raise