Ref: parallele unzip, to improve performance
This commit is contained in:
@@ -40,7 +40,7 @@
|
|||||||
<rule ref="category/java/bestpractices.xml/UnusedPrivateMethod" />
|
<rule ref="category/java/bestpractices.xml/UnusedPrivateMethod" />
|
||||||
<rule ref="category/java/bestpractices.xml/UseCollectionIsEmpty" />
|
<rule ref="category/java/bestpractices.xml/UseCollectionIsEmpty" />
|
||||||
<rule ref="category/java/bestpractices.xml/UseStandardCharsets" />
|
<rule ref="category/java/bestpractices.xml/UseStandardCharsets" />
|
||||||
<rule ref="category/java/bestpractices.xml/UseVarargs" />
|
<!-- <rule ref="category/java/bestpractices.xml/UseVarargs" />-->
|
||||||
<rule ref="category/java/bestpractices.xml/WhileLoopWithLiteralBoolean" />
|
<rule ref="category/java/bestpractices.xml/WhileLoopWithLiteralBoolean" />
|
||||||
<rule ref="category/java/codestyle.xml/ExtendsObject" />
|
<rule ref="category/java/codestyle.xml/ExtendsObject" />
|
||||||
<rule ref="category/java/codestyle.xml/ForLoopShouldBeWhileLoop" />
|
<rule ref="category/java/codestyle.xml/ForLoopShouldBeWhileLoop" />
|
||||||
|
|||||||
@@ -2,74 +2,61 @@ package com.sheepit.client;
|
|||||||
|
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Deque;
|
import java.util.Deque;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NoSuchElementException;
|
|
||||||
|
|
||||||
public class ChunkInputStream extends InputStream {
|
public class ChunkInputStream extends InputStream {
|
||||||
|
|
||||||
@NonNull private final Deque<Path> chunkPaths;
|
@NonNull private final Deque<File> chunkPaths;
|
||||||
@NonNull private BufferedInputStream currentStream;
|
@NonNull private InputStream currentStream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a list of chunk paths, provides an InputStream that reads the contents of these files in the order they were provided in.
|
* Given a list of chunk paths, provides an InputStream that reads the contents of these files in the order they were provided in.
|
||||||
* @param chunkPaths Non-empty, ordered list of paths
|
* @param chunkPaths Non-empty, ordered list of paths
|
||||||
* @throws IOException If the first chunk could not be found. Errors with other chunk will be thrown during calls to read()
|
* @throws IOException If the first chunk could not be found. Errors with other chunk will be thrown during calls to read()
|
||||||
*/
|
*/
|
||||||
public ChunkInputStream(List<Path> chunkPaths) throws IOException {
|
public ChunkInputStream(List<File> chunkPaths) throws IOException {
|
||||||
this.chunkPaths = new ArrayDeque<>(chunkPaths);
|
this.chunkPaths = new ArrayDeque<>(chunkPaths);
|
||||||
|
|
||||||
/// Setup the first chunk for reading
|
// Setup the first chunk for reading
|
||||||
prepareNextChunk();
|
currentStream = new FileInputStream(this.chunkPaths.removeFirst());
|
||||||
}
|
|
||||||
|
|
||||||
private void prepareNextChunk() throws FileNotFoundException, NoSuchElementException {
|
|
||||||
currentStream = new BufferedInputStream(new FileInputStream(chunkPaths.removeFirst().toFile()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public int read() throws IOException {
|
@Override public int read() throws IOException {
|
||||||
|
while (true) {
|
||||||
int result = currentStream.read();
|
int result = currentStream.read();
|
||||||
|
// Finished reading from this chunk, continue with the next if possible
|
||||||
if (result == -1) {
|
if (result > 0 || tryOpenNextChunk() == false) {
|
||||||
/// Finished reading from this chunk, continue with the next if possible
|
|
||||||
try {
|
|
||||||
prepareNextChunk();
|
|
||||||
}
|
|
||||||
catch (NoSuchElementException e) {
|
|
||||||
/// This was the last chunk
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
result = currentStream.read();
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Improve throughput by offering method to read whole block of bytes at once
|
|
||||||
@Override public int read(byte[] b, int off, int len) throws IOException {
|
@Override public int read(byte[] b, int off, int len) throws IOException {
|
||||||
int bytesRead = -1;
|
while (true) {
|
||||||
|
var read = currentStream.read(b, off, len);
|
||||||
while (bytesRead == -1) {
|
// Finished reading from this chunk, continue with the next if possible
|
||||||
bytesRead = currentStream.read(b, off, len);
|
if (read > 0 || tryOpenNextChunk() == false) {
|
||||||
|
return read;
|
||||||
if (bytesRead == -1) {
|
|
||||||
try {
|
|
||||||
prepareNextChunk();
|
|
||||||
}
|
|
||||||
catch (NoSuchElementException e) {
|
|
||||||
/// This was the last chunk
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return bytesRead;
|
@Override public void close() throws IOException {
|
||||||
|
currentStream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean tryOpenNextChunk() throws IOException {
|
||||||
|
if (chunkPaths.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
currentStream.close();
|
||||||
|
currentStream = new FileInputStream(chunkPaths.removeFirst());
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
353
src/main/java/com/sheepit/client/UnzipUtils.java
Normal file
353
src/main/java/com/sheepit/client/UnzipUtils.java
Normal file
@@ -0,0 +1,353 @@
|
|||||||
|
package com.sheepit.client;
|
||||||
|
|
||||||
|
import net.lingala.zip4j.io.inputstream.ZipInputStream;
|
||||||
|
import net.lingala.zip4j.model.LocalFileHeader;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.BlockingDeque;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
public class UnzipUtils {
|
||||||
|
|
||||||
|
private static final int MAX_IO_RETRIES = 32;
|
||||||
|
private static final int MAX_IO_WRITTER_THREADS = 6;
|
||||||
|
|
||||||
|
private interface ZipWriteCmd {
|
||||||
|
int memSize();
|
||||||
|
|
||||||
|
void execute(Lock dirLock) throws IOException;
|
||||||
|
|
||||||
|
final class MkDir implements ZipWriteCmd {
|
||||||
|
|
||||||
|
private final File folder;
|
||||||
|
|
||||||
|
public MkDir(File folder) {
|
||||||
|
this.folder = Objects.requireNonNull(folder);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int memSize() {
|
||||||
|
return folder.getPath().length();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void execute(Lock dirLock) throws IOException {
|
||||||
|
lockedMkdirs(dirLock, folder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final class SmallFile implements ZipWriteCmd {
|
||||||
|
|
||||||
|
private final File file;
|
||||||
|
private final byte[] data;
|
||||||
|
|
||||||
|
public SmallFile(File file, byte[] data) {
|
||||||
|
this.file = file;
|
||||||
|
this.data = data.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int memSize() {
|
||||||
|
return data.length + file.getPath().length();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void execute(Lock dirLock) throws IOException {
|
||||||
|
lockedMkdirs(dirLock, file.getParentFile());
|
||||||
|
exponentialBackoff(() -> {
|
||||||
|
try (var os = new FileOutputStream(file)) {
|
||||||
|
os.write(data);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final class BigFile implements ZipWriteCmd {
|
||||||
|
|
||||||
|
private static final int CHUNK_SIZE = 1024 * 1024;
|
||||||
|
|
||||||
|
public final BlockingDeque<byte[]> chunks = new LinkedBlockingDeque<>(8);
|
||||||
|
public boolean doneAdding;
|
||||||
|
|
||||||
|
private final File file;
|
||||||
|
|
||||||
|
public BigFile(File file) {
|
||||||
|
this.file = file;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int memSize() {
|
||||||
|
int[] sum = { doneAdding ? 0 : CHUNK_SIZE };
|
||||||
|
chunks.forEach(bb -> sum[0] += bb.length);
|
||||||
|
return sum[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void execute(Lock dirLock) throws IOException {
|
||||||
|
|
||||||
|
lockedMkdirs(dirLock, file.getParentFile());
|
||||||
|
|
||||||
|
try (var raFile = exponentialBackoff(() -> new RandomAccessFile(file, "rw"))) {
|
||||||
|
while (true) {
|
||||||
|
if (doneAdding && chunks.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
byte[] chunk = chunks.poll(10, TimeUnit.MILLISECONDS);
|
||||||
|
if (chunk == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var pos = raFile.getFilePointer();
|
||||||
|
|
||||||
|
exponentialBackoff(() -> {
|
||||||
|
raFile.seek(pos);
|
||||||
|
raFile.write(chunk);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class WriteQueue {
|
||||||
|
|
||||||
|
private static final int MAX_SIZE = 64 * 1024 * 1024;
|
||||||
|
|
||||||
|
private final BlockingDeque<ZipWriteCmd> cmds = new LinkedBlockingDeque<>();
|
||||||
|
private final Set<ZipWriteCmd> workingCommands = Collections.synchronizedSet(new HashSet<>());
|
||||||
|
private final Object hasSpaceLock = new Object();
|
||||||
|
|
||||||
|
public IOException err;
|
||||||
|
|
||||||
|
private final Lock dirLock = new ReentrantLock();
|
||||||
|
|
||||||
|
private boolean doneAdding;
|
||||||
|
|
||||||
|
private boolean tryExecute() throws IOException, InterruptedException {
|
||||||
|
ZipWriteCmd cmd = cmds.poll(10, TimeUnit.MILLISECONDS);
|
||||||
|
if (cmd == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
workingCommands.add(cmd);
|
||||||
|
cmd.execute(dirLock);
|
||||||
|
workingCommands.remove(cmd);
|
||||||
|
|
||||||
|
synchronized (hasSpaceLock) {
|
||||||
|
hasSpaceLock.notify();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void submit(ZipWriteCmd cmd) throws InterruptedException {
|
||||||
|
while (!hasSpaceFor(cmd)) {
|
||||||
|
synchronized (hasSpaceLock) {
|
||||||
|
hasSpaceLock.wait(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cmds.add(cmd);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasSpaceFor(ZipWriteCmd command) {
|
||||||
|
int[] sum = { command.memSize() };
|
||||||
|
Consumer<ZipWriteCmd> acum = cmd -> sum[0] += cmd.memSize();
|
||||||
|
cmds.forEach(acum);
|
||||||
|
workingCommands.forEach(acum);
|
||||||
|
return sum[0] <= MAX_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method will extract all contents from a zip file contained within the {@link InputStream}. The current thread reads the data, decompresses it
|
||||||
|
* and puts it in a buffer. A set of workers will drain the buffer and write it to the destination. Multiple works are useful for mitigating the slow
|
||||||
|
* creation of many small files or writing over a NAS.
|
||||||
|
*
|
||||||
|
* @param zipFile the contents of the archive
|
||||||
|
* @param destination Path to the target directory where files will be extracted to
|
||||||
|
* @param password (optional) the password to decrypt the zip with
|
||||||
|
* @return Any error that occurred
|
||||||
|
*/
|
||||||
|
public static Throwable parallelUnzip(InputStream zipFile, File destination, char[] password) throws IOException, InterruptedException {
|
||||||
|
var workerCount = Math.min(MAX_IO_WRITTER_THREADS, Runtime.getRuntime().availableProcessors() + 1);
|
||||||
|
return parallelUnzip(zipFile, destination, password, workerCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method will extract all contents from a zip file contained within the {@link InputStream}. The current thread reads the data, decompresses it
|
||||||
|
* and puts it in a buffer. A set of workers will drain the buffer and write it to the destination. Multiple works are useful for mitigating the slow
|
||||||
|
* creation of many small files or writing over a NAS.
|
||||||
|
*
|
||||||
|
* @param zipFile the contents of the archive
|
||||||
|
* @param destination Path to the target directory where files will be extracted to
|
||||||
|
* @param password (optional) the password to decrypt the zip with
|
||||||
|
* @param workerCount the number of workers for writing the output
|
||||||
|
* @return Any error that occurred
|
||||||
|
*/
|
||||||
|
public static Throwable parallelUnzip(InputStream zipFile, File destination, char[] password, int workerCount) throws IOException, InterruptedException {
|
||||||
|
if (workerCount < 1) {
|
||||||
|
throw new IllegalArgumentException("Workers must be greater or equal to 1");
|
||||||
|
}
|
||||||
|
|
||||||
|
var queue = new WriteQueue();
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(workerCount);
|
||||||
|
for (int i = 0; i < workerCount; i++) {
|
||||||
|
executor.submit(() -> {writeTask(queue); return null;});
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
readArchiveToQueue(zipFile, destination, password, queue);
|
||||||
|
writeWait(queue, executor);
|
||||||
|
return queue.err;
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
writeWait(queue, executor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeTask(WriteQueue queue) {
|
||||||
|
while (queue.err == null) {
|
||||||
|
try {
|
||||||
|
if (!queue.tryExecute()) {
|
||||||
|
if (queue.doneAdding) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
queue.err = e;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Executor GC_RUNNER = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
|
||||||
|
|
||||||
|
private static void writeWait(WriteQueue queue, ExecutorService executor) throws InterruptedException {
|
||||||
|
queue.doneAdding = true;
|
||||||
|
executor.shutdown();
|
||||||
|
//Encourage the JVM to release memory used while decompressing
|
||||||
|
GC_RUNNER.execute(System::gc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void readArchiveToQueue(InputStream zipFile, File destination, char[] password, WriteQueue queue) throws IOException, InterruptedException {
|
||||||
|
try (var zipInputStream = new ZipInputStream(zipFile)) {
|
||||||
|
if (password != null) {
|
||||||
|
zipInputStream.setPassword(password);
|
||||||
|
}
|
||||||
|
LocalFileHeader fileHeader;
|
||||||
|
while ((fileHeader = zipInputStream.getNextEntry()) != null) {
|
||||||
|
var outFile = new File(destination, fileHeader.getFileName());
|
||||||
|
|
||||||
|
//Checks if the file is a directory
|
||||||
|
if (fileHeader.isDirectory()) {
|
||||||
|
queue.submit(new ZipWriteCmd.MkDir(outFile));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fileHeader.getUncompressedSize() < WriteQueue.MAX_SIZE / 10) {
|
||||||
|
queue.submit(new ZipWriteCmd.SmallFile(outFile, zipInputStream.readAllBytes()));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
readBigFile(outFile, zipInputStream, queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void readBigFile(File outFile, ZipInputStream zipInputStream, WriteQueue queue) throws IOException, InterruptedException {
|
||||||
|
var bf = new ZipWriteCmd.BigFile(outFile);
|
||||||
|
queue.submit(bf);
|
||||||
|
try {
|
||||||
|
var buff = new byte[ZipWriteCmd.BigFile.CHUNK_SIZE];
|
||||||
|
int read;
|
||||||
|
while ((read = zipInputStream.read(buff)) != -1) {
|
||||||
|
var chunk = Arrays.copyOf(buff, read);
|
||||||
|
bf.chunks.put(chunk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
bf.doneAdding = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a directory with a lock. The lock is useful here as conflicting multithreaded requests can make each other fail sometimes.
|
||||||
|
*/
|
||||||
|
private static void lockedMkdirs(Lock dirLock, File file) throws IOException {
|
||||||
|
if (file.isDirectory()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
dirLock.lock();
|
||||||
|
try {
|
||||||
|
exponentialBackoff(() -> {
|
||||||
|
if (!file.isDirectory() && !file.mkdirs()) {
|
||||||
|
throw new IOException("Failed to create folder: " + file);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
dirLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface IoOp {
|
||||||
|
void run() throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface IoOpVal<T> {
|
||||||
|
T run() throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void exponentialBackoff(IoOp ioOp) throws IOException {
|
||||||
|
exponentialBackoff(() -> {
|
||||||
|
ioOp.run();
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T> T exponentialBackoff(IoOpVal<T> ioOp) throws IOException {
|
||||||
|
IOException err = null;
|
||||||
|
for (int i = 0; i < MAX_IO_RETRIES; i++) {
|
||||||
|
try {
|
||||||
|
return ioOp.run();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
if (err == null) {
|
||||||
|
err = e;
|
||||||
|
}
|
||||||
|
exponentialBackoffSleep(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void exponentialBackoffSleep(int i) {
|
||||||
|
var i1 = i + 1;
|
||||||
|
try {
|
||||||
|
Thread.sleep(2L * i1 * i1);
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -21,13 +21,10 @@ package com.sheepit.client;
|
|||||||
|
|
||||||
import net.lingala.zip4j.ZipFile;
|
import net.lingala.zip4j.ZipFile;
|
||||||
import net.lingala.zip4j.exception.ZipException;
|
import net.lingala.zip4j.exception.ZipException;
|
||||||
import net.lingala.zip4j.io.inputstream.ZipInputStream;
|
|
||||||
import net.lingala.zip4j.model.LocalFileHeader;
|
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
@@ -104,54 +101,18 @@ public class Utils {
|
|||||||
* @return A status code as an integer, -1 if it encounters a IOException, 0 otherwise
|
* @return A status code as an integer, -1 if it encounters a IOException, 0 otherwise
|
||||||
*/
|
*/
|
||||||
public static int unzipChunksIntoDirectory(List<String> full_path_chunks, String destinationDirectory, char[] password, Log log) {
|
public static int unzipChunksIntoDirectory(List<String> full_path_chunks, String destinationDirectory, char[] password, Log log) {
|
||||||
try {
|
// STEP 1: Create the paths for ChunkInputStream, which will read the chunks' contents in order
|
||||||
// STEP 1: Create a ChunkInputStream, which will read the chunks' contents in order
|
var chunks = full_path_chunks.stream().map(File::new).collect(Collectors.toList());
|
||||||
ChunkInputStream chunkInputStream = new ChunkInputStream(full_path_chunks.stream().map(Paths::get).collect(Collectors.toList()));
|
|
||||||
|
|
||||||
// STEP 2: unzip the zip like before
|
// STEP 2: unzip the zip like before
|
||||||
ZipInputStream zipInputStream = new ZipInputStream(chunkInputStream);
|
try {
|
||||||
if (password != null) {
|
UnzipUtils.parallelUnzip(new BufferedInputStream(new ChunkInputStream(chunks)), new File(destinationDirectory), password);
|
||||||
zipInputStream.setPassword(password);
|
return 0;
|
||||||
}
|
}
|
||||||
LocalFileHeader fileHeader;
|
catch (IOException | InterruptedException e) {
|
||||||
while ((fileHeader = zipInputStream.getNextEntry()) != null) {
|
log.debug("Utils::unzipChunksIntoDirectory exception " + e);
|
||||||
String outFilePath = destinationDirectory + File.separator + fileHeader.getFileName();
|
|
||||||
File outFile = new File(outFilePath);
|
|
||||||
|
|
||||||
//Checks if the file is a directory
|
|
||||||
if (fileHeader.isDirectory()) {
|
|
||||||
outFile.mkdirs();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
File parentDir = outFile.getParentFile();
|
|
||||||
if (parentDir.exists() == false) {
|
|
||||||
parentDir.mkdirs();
|
|
||||||
}
|
|
||||||
|
|
||||||
FileOutputStream os = new FileOutputStream(outFile);
|
|
||||||
|
|
||||||
int readLen;
|
|
||||||
byte[] buff = new byte[1024];
|
|
||||||
|
|
||||||
//Loop until End of File and write the contents to the output stream
|
|
||||||
while ((readLen = zipInputStream.read(buff)) != -1) {
|
|
||||||
os.write(buff, 0, readLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
os.close();
|
|
||||||
}
|
|
||||||
zipInputStream.close();
|
|
||||||
}
|
|
||||||
catch (IOException e) {
|
|
||||||
StringWriter sw = new StringWriter();
|
|
||||||
PrintWriter pw = new PrintWriter(sw);
|
|
||||||
e.printStackTrace(pw);
|
|
||||||
log.debug("Utils::unzipChunksIntoDirectory exception " + e + " stacktrace: " + sw.toString());
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user