7z seq compression working
This commit is contained in:
236
zip_sequences.py
236
zip_sequences.py
@@ -33,7 +33,6 @@ SEQUENCE_EXTENSIONS = {
|
||||
".exr",
|
||||
}
|
||||
STATE_SUFFIX = ".meta.json"
|
||||
CONFIG_PATH = Path(__file__).resolve().with_name("config.json")
|
||||
DEFAULT_CONFIG = {
|
||||
"zipper": "7z",
|
||||
"compression": 9,
|
||||
@@ -42,24 +41,34 @@ DEFAULT_CONFIG = {
|
||||
|
||||
|
||||
def load_config() -> dict:
|
||||
try:
|
||||
text = CONFIG_PATH.read_text(encoding="utf-8")
|
||||
except FileNotFoundError:
|
||||
return DEFAULT_CONFIG.copy()
|
||||
except OSError:
|
||||
return DEFAULT_CONFIG.copy()
|
||||
|
||||
try:
|
||||
data = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
return DEFAULT_CONFIG.copy()
|
||||
|
||||
if not isinstance(data, dict):
|
||||
return DEFAULT_CONFIG.copy()
|
||||
|
||||
merged = DEFAULT_CONFIG.copy()
|
||||
merged.update(data)
|
||||
return merged
|
||||
# First try to load from project's .config folder (current working directory)
|
||||
# Then fall back to ProjectStructure repo config (next to zip_sequences.py)
|
||||
cwd = Path.cwd()
|
||||
project_config = cwd / ".config" / "config.json"
|
||||
repo_config = Path(__file__).resolve().with_name("config.json")
|
||||
|
||||
config_paths = [
|
||||
("project", project_config),
|
||||
("repo", repo_config),
|
||||
]
|
||||
|
||||
for source, config_path in config_paths:
|
||||
try:
|
||||
if config_path.exists():
|
||||
text = config_path.read_text(encoding="utf-8")
|
||||
try:
|
||||
data = json.loads(text)
|
||||
if isinstance(data, dict):
|
||||
merged = DEFAULT_CONFIG.copy()
|
||||
merged.update(data)
|
||||
return merged
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
# If no config found, return defaults
|
||||
return DEFAULT_CONFIG.copy()
|
||||
|
||||
|
||||
CONFIG = load_config()
|
||||
@@ -69,6 +78,7 @@ if isinstance(zipper_val, bool):
|
||||
ZIPPER_TYPE = "7z" if zipper_val else "zip"
|
||||
else:
|
||||
ZIPPER_TYPE = str(zipper_val).lower()
|
||||
|
||||
COMPRESSION_LEVEL = CONFIG.get("compression", 9)
|
||||
if isinstance(COMPRESSION_LEVEL, str):
|
||||
try:
|
||||
@@ -82,8 +92,6 @@ COMPRESSION_LEVEL = max(0, min(9, COMPRESSION_LEVEL))
|
||||
SEVEN_Z_EXE: str | None = None
|
||||
if ZIPPER_TYPE == "7z":
|
||||
SEVEN_Z_EXE = shutil.which("7z") or shutil.which("7za")
|
||||
if SEVEN_Z_EXE is None:
|
||||
print("[zip] Warning: 7z compression requested but no 7z executable was found in PATH.", file=sys.stderr)
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
@@ -179,7 +187,8 @@ def state_changed(seq_state: dict, stored_state: dict | None) -> bool:
|
||||
|
||||
def archive_path_for(seq_dir: Path) -> Path:
|
||||
rel = seq_dir.relative_to(RENDER_ROOT)
|
||||
return (ARCHIVE_ROOT / rel).with_suffix(".zip")
|
||||
suffix = ".7z" if ZIPPER_TYPE == "7z" else ".zip"
|
||||
return (ARCHIVE_ROOT / rel).with_suffix(suffix)
|
||||
|
||||
|
||||
def sequence_dir_for(zip_path: Path) -> Path:
|
||||
@@ -194,15 +203,21 @@ def state_path_for(zip_path: Path) -> Path:
|
||||
def zip_sequence(seq_dir: Path, zip_path: Path) -> None:
|
||||
if ZIPPER_TYPE == "7z":
|
||||
if SEVEN_Z_EXE is None:
|
||||
raise RuntimeError("7z compression requested but 7z executable not found in PATH")
|
||||
raise RuntimeError(
|
||||
"7z compression requested but 7z executable not found in PATH. "
|
||||
"Please install 7z (e.g., via Chocolatey: choco install 7zip) "
|
||||
"or set zipper to 'zip' in config.json"
|
||||
)
|
||||
zip_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
# Remove existing zip file if it exists to avoid corruption issues
|
||||
if zip_path.exists():
|
||||
try:
|
||||
zip_path.unlink()
|
||||
except OSError:
|
||||
# If deletion fails, try to overwrite with -aoa flag
|
||||
pass
|
||||
|
||||
# If creating a .7z file, remove any existing .zip file for the same sequence
|
||||
if zip_path.suffix == ".7z":
|
||||
old_zip_path = zip_path.with_suffix(".zip")
|
||||
if old_zip_path.exists():
|
||||
old_zip_path.unlink(missing_ok=True)
|
||||
old_state_path = state_path_for(old_zip_path)
|
||||
if old_state_path.exists():
|
||||
old_state_path.unlink(missing_ok=True)
|
||||
|
||||
# Build list of files to archive with relative paths
|
||||
file_list = []
|
||||
@@ -213,9 +228,14 @@ def zip_sequence(seq_dir: Path, zip_path: Path) -> None:
|
||||
if not file_list:
|
||||
raise RuntimeError(f"No files found to archive in {seq_dir}")
|
||||
|
||||
# Use a temporary list file to avoid Windows command line length limits
|
||||
# Create zip in temporary location first to avoid issues with corrupted existing files
|
||||
temp_zip = None
|
||||
list_file_path = None
|
||||
try:
|
||||
# Create temporary archive file path (but don't create the file - let 7z create it)
|
||||
temp_zip_path = tempfile.mktemp(suffix=".7z", dir=zip_path.parent)
|
||||
temp_zip = Path(temp_zip_path)
|
||||
|
||||
# Create list file with absolute path
|
||||
fd, temp_path = tempfile.mkstemp(suffix=".lst", text=True)
|
||||
list_file_path = Path(temp_path)
|
||||
@@ -227,18 +247,18 @@ def zip_sequence(seq_dir: Path, zip_path: Path) -> None:
|
||||
# File is closed here by context manager, small delay to ensure OS releases handle
|
||||
time.sleep(0.1)
|
||||
|
||||
# Use absolute path for list file in 7z command
|
||||
# Use absolute paths for both list file and temp zip
|
||||
list_file_abs = list_file_path.resolve()
|
||||
# Use -aoa to overwrite all files, -bb0 to suppress progress output
|
||||
temp_zip_abs = temp_zip.resolve()
|
||||
# Create archive in temp location first (7z will create it fresh)
|
||||
cmd = [
|
||||
SEVEN_Z_EXE,
|
||||
"a",
|
||||
"-y",
|
||||
"-aoa", # Overwrite all existing files
|
||||
"-bb0", # Suppress progress output
|
||||
f"-mx={COMPRESSION_LEVEL}",
|
||||
"-tzip",
|
||||
str(zip_path),
|
||||
"-t7z", # Use 7z format, not zip
|
||||
str(temp_zip_abs),
|
||||
f"@{list_file_abs}",
|
||||
]
|
||||
result = subprocess.run(
|
||||
@@ -254,7 +274,19 @@ def zip_sequence(seq_dir: Path, zip_path: Path) -> None:
|
||||
if result.stdout:
|
||||
error_msg += f"\nstdout: {result.stdout.strip()}"
|
||||
raise RuntimeError(f"7z compression failed: {error_msg}")
|
||||
|
||||
# Move temp zip to final location, replacing any existing file
|
||||
if zip_path.exists():
|
||||
zip_path.unlink()
|
||||
temp_zip.replace(zip_path)
|
||||
temp_zip = None # Mark as moved so we don't delete it
|
||||
finally:
|
||||
# Clean up temp zip if it wasn't moved
|
||||
if temp_zip and temp_zip.exists():
|
||||
try:
|
||||
temp_zip.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
# Clean up list file, with retry in case 7z still has it open
|
||||
if list_file_path and list_file_path.exists():
|
||||
for attempt in range(3):
|
||||
@@ -270,20 +302,29 @@ def zip_sequence(seq_dir: Path, zip_path: Path) -> None:
|
||||
pass
|
||||
return
|
||||
|
||||
# Use zipfile (ZIPPER_TYPE == "zip" or fallback)
|
||||
from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile
|
||||
# Use zipfile (only if ZIPPER_TYPE == "zip")
|
||||
if ZIPPER_TYPE == "zip":
|
||||
from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile
|
||||
|
||||
zip_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
if COMPRESSION_LEVEL <= 0:
|
||||
compression = ZIP_STORED
|
||||
zip_kwargs = {}
|
||||
else:
|
||||
compression = ZIP_DEFLATED
|
||||
zip_kwargs = {"compresslevel": COMPRESSION_LEVEL}
|
||||
zip_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
if COMPRESSION_LEVEL <= 0:
|
||||
compression = ZIP_STORED
|
||||
zip_kwargs = {}
|
||||
else:
|
||||
compression = ZIP_DEFLATED
|
||||
zip_kwargs = {"compresslevel": COMPRESSION_LEVEL}
|
||||
|
||||
with ZipFile(zip_path, "w", compression=compression, **zip_kwargs) as archive:
|
||||
for file_path in iter_sequence_files(seq_dir):
|
||||
archive.write(file_path, arcname=file_path.relative_to(seq_dir).as_posix())
|
||||
with ZipFile(zip_path, "w", compression=compression, **zip_kwargs) as archive:
|
||||
for file_path in iter_sequence_files(seq_dir):
|
||||
archive.write(file_path, arcname=file_path.relative_to(seq_dir).as_posix())
|
||||
return
|
||||
|
||||
# Unknown ZIPPER_TYPE - fail with clear error
|
||||
raise RuntimeError(
|
||||
f"Unsupported ZIPPER_TYPE: {ZIPPER_TYPE!r}. "
|
||||
f"Expected '7z' or 'zip'. "
|
||||
f"Config zipper value: {CONFIG.get('zipper', 'not set')!r}"
|
||||
)
|
||||
|
||||
|
||||
def expand_sequence(zip_path: Path, seq_state: dict) -> None:
|
||||
@@ -294,7 +335,10 @@ def expand_sequence(zip_path: Path, seq_state: dict) -> None:
|
||||
|
||||
if ZIPPER_TYPE == "7z":
|
||||
if SEVEN_Z_EXE is None:
|
||||
raise RuntimeError("7z extraction requested but 7z executable not found in PATH")
|
||||
raise RuntimeError(
|
||||
"7z extraction requested but 7z executable not found in PATH. "
|
||||
"Please install 7z or set zipper to 'zip' in config.json"
|
||||
)
|
||||
cmd = [
|
||||
SEVEN_Z_EXE,
|
||||
"x",
|
||||
@@ -302,12 +346,29 @@ def expand_sequence(zip_path: Path, seq_state: dict) -> None:
|
||||
str(zip_path),
|
||||
f"-o{target_dir}",
|
||||
]
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
else:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
error_msg = result.stderr.strip() if result.stderr else "Unknown error"
|
||||
if result.stdout:
|
||||
error_msg += f"\nstdout: {result.stdout.strip()}"
|
||||
raise RuntimeError(f"7z extraction failed: {error_msg}")
|
||||
elif ZIPPER_TYPE == "zip":
|
||||
from zipfile import ZipFile
|
||||
|
||||
with ZipFile(zip_path, "r") as archive:
|
||||
archive.extractall(target_dir)
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f"Unsupported ZIPPER_TYPE: {ZIPPER_TYPE!r}. "
|
||||
f"Expected '7z' or 'zip'. "
|
||||
f"Config zipper value: {CONFIG.get('zipper', 'not set')!r}"
|
||||
)
|
||||
|
||||
for entry in seq_state.get("files", []):
|
||||
file_path = target_dir / entry["path"]
|
||||
@@ -336,11 +397,34 @@ def run_zip(worker_count: int, *, verbose: bool) -> int:
|
||||
if not seq_state["files"]:
|
||||
continue
|
||||
|
||||
# Get the target archive path (will be .7z if ZIPPER_TYPE is "7z")
|
||||
zip_path = archive_path_for(seq_dir)
|
||||
state_path = state_path_for(zip_path)
|
||||
|
||||
# Check if we need to upgrade from .zip to .7z
|
||||
old_zip_path = None
|
||||
if ZIPPER_TYPE == "7z":
|
||||
# Check if an old .zip file exists
|
||||
old_zip_path = zip_path.with_suffix(".zip")
|
||||
if old_zip_path.exists():
|
||||
# Check if the old .zip's metadata matches current state
|
||||
old_state_path = state_path_for(old_zip_path)
|
||||
old_stored_state = load_state(old_state_path)
|
||||
if not state_changed(seq_state, old_stored_state):
|
||||
# Old .zip is up to date, skip conversion
|
||||
continue
|
||||
# Old .zip is out of date, will be replaced with .7z
|
||||
|
||||
# Check if the target archive (e.g., .7z) already exists and is up to date
|
||||
stored_state = load_state(state_path)
|
||||
|
||||
if not state_changed(seq_state, stored_state):
|
||||
# Target archive is up to date, but we might still need to clean up old .zip
|
||||
if old_zip_path and old_zip_path.exists():
|
||||
# Old .zip exists but we have a newer .7z, remove the old one
|
||||
old_zip_path.unlink(missing_ok=True)
|
||||
old_state_path = state_path_for(old_zip_path)
|
||||
if old_state_path.exists():
|
||||
old_state_path.unlink(missing_ok=True)
|
||||
continue
|
||||
|
||||
work_items.append((seq_dir, zip_path, state_path, seq_state))
|
||||
@@ -394,18 +478,21 @@ def run_expand(worker_count: int, *, verbose: bool) -> int:
|
||||
|
||||
work_items: list[tuple[Path, dict]] = []
|
||||
|
||||
for zip_path in ARCHIVE_ROOT.rglob("*.zip"):
|
||||
state_path = state_path_for(zip_path)
|
||||
seq_state = load_state(state_path)
|
||||
if seq_state is None:
|
||||
log("expand", f"Skipping {zip_path} (missing metadata)")
|
||||
continue
|
||||
# Look for both .zip and .7z archives
|
||||
archive_patterns = ["*.zip", "*.7z"]
|
||||
for pattern in archive_patterns:
|
||||
for zip_path in ARCHIVE_ROOT.rglob(pattern):
|
||||
state_path = state_path_for(zip_path)
|
||||
seq_state = load_state(state_path)
|
||||
if seq_state is None:
|
||||
log("expand", f"Skipping {zip_path} (missing metadata)")
|
||||
continue
|
||||
|
||||
target_dir = sequence_dir_for(zip_path)
|
||||
if current_state(target_dir) == seq_state:
|
||||
continue
|
||||
target_dir = sequence_dir_for(zip_path)
|
||||
if current_state(target_dir) == seq_state:
|
||||
continue
|
||||
|
||||
work_items.append((zip_path, seq_state))
|
||||
work_items.append((zip_path, seq_state))
|
||||
|
||||
if not work_items:
|
||||
log("expand", "Working folders already match archives; nothing to expand.")
|
||||
@@ -437,19 +524,22 @@ def cleanup_orphan_archives(*, verbose: bool) -> int:
|
||||
|
||||
removed: list[Path] = []
|
||||
|
||||
for zip_path in ARCHIVE_ROOT.rglob("*.zip"):
|
||||
seq_dir = sequence_dir_for(zip_path)
|
||||
if seq_dir.exists():
|
||||
continue
|
||||
# Look for both .zip and .7z archives
|
||||
archive_patterns = ["*.zip", "*.7z"]
|
||||
for pattern in archive_patterns:
|
||||
for zip_path in ARCHIVE_ROOT.rglob(pattern):
|
||||
seq_dir = sequence_dir_for(zip_path)
|
||||
if seq_dir.exists():
|
||||
continue
|
||||
|
||||
rel = zip_path.relative_to(ARCHIVE_ROOT)
|
||||
log("zip", f"Removing orphan archive {rel}", verbose_only=True, verbose=verbose)
|
||||
rel = zip_path.relative_to(ARCHIVE_ROOT)
|
||||
log("zip", f"Removing orphan archive {rel}", verbose_only=True, verbose=verbose)
|
||||
|
||||
zip_path.unlink(missing_ok=True)
|
||||
state_path = state_path_for(zip_path)
|
||||
if state_path.exists():
|
||||
state_path.unlink()
|
||||
removed.append(zip_path)
|
||||
zip_path.unlink(missing_ok=True)
|
||||
state_path = state_path_for(zip_path)
|
||||
if state_path.exists():
|
||||
state_path.unlink()
|
||||
removed.append(zip_path)
|
||||
|
||||
if not removed:
|
||||
return 0
|
||||
|
||||
Reference in New Issue
Block a user