swap back to LZMA for better multithreading

This commit is contained in:
Nathan
2025-11-13 14:22:16 -07:00
parent 06e8177f0b
commit fbc0f833dc
2 changed files with 1666 additions and 10 deletions

View File

@@ -53,6 +53,7 @@ STATE_SUFFIX = ".meta.json"
DEFAULT_CONFIG = {
"zipper": "7z",
"compression": 9,
"compressionMethod": "LZMA2", # Compression method: LZMA2 (multi-threaded), PPMd (single-threaded), BZip2, Deflate
"dailyFormat": "daily_YYMMDD",
"Max7zInst": 0, # Maximum concurrent 7z instances (0 = auto-calculate)
}
@@ -119,6 +120,12 @@ if not isinstance(COMPRESSION_LEVEL, int):
COMPRESSION_LEVEL = 9
COMPRESSION_LEVEL = max(0, min(9, COMPRESSION_LEVEL))
COMPRESSION_METHOD = CONFIG.get("compressionMethod", "LZMA2")
# Validate compression method
valid_methods = {"LZMA2", "PPMd", "BZip2", "Deflate"}
if COMPRESSION_METHOD not in valid_methods:
COMPRESSION_METHOD = "LZMA2" # Default to LZMA2 for multi-threading support
MAX_7Z_INSTANCES = CONFIG.get("Max7zInst", 0)
if MAX_7Z_INSTANCES is not None:
if isinstance(MAX_7Z_INSTANCES, str):
@@ -543,7 +550,7 @@ def state_path_for(zip_path: Path) -> Path:
return zip_path.with_suffix(zip_path.suffix + STATE_SUFFIX)
def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None = None) -> None:
def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None = None, worker_count: int = 1, *, verbose: bool = False) -> None:
if ZIPPER_TYPE == "7z":
if SEVEN_Z_EXE is None:
raise RuntimeError(
@@ -603,16 +610,54 @@ def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None
"-t7z", # Use 7z format, not zip
]
# Always use fixed dictionary size: 1GB (1024MB)
# The per_job_memory_limit parameter is the estimated RAM usage (for logging/info only)
# We keep dictionary at 1GB for best compression regardless of RAM estimate
# Set compression method and memory/dictionary size based on method
# PPMd uses memory parameter in method string, LZMA2 uses -md flag
FIXED_DICT_SIZE_MB = 1024
cmd.append(f"-md={FIXED_DICT_SIZE_MB}m")
if COMPRESSION_METHOD == "PPMd":
# PPMd: specify memory as part of method string
cmd.append(f"-m0=PPMd:mem={FIXED_DICT_SIZE_MB}m")
elif COMPRESSION_METHOD == "LZMA2":
# LZMA2: use -md for dictionary size
cmd.append("-m0=LZMA2")
cmd.append(f"-md={FIXED_DICT_SIZE_MB}m")
elif COMPRESSION_METHOD == "BZip2":
# BZip2: use -md for dictionary size (smaller max: 900KB)
max_bzip2_dict = min(FIXED_DICT_SIZE_MB, 900) # BZip2 max is 900KB
cmd.append("-m0=BZip2")
cmd.append(f"-md={max_bzip2_dict}k")
elif COMPRESSION_METHOD == "Deflate":
# Deflate: doesn't use dictionary size parameter
cmd.append("-m0=Deflate")
else:
# Fallback: use LZMA2
cmd.append("-m0=LZMA2")
cmd.append(f"-md={FIXED_DICT_SIZE_MB}m")
# CPU thread allocation: when there's only 1 worker, use all CPU cores
# When there are multiple workers, use auto mode to let 7z decide
# Note: PPMd is single-threaded and won't benefit from -mmt
cpu_cores = os.cpu_count() or 1
if COMPRESSION_METHOD == "PPMd":
# PPMd is single-threaded, so -mmt won't help
# But we can still set it for consistency
cmd.append("-mmt=on")
elif worker_count == 1:
# Single worker: use all CPU cores for maximum speed (LZMA2, BZip2, Deflate support this)
cmd.append(f"-mmt={cpu_cores}")
else:
# Multiple workers: use auto mode (7z will manage threads)
cmd.append("-mmt=on")
cmd.extend([
str(temp_zip_abs),
f"@{list_file_abs}",
])
# Log the command in verbose mode for debugging
if verbose:
cmd_str = " ".join(cmd)
log("zip", f"7z command: {cmd_str}", verbose_only=True, verbose=verbose)
result = subprocess.run(
cmd,
cwd=seq_dir,
@@ -728,9 +773,9 @@ def expand_sequence(zip_path: Path, seq_state: dict) -> None:
os.utime(file_path, ns=(entry["mtime_ns"], entry["mtime_ns"]))
def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, per_job_memory_limit: int | None, *, verbose: bool) -> Sequence[Path]:
def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, per_job_memory_limit: int | None, worker_count: int, *, verbose: bool) -> Sequence[Path]:
log("zip", f"{seq_dir} -> {zip_path}", verbose_only=True, verbose=verbose)
zip_sequence(seq_dir, zip_path, per_job_memory_limit)
zip_sequence(seq_dir, zip_path, per_job_memory_limit, worker_count, verbose=verbose)
state_path.write_text(json.dumps(seq_state, indent=2))
return (zip_path, state_path)
@@ -859,7 +904,7 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
with ThreadPoolExecutor(max_workers=worker_count) as executor:
future_map = {
executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, ram_limits_dict.get(seq_dir), verbose=verbose): seq_dir
executor.submit(process_zip, seq_dir, zip_path, state_path, seq_state, ram_limits_dict.get(seq_dir), worker_count, verbose=verbose): seq_dir
for seq_dir, zip_path, state_path, seq_state in work_items
}
@@ -983,7 +1028,7 @@ def main() -> int:
if ZIPPER_TYPE == "7z":
exe = SEVEN_Z_EXE or "not found"
max_inst = MAX_7Z_INSTANCES if MAX_7Z_INSTANCES is not None else "auto"
log("init", f"7z executable: {exe}, Max7zInst: {max_inst}")
log("init", f"7z executable: {exe}, Max7zInst: {max_inst}, method: {COMPRESSION_METHOD}, level: {COMPRESSION_LEVEL}")
if args.mode == "expand":
# For expand mode, use simple CPU-based worker calculation