diff --git a/rulesets/java-sheepit.xml b/rulesets/java-sheepit.xml
index 5224df8..839f362 100644
--- a/rulesets/java-sheepit.xml
+++ b/rulesets/java-sheepit.xml
@@ -40,7 +40,7 @@
-
+
diff --git a/src/main/java/com/sheepit/client/ChunkInputStream.java b/src/main/java/com/sheepit/client/ChunkInputStream.java
index fd52b98..e056eaf 100644
--- a/src/main/java/com/sheepit/client/ChunkInputStream.java
+++ b/src/main/java/com/sheepit/client/ChunkInputStream.java
@@ -2,74 +2,61 @@ package com.sheepit.client;
import lombok.NonNull;
-import java.io.BufferedInputStream;
+import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
-import java.util.NoSuchElementException;
public class ChunkInputStream extends InputStream {
- @NonNull private final Deque<Path> chunkPaths;
- @NonNull private BufferedInputStream currentStream;
+ @NonNull private final Deque<File> chunkPaths;
+ @NonNull private InputStream currentStream;
/**
* Given a list of chunk paths, provides an InputStream that reads the contents of these files in the order they were provided in.
* @param chunkPaths Non-empty, ordered list of paths
* @throws IOException If the first chunk could not be found. Errors with other chunks are thrown during calls to read()
*/
- public ChunkInputStream(List<Path> chunkPaths) throws IOException {
+ public ChunkInputStream(List<File> chunkPaths) throws IOException {
this.chunkPaths = new ArrayDeque<>(chunkPaths);
- /// Setup the first chunk for reading
- prepareNextChunk();
- }
-
- private void prepareNextChunk() throws FileNotFoundException, NoSuchElementException {
- currentStream = new BufferedInputStream(new FileInputStream(chunkPaths.removeFirst().toFile()));
+ // Set up the first chunk for reading
+ currentStream = new FileInputStream(this.chunkPaths.removeFirst());
}
@Override public int read() throws IOException {
- int result = currentStream.read();
-
- if (result == -1) {
- /// Finished reading from this chunk, continue with the next if possible
- try {
- prepareNextChunk();
+ while (true) {
+ int result = currentStream.read();
+ // Finished reading from this chunk, continue with the next if possible
+ if (result != -1 || tryOpenNextChunk() == false) {
+ return result;
}
- catch (NoSuchElementException e) {
- /// This was the last chunk
- return -1;
- }
- result = currentStream.read();
}
-
- return result;
}
- /// Improve throughput by offering method to read whole block of bytes at once
@Override public int read(byte[] b, int off, int len) throws IOException {
- int bytesRead = -1;
-
- while (bytesRead == -1) {
- bytesRead = currentStream.read(b, off, len);
-
- if (bytesRead == -1) {
- try {
- prepareNextChunk();
- }
- catch (NoSuchElementException e) {
- /// This was the last chunk
- return -1;
- }
+ while (true) {
+ var read = currentStream.read(b, off, len);
+ // Finished reading from this chunk, continue with the next if possible
+ if (read != -1 || tryOpenNextChunk() == false) {
+ return read;
}
}
-
- return bytesRead;
+ }
+
+ @Override public void close() throws IOException {
+ currentStream.close();
+ }
+
+ private boolean tryOpenNextChunk() throws IOException {
+ if (chunkPaths.isEmpty()) {
+ return false;
+ }
+ currentStream.close();
+ currentStream = new FileInputStream(chunkPaths.removeFirst());
+ return true;
}
}
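ChunkInputStream behaves much like java.io.SequenceInputStream over the given chunk files: when one chunk is exhausted, the next one is opened transparently. A minimal usage sketch (chunk file names are hypothetical; java.io and java.util imports implied):

    List<File> chunks = List.of(new File("job.zip.001"), new File("job.zip.002"));
    try (InputStream in = new BufferedInputStream(new ChunkInputStream(chunks))) {
        byte[] magic = new byte[4];
        int n = in.read(magic); // successive reads continue seamlessly into the next chunk
    }
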
diff --git a/src/main/java/com/sheepit/client/UnzipUtils.java b/src/main/java/com/sheepit/client/UnzipUtils.java
new file mode 100644
index 0000000..69a5b51
--- /dev/null
+++ b/src/main/java/com/sheepit/client/UnzipUtils.java
@@ -0,0 +1,353 @@
+package com.sheepit.client;
+
+import net.lingala.zip4j.io.inputstream.ZipInputStream;
+import net.lingala.zip4j.model.LocalFileHeader;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+public class UnzipUtils {
+
+ private static final int MAX_IO_RETRIES = 32;
+ private static final int MAX_IO_WRITER_THREADS = 6;
+
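+ /** A unit of write work: produced by the zip-reading thread, drained and executed by the writer pool. */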
+ private interface ZipWriteCmd {
+ int memSize();
+
+ void execute(Lock dirLock) throws IOException;
+
+ final class MkDir implements ZipWriteCmd {
+
+ private final File folder;
+
+ public MkDir(File folder) {
+ this.folder = Objects.requireNonNull(folder);
+ }
+
+ @Override public int memSize() {
+ return folder.getPath().length();
+ }
+
+ @Override public void execute(Lock dirLock) throws IOException {
+ lockedMkdirs(dirLock, folder);
+ }
+ }
+
+ final class SmallFile implements ZipWriteCmd {
+
+ private final File file;
+ private final byte[] data;
+
+ public SmallFile(File file, byte[] data) {
+ this.file = file;
+ this.data = data.clone();
+ }
+
+ @Override public int memSize() {
+ return data.length + file.getPath().length();
+ }
+
+ @Override public void execute(Lock dirLock) throws IOException {
+ lockedMkdirs(dirLock, file.getParentFile());
+ exponentialBackoff(() -> {
+ try (var os = new FileOutputStream(file)) {
+ os.write(data);
+ }
+ });
+ }
+ }
+
+ final class BigFile implements ZipWriteCmd {
+
+ private static final int CHUNK_SIZE = 1024 * 1024;
+
+ public final BlockingDeque<byte[]> chunks = new LinkedBlockingDeque<>(8);
+ public volatile boolean doneAdding;
+
+ private final File file;
+
+ public BigFile(File file) {
+ this.file = file;
+ }
+
+ @Override public int memSize() {
+ int[] sum = { doneAdding ? 0 : CHUNK_SIZE };
+ chunks.forEach(bb -> sum[0] += bb.length);
+ return sum[0];
+ }
+
+ @Override public void execute(Lock dirLock) throws IOException {
+
+ lockedMkdirs(dirLock, file.getParentFile());
+
+ try (var raFile = exponentialBackoff(() -> new RandomAccessFile(file, "rw"))) {
+ while (true) {
+ if (doneAdding && chunks.isEmpty()) {
+ return;
+ }
+ byte[] chunk = chunks.poll(10, TimeUnit.MILLISECONDS);
+ if (chunk == null) {
+ continue;
+ }
+
+ var pos = raFile.getFilePointer();
+
+ exponentialBackoff(() -> {
+ raFile.seek(pos);
+ raFile.write(chunk);
+ });
+ }
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+ }
+
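+ /** Bounded buffer of pending write commands; submit() blocks the reader while queued data exceeds MAX_SIZE. */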
+ private static final class WriteQueue {
+
+ private static final int MAX_SIZE = 64 * 1024 * 1024;
+
+ private final BlockingDeque<ZipWriteCmd> cmds = new LinkedBlockingDeque<>();
+ private final Set<ZipWriteCmd> workingCommands = Collections.synchronizedSet(new HashSet<>());
+ private final Object hasSpaceLock = new Object();
+
+ public volatile IOException err;
+
+ private final Lock dirLock = new ReentrantLock();
+
+ private volatile boolean doneAdding;
+
+ private boolean tryExecute() throws IOException, InterruptedException {
+ ZipWriteCmd cmd = cmds.poll(10, TimeUnit.MILLISECONDS);
+ if (cmd == null) {
+ return false;
+ }
+
+ workingCommands.add(cmd);
+ try {
+ cmd.execute(dirLock);
+ }
+ finally {
+ // Release the command's memory budget even if the write failed, so submit() cannot block forever
+ workingCommands.remove(cmd);
+ synchronized (hasSpaceLock) {
+ hasSpaceLock.notify();
+ }
+ }
+ return true;
+ }
+
+ private void submit(ZipWriteCmd cmd) throws InterruptedException {
+ while (!hasSpaceFor(cmd)) {
+ synchronized (hasSpaceLock) {
+ hasSpaceLock.wait(100);
+ }
+ }
+ cmds.add(cmd);
+ }
+
+ private boolean hasSpaceFor(ZipWriteCmd command) {
+ int[] sum = { command.memSize() };
+ Consumer<ZipWriteCmd> accum = cmd -> sum[0] += cmd.memSize();
+ cmds.forEach(accum);
+ workingCommands.forEach(accum);
+ return sum[0] <= MAX_SIZE;
+ }
+
+ }
+
+ /**
+ * Extracts all contents of the zip archive contained in the {@link InputStream}. The current thread reads the data, decompresses it
+ * and puts it in a buffer. A set of workers drains the buffer and writes the files to the destination. Multiple workers mitigate the slow
+ * creation of many small files and slow writes over a NAS.
+ *
+ * @param zipFile the contents of the archive
+ * @param destination Path to the target directory where files will be extracted to
+ * @param password (optional) the password to decrypt the zip with
+ * @return the first error that occurred, or {@code null} on success
+ */
+ public static Throwable parallelUnzip(InputStream zipFile, File destination, char[] password) throws IOException, InterruptedException {
+ var workerCount = Math.min(MAX_IO_WRITER_THREADS, Runtime.getRuntime().availableProcessors() + 1);
+ return parallelUnzip(zipFile, destination, password, workerCount);
+ }
+
+ /**
+ * Extracts all contents of the zip archive contained in the {@link InputStream}. The current thread reads the data, decompresses it
+ * and puts it in a buffer. A set of workers drains the buffer and writes the files to the destination. Multiple workers mitigate the slow
+ * creation of many small files and slow writes over a NAS.
+ *
+ * @param zipFile the contents of the archive
+ * @param destination Path to the target directory where files will be extracted to
+ * @param password (optional) the password to decrypt the zip with
+ * @param workerCount the number of workers for writing the output
+ * @return the first error that occurred, or {@code null} on success
+ */
+ public static Throwable parallelUnzip(InputStream zipFile, File destination, char[] password, int workerCount) throws IOException, InterruptedException {
+ if (workerCount < 1) {
+ throw new IllegalArgumentException("Worker count must be greater than or equal to 1");
+ }
+
+ var queue = new WriteQueue();
+ ExecutorService executor = Executors.newFixedThreadPool(workerCount);
+ for (int i = 0; i < workerCount; i++) {
+ executor.submit(() -> { writeTask(queue); return null; });
+ }
+
+ try {
+ readArchiveToQueue(zipFile, destination, password, queue);
+ writeWait(queue, executor);
+ return queue.err;
+ }
+ finally {
+ writeWait(queue, executor);
+ }
+ }
+
+ private static void writeTask(WriteQueue queue) {
+ while (queue.err == null) {
+ try {
+ if (!queue.tryExecute()) {
+ if (queue.doneAdding) {
+ break;
+ }
+ }
+ }
+ catch (IOException e) {
+ queue.err = e;
+ break;
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // Core size 0 so the helper thread dies when idle instead of keeping the JVM alive
+ private static final Executor GC_RUNNER = new ThreadPoolExecutor(0, 1, 1L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+
+ private static void writeWait(WriteQueue queue, ExecutorService executor) throws InterruptedException {
+ queue.doneAdding = true;
+ executor.shutdown();
+ // Block until the workers have drained the queue; queue.err is only meaningful afterwards
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ //Encourage the JVM to release memory used while decompressing
+ GC_RUNNER.execute(System::gc);
+ }
+
+ private static void readArchiveToQueue(InputStream zipFile, File destination, char[] password, WriteQueue queue) throws IOException, InterruptedException {
+ try (var zipInputStream = new ZipInputStream(zipFile)) {
+ if (password != null) {
+ zipInputStream.setPassword(password);
+ }
+ LocalFileHeader fileHeader;
+ while ((fileHeader = zipInputStream.getNextEntry()) != null) {
+ var outFile = new File(destination, fileHeader.getFileName());
+
+ //Checks if the file is a directory
+ if (fileHeader.isDirectory()) {
+ queue.submit(new ZipWriteCmd.MkDir(outFile));
+ continue;
+ }
+
+ if (fileHeader.getUncompressedSize() < WriteQueue.MAX_SIZE / 10) {
+ queue.submit(new ZipWriteCmd.SmallFile(outFile, zipInputStream.readAllBytes()));
+ }
+ else {
+ readBigFile(outFile, zipInputStream, queue);
+ }
+ }
+ }
+ }
+
+ private static void readBigFile(File outFile, ZipInputStream zipInputStream, WriteQueue queue) throws IOException, InterruptedException {
+ var bf = new ZipWriteCmd.BigFile(outFile);
+ queue.submit(bf);
+ try {
+ var buff = new byte[ZipWriteCmd.BigFile.CHUNK_SIZE];
+ int read;
+ while ((read = zipInputStream.read(buff)) != -1) {
+ var chunk = Arrays.copyOf(buff, read);
+ bf.chunks.put(chunk);
+ }
+ }
+ finally {
+ bf.doneAdding = true;
+ }
+ }
+
+ /**
+ * Creates a directory while holding a lock. The lock is needed because concurrent mkdirs() calls on overlapping paths can intermittently fail each other.
+ */
+ private static void lockedMkdirs(Lock dirLock, File file) throws IOException {
+ if (file.isDirectory()) {
+ return;
+ }
+ dirLock.lock();
+ try {
+ exponentialBackoff(() -> {
+ if (!file.isDirectory() && !file.mkdirs()) {
+ throw new IOException("Failed to create folder: " + file);
+ }
+ });
+ }
+ finally {
+ dirLock.unlock();
+ }
+ }
+
+ private interface IoOp {
+ void run() throws IOException;
+ }
+
+ private interface IoOpVal<T> {
+ T run() throws IOException;
+ }
+
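+ /** Runs an I/O operation, retrying up to MAX_IO_RETRIES times with an increasing delay; rethrows the first failure if every attempt fails. */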
+ private static void exponentialBackoff(IoOp ioOp) throws IOException {
+ exponentialBackoff(() -> {
+ ioOp.run();
+ return null;
+ });
+ }
+
+ private static <T> T exponentialBackoff(IoOpVal<T> ioOp) throws IOException {
+ IOException err = null;
+ for (int i = 0; i < MAX_IO_RETRIES; i++) {
+ try {
+ return ioOp.run();
+ }
+ catch (IOException e) {
+ if (err == null) {
+ err = e;
+ }
+ exponentialBackoffSleep(i);
+ }
+ }
+ throw err;
+ }
+
+ private static void exponentialBackoffSleep(int i) {
+ // The delay grows quadratically: 2 ms, 8 ms, 18 ms, ...
+ var attempt = i + 1;
+ try {
+ Thread.sleep(2L * attempt * attempt);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
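A minimal caller sketch for UnzipUtils.parallelUnzip (the paths below are hypothetical; java.io imports implied). The returned Throwable is null on success, while checked IOException/InterruptedException propagate to the caller:

    public static void main(String[] args) throws Exception {
        File archive = new File("cache/job.zip"); // hypothetical input archive
        try (InputStream in = new BufferedInputStream(new FileInputStream(archive))) {
            // password is null because this archive is assumed unencrypted
            Throwable err = UnzipUtils.parallelUnzip(in, new File("cache/out"), null);
            if (err != null) {
                System.err.println("Unzip failed: " + err);
            }
        }
    }
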
diff --git a/src/main/java/com/sheepit/client/Utils.java b/src/main/java/com/sheepit/client/Utils.java
index 81d262f..66450ce 100644
--- a/src/main/java/com/sheepit/client/Utils.java
+++ b/src/main/java/com/sheepit/client/Utils.java
@@ -21,13 +21,10 @@ package com.sheepit.client;
import net.lingala.zip4j.ZipFile;
import net.lingala.zip4j.exception.ZipException;
-import net.lingala.zip4j.io.inputstream.ZipInputStream;
-import net.lingala.zip4j.model.LocalFileHeader;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
@@ -104,54 +101,18 @@ public class Utils {
* @return A status code as an integer: -1 if an error occurred, 0 otherwise
*/
public static int unzipChunksIntoDirectory(List<String> full_path_chunks, String destinationDirectory, char[] password, Log log) {
+ // STEP 1: Create the paths for ChunkInputStream, which will read the chunks' contents in order
+ var chunks = full_path_chunks.stream().map(File::new).collect(Collectors.toList());
+
+ // STEP 2: unzip the zip like before
try {
- // STEP 1: Create a ChunkInputStream, which will read the chunks' contents in order
- ChunkInputStream chunkInputStream = new ChunkInputStream(full_path_chunks.stream().map(Paths::get).collect(Collectors.toList()));
-
- // STEP 2: unzip the zip like before
- ZipInputStream zipInputStream = new ZipInputStream(chunkInputStream);
- if (password != null) {
- zipInputStream.setPassword(password);
- }
- LocalFileHeader fileHeader;
- while ((fileHeader = zipInputStream.getNextEntry()) != null) {
- String outFilePath = destinationDirectory + File.separator + fileHeader.getFileName();
- File outFile = new File(outFilePath);
-
- //Checks if the file is a directory
- if (fileHeader.isDirectory()) {
- outFile.mkdirs();
- continue;
- }
-
- File parentDir = outFile.getParentFile();
- if (parentDir.exists() == false) {
- parentDir.mkdirs();
- }
-
- FileOutputStream os = new FileOutputStream(outFile);
-
- int readLen;
- byte[] buff = new byte[1024];
-
- //Loop until End of File and write the contents to the output stream
- while ((readLen = zipInputStream.read(buff)) != -1) {
- os.write(buff, 0, readLen);
- }
-
- os.close();
- }
- zipInputStream.close();
+ var err = UnzipUtils.parallelUnzip(new BufferedInputStream(new ChunkInputStream(chunks)), new File(destinationDirectory), password);
+ // parallelUnzip reports worker-side write failures via its return value
+ if (err != null) {
+ log.debug("Utils::unzipChunksIntoDirectory worker error " + err);
+ return -1;
+ }
+ return 0;
}
- catch (IOException e) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- log.debug("Utils::unzipChunksIntoDirectory exception " + e + " stacktrace: " + sw.toString());
+ catch (IOException | InterruptedException e) {
+ log.debug("Utils::unzipChunksIntoDirectory exception " + e);
return -1;
}
-
- return 0;
}
/**