7zinst: fix bin-packing vs. concurrent worker behavior for max CPU and RAM allocation
zip_sequences.py | +107 -107
@@ -210,31 +210,17 @@ def estimate_ram_per_job(seq_dir: Path, seq_state: dict) -> int:
     if ZIPPER_TYPE == "7z":
-        # Fixed dictionary size: 1GB (1024MB)
-        FIXED_DICT_SIZE_MB = 1024
-        FIXED_DICT_SIZE_BYTES = FIXED_DICT_SIZE_MB * 1024 * 1024
+        # Actual observed usage: ~2GB per process regardless of sequence size
+        # Use a tight estimate to allow maximum concurrency
+        # Estimate 3GB per job (50% safety margin over observed ~2GB)
+        base_ram = 3 * 1024 * 1024 * 1024  # 3GB base estimate
 
-        # Dictionary RAM: ~11x dictionary size (1GB dict = 11GB RAM for dictionary operations)
-        # This is the main memory consumer for 7z LZMA compression
-        dict_ram = FIXED_DICT_SIZE_BYTES * 11  # 11GB for 1GB dictionary
-
-        # Input buffer: 7z processes in chunks, but for very large sequences needs more RAM
-        # For sequences >100GB, use a larger buffer factor to handle file metadata and processing
-        if total_bytes > 500 * 1024 * 1024 * 1024:  # >500GB (extremely large)
-            # Extremely large sequences: use 8% of size, capped at 64GB
-            # This accounts for file metadata, directory structures, and processing overhead
-            input_buffer = min(int(total_bytes * 0.08), 64 * 1024 * 1024 * 1024)
-            overhead = 8 * 1024 * 1024 * 1024  # 8GB overhead for extremely large sequences
-        elif total_bytes > 100 * 1024 * 1024 * 1024:  # >100GB
-            # Very large sequences: use 5% of size, capped at 32GB
-            input_buffer = min(int(total_bytes * 0.05), 32 * 1024 * 1024 * 1024)
-            overhead = 4 * 1024 * 1024 * 1024  # 4GB for large sequences
+        # For very large sequences (>50GB), add small buffer
+        if total_bytes > 50 * 1024 * 1024 * 1024:  # >50GB
+            # Add 1GB for very large sequences
+            estimated_ram = base_ram + (1 * 1024 * 1024 * 1024)
         else:
-            # Smaller sequences: use 15% of size, capped at 2GB
-            input_buffer = min(int(total_bytes * 0.15), 2 * 1024 * 1024 * 1024)
-            overhead = 1 * 1024 * 1024 * 1024  # 1GB for smaller sequences
-
-        # Total RAM estimate
-        estimated_ram = dict_ram + input_buffer + overhead
+            estimated_ram = base_ram
 
         return estimated_ram
     else:
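
Note: the new estimate collapses to a flat per-job heuristic. A minimal standalone sketch of that branch (the function name, the GIB helper, and the precomputed total_bytes argument are illustrative, not the script's actual signature):

    # Sketch of the new 7z estimate: flat 3GB base (~50% margin over the
    # observed ~2GB per process), plus 1GB for sequences larger than 50GB.
    GIB = 1024 ** 3

    def estimate_7z_ram(total_bytes: int) -> int:
        base_ram = 3 * GIB
        if total_bytes > 50 * GIB:
            return base_ram + 1 * GIB
        return base_ram

    assert estimate_7z_ram(10 * GIB) == 3 * GIB    # small sequence -> 3GB
    assert estimate_7z_ram(200 * GIB) == 4 * GIB   # very large -> 4GB
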
@@ -312,13 +298,13 @@ def max_workers(
         return (final_limit, ram_limits_dict)
 
     # Auto-calculate based on RAM if Max7zInst not configured
-    # Use bin-packing algorithm with size-aware RAM estimation
+    # Prioritize maximum concurrency: one worker per sequence when possible
     if available_ram is not None:
         # available_ram is already 80% of total (20% reserved for system)
         # Use 95% of available RAM for compression jobs
         compression_ram = int(available_ram * 0.95)
 
-        # Estimate RAM for each work item and create list with (seq_dir, estimated_ram, work_item)
+        # Estimate RAM for each work item
         work_items_with_ram: list[tuple[Path, int, tuple[Path, Path, Path, dict]]] = []
         ram_limits_dict: dict[Path, int] = {}
 
@@ -328,38 +314,47 @@ def max_workers(
                 work_items_with_ram.append((seq_dir, estimated_ram, (seq_dir, zip_path, state_path, seq_state)))
                 ram_limits_dict[seq_dir] = estimated_ram
             except Exception:
-                # If estimation fails, use a safe default (12GB minimum for 1GB dict)
-                default_ram = 12 * 1024 * 1024 * 1024  # 12GB
+                # If estimation fails, use a safe default (3GB)
+                default_ram = 3 * 1024 * 1024 * 1024  # 3GB
                 work_items_with_ram.append((seq_dir, default_ram, (seq_dir, zip_path, state_path, seq_state)))
                 ram_limits_dict[seq_dir] = default_ram
 
-        # Sort by estimated RAM (largest first) for bin-packing
-        work_items_with_ram.sort(key=lambda x: x[1], reverse=True)
+        num_work_items = len(work_items) if work_items else 0
 
-        # Bin-packing algorithm: pack largest items first
-        bins: list[list[tuple[Path, int, tuple[Path, Path, Path, dict]]]] = []
-        bin_remaining: list[int] = []
+        # Calculate total estimated RAM
+        total_estimated_ram = sum(ram for _, ram, _ in work_items_with_ram)
 
-        for seq_dir, estimated_ram, work_item in work_items_with_ram:
-            # Try to fit in existing bin
-            placed = False
-            for i, remaining in enumerate(bin_remaining):
-                if remaining >= estimated_ram:
-                    bins[i].append((seq_dir, estimated_ram, work_item))
-                    bin_remaining[i] -= estimated_ram
-                    placed = True
-                    break
+        # If all sequences fit in available RAM, use one worker per sequence (maximum concurrency)
+        if total_estimated_ram <= compression_ram:
+            worker_count = num_work_items
+        else:
+            # Not all fit - use bin-packing to minimize workers
+            # Sort by estimated RAM (largest first) for bin-packing
+            work_items_with_ram.sort(key=lambda x: x[1], reverse=True)
 
-            # If doesn't fit, create new bin
-            if not placed:
-                bins.append([(seq_dir, estimated_ram, work_item)])
-                bin_remaining.append(compression_ram - estimated_ram)
-
-        # Worker count is number of bins
-        worker_count = len(bins)
+            # Bin-packing algorithm: pack largest items first
+            bins: list[list[tuple[Path, int, tuple[Path, Path, Path, dict]]]] = []
+            bin_remaining: list[int] = []
+
+            for seq_dir, estimated_ram, work_item in work_items_with_ram:
+                # Try to fit in existing bin
+                placed = False
+                for i, remaining in enumerate(bin_remaining):
+                    if remaining >= estimated_ram:
+                        bins[i].append((seq_dir, estimated_ram, work_item))
+                        bin_remaining[i] -= estimated_ram
+                        placed = True
+                        break
+
+                # If doesn't fit, create new bin
+                if not placed:
+                    bins.append([(seq_dir, estimated_ram, work_item)])
+                    bin_remaining.append(compression_ram - estimated_ram)
+
+            # Worker count is number of bins
+            worker_count = len(bins)
 
         # Cap at number of actual work items (can't have more workers than jobs)
-        num_work_items = len(work_items) if work_items else 0
         if num_work_items > 0:
             worker_count = min(worker_count, num_work_items)
 
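
Note: the reworked flow first checks whether every job fits in the budget at once, and only falls back to first-fit-decreasing bin-packing when it does not. A self-contained sketch with made-up sizes (ffd_bin_count and the GB figures are illustrative only):

    # First-fit-decreasing: take jobs largest-first, drop each into the
    # first bin with room, open a new bin otherwise; workers = bin count.
    def ffd_bin_count(estimates_gb: list[int], capacity_gb: int) -> int:
        bin_remaining: list[int] = []          # remaining capacity per bin
        for ram in sorted(estimates_gb, reverse=True):
            for i, remaining in enumerate(bin_remaining):
                if remaining >= ram:           # fits in an existing bin
                    bin_remaining[i] -= ram
                    break
            else:                              # no bin had room: open one
                bin_remaining.append(capacity_gb - ram)
        return len(bin_remaining)

    jobs, budget = [9, 8, 4, 3, 3], 16         # GB; total 27GB > 16GB budget
    if sum(jobs) <= budget:
        workers = len(jobs)                    # concurrent path: one per job
    else:
        workers = ffd_bin_count(jobs, budget)  # bins 9+4+3 and 8+3 -> 2
    print(workers)                             # 2
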
@@ -370,7 +365,7 @@ def max_workers(
         if verbose:
             ram_gb = available_ram / (1024 ** 3)
             compression_ram_gb = compression_ram / (1024 ** 3)
-            total_estimated_gb = sum(ram for _, ram, _ in work_items_with_ram) / (1024 ** 3)
+            total_estimated_gb = total_estimated_ram / (1024 ** 3)
             log(
                 "zip",
                 f"RAM: {ram_gb:.1f}GB available (80% of total), {compression_ram_gb:.1f}GB for compression (95%)",
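
Note: compression_ram ends up as 95% of an available_ram figure that is itself 80% of the machine total, i.e. 76% of physical RAM. Worked numbers for a hypothetical 128GB host:

    total_ram_gb = 128                    # illustrative host size
    available_gb = total_ram_gb * 0.80    # 102.4GB after the 20% system reserve
    compression_gb = available_gb * 0.95  # 97.28GB for compression jobs
    print(f"{compression_gb:.2f}GB")      # 97.28GB = 76% of total
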
@@ -383,11 +378,17 @@ def max_workers(
                 verbose_only=True,
                 verbose=verbose
             )
-            if len(bins) > 0:
-                bin_sizes = [sum(ram for _, ram, _ in bin_items) / (1024 ** 3) for bin_items in bins]
+            if total_estimated_ram <= compression_ram:
                 log(
                     "zip",
-                    f"Bin-packing: {worker_count} workers, bin sizes: {[f'{s:.1f}GB' for s in bin_sizes[:5]]}{'...' if len(bin_sizes) > 5 else ''}",
+                    f"All sequences fit in RAM → using {worker_count} workers (one per sequence)",
                     verbose_only=True,
                     verbose=verbose
                 )
+            else:
+                log(
+                    "zip",
+                    f"Using bin-packing: {worker_count} workers needed",
+                    verbose_only=True,
+                    verbose=verbose
+                )