package com.sheepit.client.zip;

import net.lingala.zip4j.io.inputstream.ZipInputStream;
import net.lingala.zip4j.model.LocalFileHeader;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Parallel zip extraction: one reader/decompressor thread feeding several writer threads. */
public class UnzipUtils {
    private static final int MAX_IO_RETRIES = 32;
    private static final int MAX_IO_WRITER_THREADS = 6;

    /** A single unit of work for a writer thread. */
    private interface ZipWriteCmd {
        /** Approximate memory held by this command, used to bound the write queue. */
        int memSize();

        void execute(Lock dirLock) throws IOException;

        final class MkDir implements ZipWriteCmd {
            private final File folder;

            public MkDir(File folder) {
                this.folder = Objects.requireNonNull(folder);
            }

            @Override
            public int memSize() {
                return folder.getPath().length();
            }

            @Override
            public void execute(Lock dirLock) throws IOException {
                lockedMkdirs(dirLock, folder);
            }
        }

        final class SmallFile implements ZipWriteCmd {
            private final File file;
            private final byte[] data;

            public SmallFile(File file, byte[] data) {
                this.file = file;
                this.data = data.clone();
            }

            @Override
            public int memSize() {
                return data.length + file.getPath().length();
            }

            @Override
            public void execute(Lock dirLock) throws IOException {
                lockedMkdirs(dirLock, file.getParentFile());
                exponentialBackoff(() -> {
                    try (var os = new FileOutputStream(file)) {
                        os.write(data);
                    }
                });
            }
        }
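        /*
         * Hand-off protocol for large entries (a sketch; "queue" stands for the
         * WriteQueue defined further down, and the calls are illustrative):
         *
         *   var cmd = new BigFile(outFile);  // reader thread creates the command
         *   queue.submit(cmd);               // a writer worker starts draining it
         *   cmd.chunks.put(chunk);           // blocks once 8 chunks are pending
         *   cmd.doneAdding = true;           // worker writes the tail and closes
         */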
        final class BigFile implements ZipWriteCmd {
            private static final int CHUNK_SIZE = 1024 * 1024;

            public final BlockingDeque<byte[]> chunks = new LinkedBlockingDeque<>(8);
            public volatile boolean doneAdding;
            private final File file;

            public BigFile(File file) {
                this.file = file;
            }

            @Override
            public int memSize() {
                // Reserve one extra chunk while the reader is still adding data
                int[] sum = { doneAdding ? 0 : CHUNK_SIZE };
                chunks.forEach(bb -> sum[0] += bb.length);
                return sum[0];
            }

            @Override
            public void execute(Lock dirLock) throws IOException {
                lockedMkdirs(dirLock, file.getParentFile());
                try (var raFile = exponentialBackoff(() -> new RandomAccessFile(file, "rw"))) {
                    while (true) {
                        if (doneAdding && chunks.isEmpty()) {
                            return;
                        }
                        byte[] chunk = chunks.poll(10, TimeUnit.MILLISECONDS);
                        if (chunk == null) {
                            continue;
                        }
                        var pos = raFile.getFilePointer();
                        exponentialBackoff(() -> {
                            raFile.seek(pos);
                            raFile.write(chunk);
                        });
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /** Memory-bounded queue of pending write commands, shared between the reader and the writer workers. */
    private static final class WriteQueue {
        private static final int MAX_SIZE = 64 * 1024 * 1024;

        private final BlockingDeque<ZipWriteCmd> cmds = new LinkedBlockingDeque<>();
        private final Set<ZipWriteCmd> workingCommands = Collections.synchronizedSet(new HashSet<>());
        private final Object hasSpaceLock = new Object();
        public volatile IOException err;
        private final Lock dirLock = new ReentrantLock();
        private volatile boolean doneAdding;

        private boolean tryExecute() throws IOException, InterruptedException {
            ZipWriteCmd cmd = cmds.poll(10, TimeUnit.MILLISECONDS);
            if (cmd == null) {
                return false;
            }
            workingCommands.add(cmd);
            try {
                cmd.execute(dirLock);
            }
            finally {
                workingCommands.remove(cmd);
                synchronized (hasSpaceLock) {
                    hasSpaceLock.notify();
                }
            }
            return true;
        }

        private void submit(ZipWriteCmd cmd) throws InterruptedException {
            while (!hasSpaceFor(cmd)) {
                if (err != null) {
                    return; // a writer already failed; drop the command so the reader cannot block forever
                }
                synchronized (hasSpaceLock) {
                    hasSpaceLock.wait(100);
                }
            }
            cmds.add(cmd);
        }

        private boolean hasSpaceFor(ZipWriteCmd command) {
            int[] sum = { command.memSize() };
            Consumer<ZipWriteCmd> acum = cmd -> sum[0] += cmd.memSize();
            cmds.forEach(acum);
            workingCommands.forEach(acum);
            return sum[0] <= MAX_SIZE;
        }
    }
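    /*
     * Usage sketch (hypothetical caller; the archive path and the destination
     * directory are made up for illustration):
     *
     *   try (InputStream in = new FileInputStream("archive.zip")) {
     *       Throwable err = UnzipUtils.parallelUnzip(in, new File("extracted"), null);
     *       if (err != null) {
     *           // surface or retry the failed extraction
     *       }
     *   }
     */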
    /**
     * Extracts all contents of the zip archive contained within the {@link InputStream}. The current thread reads the data, decompresses it and puts
     * it in a buffer. A set of workers drains the buffer and writes the files to the destination. Multiple workers help mitigate the slow creation of
     * many small files, and slow writes over a NAS.
     *
     * @param zipFile the contents of the archive
     * @param destination path to the target directory the files will be extracted to
     * @param password (optional) the password to decrypt the zip with; may be {@code null}
     * @return any error that occurred, or {@code null} on success
     */
    public static Throwable parallelUnzip(InputStream zipFile, File destination, char[] password) throws IOException, InterruptedException {
        var workerCount = Math.min(MAX_IO_WRITER_THREADS, Runtime.getRuntime().availableProcessors() + 1);
        return parallelUnzip(zipFile, destination, password, workerCount);
    }

    /**
     * Extracts all contents of the zip archive contained within the {@link InputStream}. The current thread reads the data, decompresses it and puts
     * it in a buffer. A set of workers drains the buffer and writes the files to the destination. Multiple workers help mitigate the slow creation of
     * many small files, and slow writes over a NAS.
     *
     * @param zipFile the contents of the archive
     * @param destination path to the target directory the files will be extracted to
     * @param password (optional) the password to decrypt the zip with; may be {@code null}
     * @param workerCount the number of workers for writing the output
     * @return any error that occurred, or {@code null} on success
     */
    public static Throwable parallelUnzip(InputStream zipFile, File destination, char[] password, int workerCount) throws IOException, InterruptedException {
        if (workerCount < 1) {
            throw new IllegalArgumentException("workerCount must be greater than or equal to 1");
        }
        var queue = new WriteQueue();
        ExecutorService executor = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            executor.submit(() -> {
                writeTask(queue);
                return null;
            });
        }
        try {
            readArchiveToQueue(zipFile, destination, password, queue);
        }
        finally {
            writeWait(queue, executor);
        }
        return queue.err;
    }

    private static void writeTask(WriteQueue queue) {
        while (queue.err == null) {
            try {
                if (!queue.tryExecute() && queue.doneAdding) {
                    break;
                }
            }
            catch (IOException e) {
                queue.err = e;
                break;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    // Dedicated thread for the System.gc() hint below
    private static final Executor GC_RUNNER = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    private static void writeWait(WriteQueue queue, ExecutorService executor) throws InterruptedException {
        queue.doneAdding = true;
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        // Encourage the JVM to release memory used while decompressing
        GC_RUNNER.execute(System::gc);
    }

    private static void readArchiveToQueue(InputStream zipFile, File destination, char[] password, WriteQueue queue) throws IOException, InterruptedException {
        try (var zipInputStream = new ZipInputStream(zipFile)) {
            if (password != null) {
                zipInputStream.setPassword(password);
            }
            LocalFileHeader fileHeader;
            while ((fileHeader = zipInputStream.getNextEntry()) != null) {
                // Entry names are used as-is, so the archive is assumed trusted (a hostile name containing ".." could escape the destination)
                var outFile = new File(destination, fileHeader.getFileName());
                if (fileHeader.isDirectory()) {
                    queue.submit(new ZipWriteCmd.MkDir(outFile));
                    continue;
                }
                // Small entries are buffered whole; big ones are streamed in chunks
                if (fileHeader.getUncompressedSize() < WriteQueue.MAX_SIZE / 10) {
                    queue.submit(new ZipWriteCmd.SmallFile(outFile, zipInputStream.readAllBytes()));
                }
                else {
                    readBigFile(outFile, zipInputStream, queue);
                }
            }
        }
    }

    private static void readBigFile(File outFile, ZipInputStream zipInputStream, WriteQueue queue) throws IOException, InterruptedException {
        var bf = new ZipWriteCmd.BigFile(outFile);
        queue.submit(bf);
        try {
            var buff = new byte[ZipWriteCmd.BigFile.CHUNK_SIZE];
            int read;
            while ((read = zipInputStream.read(buff)) != -1) {
                var chunk = Arrays.copyOf(buff, read);
                // Bail out if a writer failed, otherwise this hand-off could block forever
                while (!bf.chunks.offer(chunk, 100, TimeUnit.MILLISECONDS)) {
                    if (queue.err != null) {
                        return;
                    }
                }
            }
        }
        finally {
            bf.doneAdding = true;
        }
    }
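    /*
     * Back-of-the-envelope memory bound, from the constants above: the write
     * queue holds at most MAX_SIZE = 64 MiB of pending commands, an entry counts
     * as "small" below MAX_SIZE / 10 = 6.4 MiB, and each in-flight BigFile
     * buffers at most 8 chunks of CHUNK_SIZE = 1 MiB plus one chunk in transit.
     */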
    /**
     * Creates a directory while holding the given lock. The lock is needed because concurrent mkdirs calls on overlapping paths can make each other
     * fail.
     */
    private static void lockedMkdirs(Lock dirLock, File file) throws IOException {
        if (file.isDirectory()) {
            return;
        }
        dirLock.lock();
        try {
            exponentialBackoff(() -> {
                if (!file.isDirectory() && !file.mkdirs()) {
                    throw new IOException("Failed to create folder: " + file);
                }
            });
        }
        finally {
            dirLock.unlock();
        }
    }

    private interface IoOp {
        void run() throws IOException;
    }

    private interface IoOpVal<T> {
        T run() throws IOException;
    }

    private static void exponentialBackoff(IoOp ioOp) throws IOException {
        exponentialBackoff(() -> {
            ioOp.run();
            return null;
        });
    }

    private static <T> T exponentialBackoff(IoOpVal<T> ioOp) throws IOException {
        IOException err = null;
        for (int i = 0; i < MAX_IO_RETRIES; i++) {
            try {
                return ioOp.run();
            }
            catch (IOException e) {
                if (err == null) {
                    err = e;
                }
                exponentialBackoffSleep(i);
            }
        }
        throw err;
    }

    private static void exponentialBackoffSleep(int i) {
        var i1 = i + 1;
        try {
            // Despite the method name, the delay grows quadratically: 2, 8, 18, ... ms
            Thread.sleep(2L * i1 * i1);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}