From d7fc130025af725105cc29e169166c448dba748e Mon Sep 17 00:00:00 2001
From: Sheepit Renderfarm
Date: Thu, 12 Dec 2024 22:05:13 +0000
Subject: [PATCH] Ref: parallel unzip, to improve performance

---
 rulesets/java-sheepit.xml                    |   2 +-
 .../com/sheepit/client/ChunkInputStream.java |  71 ++--
 .../java/com/sheepit/client/UnzipUtils.java  | 353 ++++++++++++++++++
 src/main/java/com/sheepit/client/Utils.java  |  55 +--
 4 files changed, 391 insertions(+), 90 deletions(-)
 create mode 100644 src/main/java/com/sheepit/client/UnzipUtils.java

diff --git a/rulesets/java-sheepit.xml b/rulesets/java-sheepit.xml
index 5224df8..839f362 100644
--- a/rulesets/java-sheepit.xml
+++ b/rulesets/java-sheepit.xml
@@ -40,7 +40,7 @@
-
+

diff --git a/src/main/java/com/sheepit/client/ChunkInputStream.java b/src/main/java/com/sheepit/client/ChunkInputStream.java
index fd52b98..e056eaf 100644
--- a/src/main/java/com/sheepit/client/ChunkInputStream.java
+++ b/src/main/java/com/sheepit/client/ChunkInputStream.java
@@ -2,74 +2,61 @@ package com.sheepit.client;
 
 import lombok.NonNull;
 
-import java.io.BufferedInputStream;
+import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Path;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.List;
-import java.util.NoSuchElementException;
 
 public class ChunkInputStream extends InputStream {
-    @NonNull private final Deque<Path> chunkPaths;
-    @NonNull private BufferedInputStream currentStream;
+    @NonNull private final Deque<File> chunkPaths;
+    @NonNull private InputStream currentStream;
 
     /**
      * Given a list of chunk paths, provides an InputStream that reads the contents of these files in the order they were provided in.
      * @param chunkPaths Non-empty, ordered list of paths
     * @throws IOException If the first chunk could not be found. Errors with other chunks will be thrown during calls to read()
     */
-    public ChunkInputStream(List<Path> chunkPaths) throws IOException {
+    public ChunkInputStream(List<File> chunkPaths) throws IOException {
         this.chunkPaths = new ArrayDeque<>(chunkPaths);
-        /// Setup the first chunk for reading
-        prepareNextChunk();
-    }
-
-    private void prepareNextChunk() throws FileNotFoundException, NoSuchElementException {
-        currentStream = new BufferedInputStream(new FileInputStream(chunkPaths.removeFirst().toFile()));
+        // Setup the first chunk for reading
+        currentStream = new FileInputStream(this.chunkPaths.removeFirst());
     }
 
     @Override public int read() throws IOException {
-        int result = currentStream.read();
-
-        if (result == -1) {
-            /// Finished reading from this chunk, continue with the next if possible
-            try {
-                prepareNextChunk();
+        while (true) {
+            int result = currentStream.read();
+            // Only -1 means this chunk is exhausted (0 is a valid byte value); continue with the next chunk if possible
+            if (result != -1 || tryOpenNextChunk() == false) {
+                return result;
             }
-            catch (NoSuchElementException e) {
-                /// This was the last chunk
-                return -1;
-            }
-            result = currentStream.read();
         }
-
-        return result;
     }
 
-    /// Improve throughput by offering method to read whole block of bytes at once
     @Override public int read(byte[] b, int off, int len) throws IOException {
-        int bytesRead = -1;
-
-        while (bytesRead == -1) {
-            bytesRead = currentStream.read(b, off, len);
-
-            if (bytesRead == -1) {
-                try {
-                    prepareNextChunk();
-                }
-                catch (NoSuchElementException e) {
-                    /// This was the last chunk
-                    return -1;
-                }
+        while (true) {
+            var read = currentStream.read(b, off, len);
+            // Only -1 means this chunk is exhausted; continue with the next chunk if possible
+            if (read != -1 || tryOpenNextChunk() == false) {
+                return read;
             }
         }
-
-        return bytesRead;
+    }
+
+    @Override public void close() throws IOException {
+        currentStream.close();
+    }
+
+    private boolean tryOpenNextChunk() throws IOException {
+        if (chunkPaths.isEmpty()) {
+            return false;
+        }
+        currentStream.close();
+        currentStream = new FileInputStream(chunkPaths.removeFirst());
+        return true;
    }
 }
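
For context, the reworked ChunkInputStream is consumed like any other InputStream. A minimal sketch, assuming hypothetical chunk file names (the real call site is unzipChunksIntoDirectory in Utils.java below):

    // Present several downloaded chunk files as one continuous stream
    List<File> chunks = List.of(new File("job_123.zip.000"), new File("job_123.zip.001"));
    try (InputStream in = new BufferedInputStream(new ChunkInputStream(chunks))) {
        byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
            // consume buffer[0..read), e.g. feed it to a decompressor
        }
    }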
diff --git a/src/main/java/com/sheepit/client/UnzipUtils.java b/src/main/java/com/sheepit/client/UnzipUtils.java
new file mode 100644
index 0000000..69a5b51
--- /dev/null
+++ b/src/main/java/com/sheepit/client/UnzipUtils.java
@@ -0,0 +1,353 @@
+package com.sheepit.client;
+
+import net.lingala.zip4j.io.inputstream.ZipInputStream;
+import net.lingala.zip4j.model.LocalFileHeader;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+public class UnzipUtils {
+
+    private static final int MAX_IO_RETRIES = 32;
+    private static final int MAX_IO_WRITER_THREADS = 6;
+
+    private interface ZipWriteCmd {
+        int memSize();
+
+        void execute(Lock dirLock) throws IOException;
+
+        final class MkDir implements ZipWriteCmd {
+
+            private final File folder;
+
+            public MkDir(File folder) {
+                this.folder = Objects.requireNonNull(folder);
+            }
+
+            @Override public int memSize() {
+                return folder.getPath().length();
+            }
+
+            @Override public void execute(Lock dirLock) throws IOException {
+                lockedMkdirs(dirLock, folder);
+            }
+        }
+
+        final class SmallFile implements ZipWriteCmd {
+
+            private final File file;
+            private final byte[] data;
+
+            public SmallFile(File file, byte[] data) {
+                this.file = file;
+                this.data = data.clone();
+            }
+
+            @Override public int memSize() {
+                return data.length + file.getPath().length();
+            }
+
+            @Override public void execute(Lock dirLock) throws IOException {
+                lockedMkdirs(dirLock, file.getParentFile());
+                exponentialBackoff(() -> {
+                    try (var os = new FileOutputStream(file)) {
+                        os.write(data);
+                    }
+                });
+            }
+        }
+
+        final class BigFile implements ZipWriteCmd {
+
+            private static final int CHUNK_SIZE = 1024 * 1024;
+
+            public final BlockingDeque<byte[]> chunks = new LinkedBlockingDeque<>(8);
+            // Written by the reader thread, read by a writer thread
+            public volatile boolean doneAdding;
+
+            private final File file;
+
+            public BigFile(File file) {
+                this.file = file;
+            }
+
+            @Override public int memSize() {
+                int[] sum = { doneAdding ? 0 : CHUNK_SIZE };
+                chunks.forEach(bb -> sum[0] += bb.length);
+                return sum[0];
+            }
+
+            @Override public void execute(Lock dirLock) throws IOException {
+
+                lockedMkdirs(dirLock, file.getParentFile());
+
+                try (var raFile = exponentialBackoff(() -> new RandomAccessFile(file, "rw"))) {
+                    while (true) {
+                        if (doneAdding && chunks.isEmpty()) {
+                            return;
+                        }
+                        byte[] chunk = chunks.poll(10, TimeUnit.MILLISECONDS);
+                        if (chunk == null) {
+                            continue;
+                        }
+
+                        var pos = raFile.getFilePointer();
+
+                        exponentialBackoff(() -> {
+                            // Re-seek on every retry so a failed write is not duplicated
+                            raFile.seek(pos);
+                            raFile.write(chunk);
+                        });
+                    }
+                }
+                catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
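+
+    /*
+     * Hand-off contract of BigFile, as a minimal sketch (the file name is hypothetical):
+     *
+     *   var big = new ZipWriteCmd.BigFile(new File(destination, "scene.blend"));
+     *   queue.submit(big);            // a writer may start draining immediately
+     *   big.chunks.put(chunkBytes);   // blocks once 8 chunks are buffered
+     *   big.doneAdding = true;        // lets the writer drain the rest and close the file
+     */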
+
+    private static final class WriteQueue {
+
+        private static final int MAX_SIZE = 64 * 1024 * 1024;
+
+        private final BlockingDeque<ZipWriteCmd> cmds = new LinkedBlockingDeque<>();
+        private final Set<ZipWriteCmd> workingCommands = Collections.synchronizedSet(new HashSet<>());
+        private final Object hasSpaceLock = new Object();
+
+        // Written by writer threads, read by the reader thread and the other writers
+        public volatile IOException err;
+
+        private final Lock dirLock = new ReentrantLock();
+
+        private volatile boolean doneAdding;
+
+        private boolean tryExecute() throws IOException, InterruptedException {
+            ZipWriteCmd cmd = cmds.poll(10, TimeUnit.MILLISECONDS);
+            if (cmd == null) {
+                return false;
+            }
+
+            workingCommands.add(cmd);
+            try {
+                cmd.execute(dirLock);
+            }
+            finally {
+                // Release the command's memory budget even when execute() fails,
+                // otherwise a blocked submit() would never wake up
+                workingCommands.remove(cmd);
+                synchronized (hasSpaceLock) {
+                    hasSpaceLock.notify();
+                }
+            }
+            return true;
+        }
+
+        private void submit(ZipWriteCmd cmd) throws InterruptedException, IOException {
+            while (!hasSpaceFor(cmd)) {
+                if (err != null) {
+                    // The writers are shutting down; surface their failure instead of waiting forever
+                    throw err;
+                }
+                synchronized (hasSpaceLock) {
+                    hasSpaceLock.wait(100);
+                }
+            }
+            cmds.add(cmd);
+        }
+
+        private boolean hasSpaceFor(ZipWriteCmd command) {
+            int[] sum = { command.memSize() };
+            Consumer<ZipWriteCmd> acum = cmd -> sum[0] += cmd.memSize();
+            cmds.forEach(acum);
+            workingCommands.forEach(acum);
+            return sum[0] <= MAX_SIZE;
+        }
+    }
+
+    /**
+     * This method will extract all contents from a zip file contained within the {@link InputStream}. The current thread reads the data, decompresses it
+     * and puts it in a buffer. A set of workers will drain the buffer and write it to the destination. Multiple workers are useful for mitigating the slow
+     * creation of many small files or writing over a NAS.
+     *
+     * @param zipFile the contents of the archive
+     * @param destination Path to the target directory where files will be extracted to
+     * @param password (optional) the password to decrypt the zip with
+     * @return Any error that occurred, or null on success
+     */
+    public static Throwable parallelUnzip(InputStream zipFile, File destination, char[] password) throws IOException, InterruptedException {
+        var workerCount = Math.min(MAX_IO_WRITER_THREADS, Runtime.getRuntime().availableProcessors() + 1);
+        return parallelUnzip(zipFile, destination, password, workerCount);
+    }
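+
+    // Worked example for the sizing heuristic above: a 4-core machine gets
+    // min(MAX_IO_WRITER_THREADS, 4 + 1) = 5 writer threads, and 8 or more cores
+    // are capped at MAX_IO_WRITER_THREADS = 6.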
+
+    /**
+     * This method will extract all contents from a zip file contained within the {@link InputStream}. The current thread reads the data, decompresses it
+     * and puts it in a buffer. A set of workers will drain the buffer and write it to the destination. Multiple workers are useful for mitigating the slow
+     * creation of many small files or writing over a NAS.
+     *
+     * @param zipFile the contents of the archive
+     * @param destination Path to the target directory where files will be extracted to
+     * @param password (optional) the password to decrypt the zip with
+     * @param workerCount the number of workers for writing the output
+     * @return Any error that occurred, or null on success
+     */
+    public static Throwable parallelUnzip(InputStream zipFile, File destination, char[] password, int workerCount) throws IOException, InterruptedException {
+        if (workerCount < 1) {
+            throw new IllegalArgumentException("workerCount must be greater than or equal to 1");
+        }
+
+        var queue = new WriteQueue();
+        ExecutorService executor = Executors.newFixedThreadPool(workerCount);
+        for (int i = 0; i < workerCount; i++) {
+            executor.submit(() -> {
+                writeTask(queue);
+                return null;
+            });
+        }
+
+        try {
+            readArchiveToQueue(zipFile, destination, password, queue);
+        }
+        finally {
+            // Flush the pending writes and stop the workers, even if reading failed
+            writeWait(queue, executor);
+        }
+        return queue.err;
+    }
+
+    private static void writeTask(WriteQueue queue) {
+        while (queue.err == null) {
+            try {
+                // An empty queue after doneAdding means no more work is coming
+                if (!queue.tryExecute() && queue.doneAdding) {
+                    break;
+                }
+            }
+            catch (IOException e) {
+                queue.err = e;
+                break;
+            }
+            catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static final Executor GC_RUNNER;
+    static {
+        var gcPool = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+        // Let the thread die when idle so it cannot keep the JVM alive between jobs
+        gcPool.allowCoreThreadTimeOut(true);
+        GC_RUNNER = gcPool;
+    }
+
+    private static void writeWait(WriteQueue queue, ExecutorService executor) throws InterruptedException {
+        queue.doneAdding = true;
+        executor.shutdown();
+        // Block until the workers have drained the queue, so callers see the final state
+        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        //Encourage the JVM to release memory used while decompressing
+        GC_RUNNER.execute(System::gc);
+    }
+
+    private static void readArchiveToQueue(InputStream zipFile, File destination, char[] password, WriteQueue queue) throws IOException, InterruptedException {
+        try (var zipInputStream = new ZipInputStream(zipFile)) {
+            if (password != null) {
+                zipInputStream.setPassword(password);
+            }
+            LocalFileHeader fileHeader;
+            while ((fileHeader = zipInputStream.getNextEntry()) != null) {
+                var outFile = new File(destination, fileHeader.getFileName());
+
+                // Directory entries carry no data, they only need to be created
+                if (fileHeader.isDirectory()) {
+                    queue.submit(new ZipWriteCmd.MkDir(outFile));
+                    continue;
+                }
+
+                if (fileHeader.getUncompressedSize() < WriteQueue.MAX_SIZE / 10) {
+                    queue.submit(new ZipWriteCmd.SmallFile(outFile, zipInputStream.readAllBytes()));
+                }
+                else {
+                    readBigFile(outFile, zipInputStream, queue);
+                }
+            }
+        }
+    }
+
+    private static void readBigFile(File outFile, ZipInputStream zipInputStream, WriteQueue queue) throws IOException, InterruptedException {
+        var bf = new ZipWriteCmd.BigFile(outFile);
+        queue.submit(bf);
+        try {
+            var buff = new byte[ZipWriteCmd.BigFile.CHUNK_SIZE];
+            int read;
+            while ((read = zipInputStream.read(buff)) != -1) {
+                var chunk = Arrays.copyOf(buff, read);
+                bf.chunks.put(chunk);
+            }
+        }
+        finally {
+            // Always release the writer, even if reading the entry failed
+            bf.doneAdding = true;
+        }
+    }
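+
+    // Worked example for the size routing above: with WriteQueue.MAX_SIZE = 64 MiB,
+    // the cutoff is 64 MiB / 10 = 6,710,886 bytes (~6.4 MiB). Smaller entries are
+    // buffered whole in a SmallFile command; larger ones stream through BigFile
+    // in 1 MiB chunks.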
+
+    /**
+     * Creates a directory while holding a lock. The lock is needed because concurrent mkdirs() calls on overlapping paths can observe each other's
+     * partial work and spuriously fail.
+     */
+    private static void lockedMkdirs(Lock dirLock, File file) throws IOException {
+        if (file.isDirectory()) {
+            return;
+        }
+        dirLock.lock();
+        try {
+            exponentialBackoff(() -> {
+                if (!file.isDirectory() && !file.mkdirs()) {
+                    throw new IOException("Failed to create folder: " + file);
+                }
+            });
+        }
+        finally {
+            dirLock.unlock();
+        }
+    }
+
+    private interface IoOp {
+        void run() throws IOException;
+    }
+
+    private interface IoOpVal<T> {
+        T run() throws IOException;
+    }
+
+    private static void exponentialBackoff(IoOp ioOp) throws IOException {
+        exponentialBackoff(() -> {
+            ioOp.run();
+            return null;
+        });
+    }
+
+    private static <T> T exponentialBackoff(IoOpVal<T> ioOp) throws IOException {
+        IOException err = null;
+        for (int i = 0; i < MAX_IO_RETRIES; i++) {
+            try {
+                return ioOp.run();
+            }
+            catch (IOException e) {
+                if (err == null) {
+                    err = e;
+                }
+                exponentialBackoffSleep(i);
+            }
+        }
+        throw err;
+    }
+
+    private static void exponentialBackoffSleep(int i) {
+        var i1 = i + 1;
+        try {
+            // Quadratic growth: 2 ms, 8 ms, 18 ms, ... over at most MAX_IO_RETRIES attempts
+            Thread.sleep(2L * i1 * i1);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+}
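
For reference, a standalone call to the new entry point looks like this. A minimal sketch, assuming hypothetical paths and default worker sizing (the in-tree caller is Utils.unzipChunksIntoDirectory below):

    // Extract archive.zip into ./output; writer-side failures come back as a Throwable
    try (var in = new BufferedInputStream(new FileInputStream("archive.zip"))) {
        Throwable err = UnzipUtils.parallelUnzip(in, new File("output"), null);
        if (err != null) {
            // a writer failed part-way; treat the extraction as incomplete
        }
    }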
diff --git a/src/main/java/com/sheepit/client/Utils.java b/src/main/java/com/sheepit/client/Utils.java
index 81d262f..66450ce 100644
--- a/src/main/java/com/sheepit/client/Utils.java
+++ b/src/main/java/com/sheepit/client/Utils.java
@@ -21,13 +21,10 @@ package com.sheepit.client;
 
 import net.lingala.zip4j.ZipFile;
 import net.lingala.zip4j.exception.ZipException;
-import net.lingala.zip4j.io.inputstream.ZipInputStream;
-import net.lingala.zip4j.model.LocalFileHeader;
 
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
@@ -104,54 +101,18 @@ public class Utils {
      * @return A status code as an integer, -1 if it encounters an IOException, 0 otherwise
      */
     public static int unzipChunksIntoDirectory(List<String> full_path_chunks, String destinationDirectory, char[] password, Log log) {
+        // STEP 1: Create the paths for ChunkInputStream, which will read the chunks' contents in order
+        var chunks = full_path_chunks.stream().map(File::new).collect(Collectors.toList());
+
+        // STEP 2: unzip the zip like before
         try {
-            // STEP 1: Create a ChunkInputStream, which will read the chunks' contents in order
-            ChunkInputStream chunkInputStream = new ChunkInputStream(full_path_chunks.stream().map(Paths::get).collect(Collectors.toList()));
-
-            // STEP 2: unzip the zip like before
-            ZipInputStream zipInputStream = new ZipInputStream(chunkInputStream);
-            if (password != null) {
-                zipInputStream.setPassword(password);
-            }
-            LocalFileHeader fileHeader;
-            while ((fileHeader = zipInputStream.getNextEntry()) != null) {
-                String outFilePath = destinationDirectory + File.separator + fileHeader.getFileName();
-                File outFile = new File(outFilePath);
-
-                //Checks if the file is a directory
-                if (fileHeader.isDirectory()) {
-                    outFile.mkdirs();
-                    continue;
-                }
-
-                File parentDir = outFile.getParentFile();
-                if (parentDir.exists() == false) {
-                    parentDir.mkdirs();
-                }
-
-                FileOutputStream os = new FileOutputStream(outFile);
-
-                int readLen;
-                byte[] buff = new byte[1024];
-
-                //Loop until End of File and write the contents to the output stream
-                while ((readLen = zipInputStream.read(buff)) != -1) {
-                    os.write(buff, 0, readLen);
-                }
-
-                os.close();
-            }
-            zipInputStream.close();
+            var err = UnzipUtils.parallelUnzip(new BufferedInputStream(new ChunkInputStream(chunks)), new File(destinationDirectory), password);
+            if (err != null) {
+                // Writer-side failures are reported through the return value rather than thrown
+                log.debug("Utils::unzipChunksIntoDirectory exception " + err);
+                return -1;
+            }
+            return 0;
         }
-        catch (IOException e) {
-            StringWriter sw = new StringWriter();
-            PrintWriter pw = new PrintWriter(sw);
-            e.printStackTrace(pw);
-            log.debug("Utils::unzipChunksIntoDirectory exception " + e + " stacktrace: " + sw.toString());
+        catch (IOException | InterruptedException e) {
+            log.debug("Utils::unzipChunksIntoDirectory exception " + e);
             return -1;
         }
-
-        return 0;
     }
 
     /**