more 7zinst behavior modification
zip_sequences.py (230 lines changed)
@@ -58,6 +58,12 @@ DEFAULT_CONFIG = {
 }
 
 
+def log(mode: str, message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
+    if verbose_only and not verbose:
+        return
+    print(f"[{mode}] {message}", flush=True)
+
+
 def load_config() -> dict:
     # First try to load from project's .config folder (current working directory)
     # Then fall back to ProjectStructure repo config (next to zip_sequences.py)
@@ -70,22 +76,28 @@ def load_config():
         ("repo", repo_config),
     ]
 
+    log("init", "Loading configuration sources...")
     for source, config_path in config_paths:
         try:
             if config_path.exists():
+                log("init", f"Reading {source} config at {config_path}")
                 text = config_path.read_text(encoding="utf-8")
                 try:
                     data = json.loads(text)
                     if isinstance(data, dict):
                         merged = DEFAULT_CONFIG.copy()
                         merged.update(data)
+                        log("init", f"Configuration loaded from {source}")
                         return merged
                 except json.JSONDecodeError:
+                    log("init", f"Config file at {config_path} is invalid JSON; skipping")
                     continue
         except OSError:
+            log("init", f"Unable to read config at {config_path}; skipping")
            continue
 
     # If no config found, return defaults
+    log("init", "No config files found; using default settings")
    return DEFAULT_CONFIG.copy()
 
 
@@ -197,22 +209,32 @@ def estimate_ram_per_job(seq_dir: Path, seq_state: dict) -> int:
     total_bytes = sum(entry.get("size", 0) for entry in seq_state.get("files", []))
 
     if ZIPPER_TYPE == "7z":
-        # Base RAM: 500MB per job
-        base_ram = 500 * 1024 * 1024  # 500 MB
+        # Fixed dictionary size: 1GB (1024MB)
+        FIXED_DICT_SIZE_MB = 1024
+        FIXED_DICT_SIZE_BYTES = FIXED_DICT_SIZE_MB * 1024 * 1024
 
-        # Compression factor: 7z can use significant RAM, especially for large files
-        # Use 0.15x factor (conservative estimate accounting for 7z's 80% usage)
-        compression_factor = 0.15
+        # Dictionary RAM: ~11x dictionary size (1GB dict = 11GB RAM for dictionary operations)
+        # This is the main memory consumer for 7z LZMA compression
+        dict_ram = FIXED_DICT_SIZE_BYTES * 11  # 11GB for 1GB dictionary
 
-        # For very large folders (>10GB), cap at 8GB per job
-        max_ram_per_job = 8 * 1024 * 1024 * 1024  # 8 GB
-        large_folder_threshold = 10 * 1024 * 1024 * 1024  # 10 GB
-
-        if total_bytes > large_folder_threshold:
-            estimated_ram = max_ram_per_job
+        # Input buffer: 7z processes in chunks, but for very large sequences needs more RAM
+        # For sequences >100GB, use a larger buffer factor to handle file metadata and processing
+        if total_bytes > 500 * 1024 * 1024 * 1024:  # >500GB (extremely large)
+            # Extremely large sequences: use 8% of size, capped at 64GB
+            # This accounts for file metadata, directory structures, and processing overhead
+            input_buffer = min(int(total_bytes * 0.08), 64 * 1024 * 1024 * 1024)
+            overhead = 8 * 1024 * 1024 * 1024  # 8GB overhead for extremely large sequences
+        elif total_bytes > 100 * 1024 * 1024 * 1024:  # >100GB
+            # Very large sequences: use 5% of size, capped at 32GB
+            input_buffer = min(int(total_bytes * 0.05), 32 * 1024 * 1024 * 1024)
+            overhead = 4 * 1024 * 1024 * 1024  # 4GB for large sequences
         else:
-            estimated_ram = max(base_ram, int(total_bytes * compression_factor))
-            estimated_ram = min(estimated_ram, max_ram_per_job)
+            # Smaller sequences: use 15% of size, capped at 2GB
+            input_buffer = min(int(total_bytes * 0.15), 2 * 1024 * 1024 * 1024)
+            overhead = 1 * 1024 * 1024 * 1024  # 1GB for smaller sequences
+
+        # Total RAM estimate
+        estimated_ram = dict_ram + input_buffer + overhead
 
         return estimated_ram
     else:
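A worked example of the tiered estimate above, using my own numbers rather than anything from the commit: a 150 GB sequence lands in the >100GB tier, so the estimate is 11 GB (dictionary) + min(5% of 150 GB, 32 GB) = 7.5 GB (input buffer) + 4 GB (overhead), about 22.5 GB per job.

```python
# Sanity check of the >100GB tier for a hypothetical 150 GB sequence.
GIB = 1024 ** 3
total_bytes = 150 * GIB

dict_ram = 11 * GIB                                    # ~11x the fixed 1 GiB dictionary
input_buffer = min(int(total_bytes * 0.05), 32 * GIB)  # 5% of size, capped at 32 GiB
overhead = 4 * GIB                                     # flat overhead for this tier

print((dict_ram + input_buffer + overhead) / GIB)      # -> 22.5
```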
@@ -245,7 +267,7 @@ def max_workers(
 
     # If no work items provided, return CPU-based limit
     if work_items is None or len(work_items) == 0:
-        return (cpu_limit, None)
+        return (cpu_limit, {})
 
     # Try to calculate RAM-based limit
     available_ram = get_available_ram()
@@ -254,7 +276,7 @@ def max_workers(
         # RAM detection failed, fall back to CPU limit
         if verbose:
             log("zip", "RAM detection failed, using CPU-based worker limit", verbose_only=True, verbose=verbose)
-        return (cpu_limit, None)
+        return (cpu_limit, {})
 
     # For 7z: use fixed dictionary size and calculate workers
     if ZIPPER_TYPE == "7z":
@@ -272,6 +294,12 @@ def max_workers(
             if requested and requested > 0:
                 final_limit = min(final_limit, requested)
 
+            # Create RAM limits dict (all use fixed dict size, but return as dict for consistency)
+            ram_limits_dict: dict[Path, int] = {}
+            if work_items:
+                for seq_dir, _, _, _ in work_items:
+                    ram_limits_dict[seq_dir] = fixed_dict_size_bytes
+
             if verbose:
                 log(
                     "zip",
@@ -281,45 +309,97 @@ def max_workers(
                     verbose=verbose
                 )
 
-            return (final_limit, fixed_dict_size_bytes)
+            return (final_limit, ram_limits_dict)
 
         # Auto-calculate based on RAM if Max7zInst not configured
-        # "Balls-to-the-walls" mode: use maximum resources
+        # Use bin-packing algorithm with size-aware RAM estimation
         if available_ram is not None:
-            # 7z uses ~2-3x dictionary size in RAM, use 3x for aggressive mode
-            # This is more realistic and allows more concurrent workers
-            FIXED_RAM_PER_JOB = FIXED_DICT_SIZE_MB * 3 * 1024 * 1024  # 3GB per job
-
             # available_ram is already 80% of total (20% reserved for system)
-            # Use 95% of available RAM for compression jobs (aggressive mode)
+            # Use 95% of available RAM for compression jobs
             compression_ram = int(available_ram * 0.95)
 
-            # Calculate worker limit based on fixed per-job RAM
-            ram_limit = max(1, compression_ram // FIXED_RAM_PER_JOB)
+            # Estimate RAM for each work item and create list with (seq_dir, estimated_ram, work_item)
+            work_items_with_ram: list[tuple[Path, int, tuple[Path, Path, Path, dict]]] = []
+            ram_limits_dict: dict[Path, int] = {}
+
+            for seq_dir, zip_path, state_path, seq_state in work_items:
+                try:
+                    estimated_ram = estimate_ram_per_job(seq_dir, seq_state)
+                    work_items_with_ram.append((seq_dir, estimated_ram, (seq_dir, zip_path, state_path, seq_state)))
+                    ram_limits_dict[seq_dir] = estimated_ram
+                except Exception:
+                    # If estimation fails, use a safe default (12GB minimum for 1GB dict)
+                    default_ram = 12 * 1024 * 1024 * 1024  # 12GB
+                    work_items_with_ram.append((seq_dir, default_ram, (seq_dir, zip_path, state_path, seq_state)))
+                    ram_limits_dict[seq_dir] = default_ram
+
+            # Sort by estimated RAM (largest first) for bin-packing
+            work_items_with_ram.sort(key=lambda x: x[1], reverse=True)
+
+            # Bin-packing algorithm: pack largest items first
+            bins: list[list[tuple[Path, int, tuple[Path, Path, Path, dict]]]] = []
+            bin_remaining: list[int] = []
+
+            for seq_dir, estimated_ram, work_item in work_items_with_ram:
+                # Try to fit in existing bin
+                placed = False
+                for i, remaining in enumerate(bin_remaining):
+                    if remaining >= estimated_ram:
+                        bins[i].append((seq_dir, estimated_ram, work_item))
+                        bin_remaining[i] -= estimated_ram
+                        placed = True
+                        break
+
+                # If doesn't fit, create new bin
+                if not placed:
+                    bins.append([(seq_dir, estimated_ram, work_item)])
+                    bin_remaining.append(compression_ram - estimated_ram)
+
+            # Worker count is number of bins
+            worker_count = len(bins)
 
             # Cap at number of actual work items (can't have more workers than jobs)
             num_work_items = len(work_items) if work_items else 0
             if num_work_items > 0:
-                ram_limit = min(ram_limit, num_work_items)
+                worker_count = min(worker_count, num_work_items)
 
-            # Use RAM limit directly (no CPU limit)
-            final_limit = ram_limit
+            # Respect user's --jobs if provided
             if requested and requested > 0:
-                final_limit = min(final_limit, requested)
+                worker_count = min(worker_count, requested)
 
             if verbose:
                 ram_gb = available_ram / (1024 ** 3)
                 compression_ram_gb = compression_ram / (1024 ** 3)
-                ram_per_job_gb = FIXED_RAM_PER_JOB / (1024 ** 3)
+                total_estimated_gb = sum(ram for _, ram, _ in work_items_with_ram) / (1024 ** 3)
                 log(
                     "zip",
-                    f"RAM: {ram_gb:.1f}GB available (80% of total), {compression_ram_gb:.1f}GB for compression (95%), {ram_per_job_gb:.1f}GB per job (dict: {FIXED_DICT_SIZE_MB}MB) → "
-                    f"RAM limit: {ram_limit}, work items: {num_work_items}, requested: {requested}, final: {final_limit}",
+                    f"RAM: {ram_gb:.1f}GB available (80% of total), {compression_ram_gb:.1f}GB for compression (95%)",
+                    verbose_only=True,
+                    verbose=verbose
+                )
+                log(
+                    "zip",
+                    f"Estimated RAM per sequence: {total_estimated_gb:.1f}GB total across {num_work_items} sequences",
+                    verbose_only=True,
+                    verbose=verbose
+                )
+                if len(bins) > 0:
+                    bin_sizes = [sum(ram for _, ram, _ in bin_items) / (1024 ** 3) for bin_items in bins]
+                    log(
+                        "zip",
+                        f"Bin-packing: {worker_count} workers, bin sizes: {[f'{s:.1f}GB' for s in bin_sizes[:5]]}{'...' if len(bin_sizes) > 5 else ''}",
+                        verbose_only=True,
+                        verbose=verbose
+                    )
+                log(
+                    "zip",
+                    f"Final worker count: {worker_count} (requested: {requested})",
                     verbose_only=True,
                     verbose=verbose
                 )
 
-            return (final_limit, fixed_dict_size_bytes)
+            # Return worker count and RAM limits dict
+            return (worker_count, ram_limits_dict)
 
         # RAM detection failed, use a safe default (no CPU limit)
         default_limit = 4
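The packing loop above is first-fit decreasing: sort jobs by estimated RAM, descending; drop each into the first bin with room; open a new bin of capacity compression_ram when none fits; and run one worker per bin. A self-contained sketch of the same packing, with illustrative sizes not taken from the commit:

```python
def first_fit_decreasing(ram_estimates: list[int], bin_capacity: int) -> int:
    """Number of bins (= concurrent workers) needed to pack the estimates."""
    bin_remaining: list[int] = []
    for ram in sorted(ram_estimates, reverse=True):  # largest first
        for i, remaining in enumerate(bin_remaining):
            if remaining >= ram:          # fits in an existing bin
                bin_remaining[i] -= ram
                break
        else:                             # no bin had room: open a new one
            bin_remaining.append(bin_capacity - ram)
    return len(bin_remaining)

GIB = 1024 ** 3
# Two 40 GiB jobs and two 20 GiB jobs into 64 GiB bins -> 2 bins, i.e. 2 workers.
print(first_fit_decreasing([40 * GIB, 40 * GIB, 20 * GIB, 20 * GIB], 64 * GIB))
```

Note that a job larger than the capacity still gets its own bin (the remaining capacity just goes negative), which matches the commit's behavior of never refusing a job.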
@@ -328,6 +408,14 @@ def max_workers(
             default_limit = min(default_limit, num_work_items)
         if requested and requested > 0:
             default_limit = min(default_limit, requested)
+
+        # Create RAM limits dict with safe defaults (12GB per job for 1GB dict)
+        ram_limits_dict: dict[Path, int] = {}
+        if work_items:
+            default_ram = 12 * 1024 * 1024 * 1024  # 12GB default
+            for seq_dir, _, _, _ in work_items:
+                ram_limits_dict[seq_dir] = default_ram
+
         if verbose:
             log(
                 "zip",
@@ -336,21 +424,25 @@ def max_workers(
                 verbose_only=True,
                 verbose=verbose
             )
-        return (default_limit, fixed_dict_size_bytes)
+        return (default_limit, ram_limits_dict)
 
     # For zip compression, use existing estimation-based approach
     # Estimate RAM per job for each work item
     ram_estimates = []
+    ram_limits_dict: dict[Path, int] = {}
     for seq_dir, zip_path, state_path, seq_state in work_items:
         try:
             estimated_ram = estimate_ram_per_job(seq_dir, seq_state)
             ram_estimates.append(estimated_ram)
+            ram_limits_dict[seq_dir] = estimated_ram
         except Exception:
             # If estimation fails, use fallback estimate
-            ram_estimates.append(1024 * 1024 * 1024)  # 1GB fallback for zip
+            fallback_ram = 1024 * 1024 * 1024  # 1GB fallback for zip
+            ram_estimates.append(fallback_ram)
+            ram_limits_dict[seq_dir] = fallback_ram
 
     if not ram_estimates:
-        return (cpu_limit, None)
+        return (cpu_limit, {})
 
     max_ram_per_job = max(ram_estimates)
     ram_limit = max(1, available_ram // max_ram_per_job)
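For the zip path, concurrency stays a plain floor division by the worst-case job rather than bin-packing. With my own illustrative numbers: 64 GB available and a largest per-job estimate of 22.5 GB gives 64 // 22.5, which floors to 2 workers, and the max(1, ...) guard keeps a single worker running even when one job's estimate exceeds available RAM.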
@@ -368,13 +460,7 @@ def max_workers(
             verbose=verbose
         )
 
-    return (final_limit, None)
+    return (final_limit, ram_limits_dict)
 
 
-def log(mode: str, message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
-    if verbose_only and not verbose:
-        return
-    print(f"[{mode}] {message}")
-
-
 def is_archive_path(path: Path) -> bool:
@@ -516,11 +602,11 @@ def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None
         "-t7z",  # Use 7z format, not zip
     ]
 
-    # Add fixed dictionary size if specified (7z memory usage is controlled by dictionary size)
-    if per_job_memory_limit is not None:
-        # per_job_memory_limit is actually the fixed dictionary size in bytes
-        dict_size_mb = per_job_memory_limit // (1024 * 1024)
-        cmd.append(f"-md={dict_size_mb}m")
+    # Always use fixed dictionary size: 1GB (1024MB)
+    # The per_job_memory_limit parameter is the estimated RAM usage (for logging/info only)
+    # We keep dictionary at 1GB for best compression regardless of RAM estimate
+    FIXED_DICT_SIZE_MB = 1024
+    cmd.append(f"-md={FIXED_DICT_SIZE_MB}m")
 
     cmd.extend([
         str(temp_zip_abs),
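For reference, -md is 7-Zip's dictionary-size switch for the LZMA/LZMA2 codec, and compression memory grows with the dictionary (roughly an order of magnitude above the dictionary size for LZMA compression, which is what the ~11x figure in estimate_ram_per_job assumes; decompression needs only a little more than the dictionary itself). The assembled command therefore comes out roughly as sketched below; the paths are placeholders I made up, not values from the commit.

```python
# Sketch of the final 7z invocation built by zip_sequence() (placeholder paths).
FIXED_DICT_SIZE_MB = 1024
cmd = [
    "7z",                          # SEVEN_Z_EXE in the real script
    "a",                           # add to archive
    "-t7z",                        # 7z container, not zip
    f"-md={FIXED_DICT_SIZE_MB}m",  # 1 GiB LZMA dictionary, independent of the RAM estimate
    "/tmp/shot_010.7z.tmp",        # temp_zip_abs
    "frame_0001.exr",              # sequence contents follow
]
```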
@@ -655,9 +741,20 @@ def process_expand(zip_path: Path, state: dict, *, verbose: bool) -> None:
 
 def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
     work_items: list[tuple[Path, Path, Path, dict]] = []
+    log("init", f"Scanning sequences under {RENDER_ROOT.resolve()}")
+    total_scanned = 0
+    quick_skipped = 0
+    state_skipped = 0
+    empty_dirs = 0
+    queued = 0
+
     if RENDER_ROOT.exists():
         for seq_dir in find_sequence_dirs(RENDER_ROOT):
+            total_scanned += 1
+            rel = seq_dir.relative_to(RENDER_ROOT)
+            if total_scanned <= 5 or total_scanned % 10 == 0:
+                log("scan", f"[{total_scanned}] Inspecting {rel}")
 
             # Get the target archive path (will be .7z if ZIPPER_TYPE is "7z")
             zip_path = archive_path_for(seq_dir)
             state_path = state_path_for(zip_path)
@@ -685,6 +782,9 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
             archive_mtime = zip_path.stat().st_mtime_ns
             # If directory wasn't modified since archive was created, skip state computation
             if dir_mtime <= archive_mtime:
+                quick_skipped += 1
+                if quick_skipped <= 5:
+                    log("scan", f"Skipping {rel} (unchanged since archive)")
                 # Still need to check for old .zip cleanup (we have .7z, so .zip is obsolete)
                 if old_zip_path and old_zip_path.exists():
                     old_zip_path.unlink(missing_ok=True)
@@ -699,11 +799,17 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
             # Compute current state only if we need to
             seq_state = compute_state(seq_dir)
             if not seq_state["files"]:
+                empty_dirs += 1
+                if empty_dirs <= 5:
+                    log("scan", f"{rel} has no files; skipping")
                 continue
 
             # Check if state changed
             if stored_state is not None and not state_changed(seq_state, stored_state):
                 # Metadata matches stored state
+                state_skipped += 1
+                if state_skipped <= 5:
+                    log("scan", f"{rel} metadata unchanged; archive up to date")
                 if zip_path.exists():
                     # .7z exists and is up to date, clean up old .zip if it exists
                     if old_zip_path and old_zip_path.exists():
@@ -720,16 +826,30 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
                     continue
 
             work_items.append((seq_dir, zip_path, state_path, seq_state))
+            queued += 1
+            if queued <= 5 or queued % 5 == 0:
+                total_bytes = sum(entry.get("size", 0) for entry in seq_state.get("files", []))
+                size_gb = total_bytes / (1024 ** 3)
+                log("scan", f"Queued {rel} for compression (~{size_gb:.2f}GB) [{queued} total]")
 
     if not work_items:
         if not RENDER_ROOT.exists():
             log("zip", "Render root 'Renders' not found; nothing to zip.")
         else:
             log("zip", "Archives already up to date; no sequences needed zipping.")
+        log(
+            "scan",
+            f"Summary: scanned {total_scanned}, quick-skipped {quick_skipped}, "
+            f"state-skipped {state_skipped}, empty {empty_dirs}, queued {queued}",
+        )
         return 0
 
     # Calculate RAM-aware worker count based on work items
-    worker_count, per_job_memory_limit = max_workers(requested_workers, work_items, verbose=verbose)
+    worker_count, ram_limits_dict = max_workers(requested_workers, work_items, verbose=verbose)
+    log(
+        "init",
+        f"Preparing to compress {len(work_items)} sequence(s) with {worker_count} worker(s)",
+    )
 
     updated_paths: list[Path] = []
 
@@ -738,7 +858,7 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
 
     with ThreadPoolExecutor(max_workers=worker_count) as executor:
         future_map = {
-            executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, per_job_memory_limit, verbose=verbose): seq_dir
+            executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, ram_limits_dict.get(seq_dir), verbose=verbose): seq_dir
             for seq_dir, zip_path, state_path, seq_state in work_items
         }
 
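One behavioral detail of the changed submit call: dict.get returns None for any sequence directory missing from ram_limits_dict, which the 7z path above now treats as informational anyway since the dictionary is pinned at 1GB. A minimal, runnable sketch of the future_map pattern with a stub worker (the stub and paths are hypothetical):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

def process_zip_stub(seq_dir: Path, ram_limit: int | None) -> str:
    # Stand-in for process_zip(); the real worker compresses seq_dir.
    return f"{seq_dir} (limit: {ram_limit})"

ram_limits = {Path("shot_010"): 12 * 1024 ** 3}
work = [Path("shot_010"), Path("shot_020")]  # shot_020 has no estimate -> None

with ThreadPoolExecutor(max_workers=2) as executor:
    future_map = {executor.submit(process_zip_stub, d, ram_limits.get(d)): d for d in work}
    for future in as_completed(future_map):
        print(future_map[future], "->", future.result())
```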
@@ -758,6 +878,11 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
                 verbose_only=True,
                 verbose=verbose,
             )
+    log(
+        "scan",
+        f"Summary: scanned {total_scanned}, quick-skipped {quick_skipped}, "
+        f"state-skipped {state_skipped}, empty {empty_dirs}, queued {queued}",
+    )
 
     removed = cleanup_orphan_archives(verbose=verbose)
     if removed:
@@ -851,6 +976,13 @@ def cleanup_orphan_archives(*, verbose: bool) -> int:
 
 def main() -> int:
     args = parse_args()
+    log("init", "zip_sequences starting up...")
+    log("init", f"Working directory: {Path.cwd()}")
+    log("init", f"Mode: {args.mode}, zipper: {ZIPPER_TYPE}, jobs arg: {args.jobs or 'auto'}")
+    if ZIPPER_TYPE == "7z":
+        exe = SEVEN_Z_EXE or "not found"
+        max_inst = MAX_7Z_INSTANCES if MAX_7Z_INSTANCES is not None else "auto"
+        log("init", f"7z executable: {exe}, Max7zInst: {max_inst}")
 
     if args.mode == "expand":
         # For expand mode, use simple CPU-based worker calculation