moved catch incrementing into hash builder

Szum123321 2023-03-09 21:46:25 +01:00
parent 9ffaff1a2d
commit 27d6d68e97
7 changed files with 55 additions and 41 deletions
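
Summary of the change: the per-file CountDownLatch that AbstractCompressor used to create and thread through FileInputStreamSupplier and HashingInputStream now lives inside FileTreeHashBuilder itself. update() releases one latch permit (so error paths only need to call update(path, 0)), and getValue(boolean) either awaits the outstanding files or merely warns about them. A minimal, hypothetical sketch of that contract follows; the class name LatchedHashBuilder, the simplified field set and the final hash combination are illustrative stand-ins, not the mod's actual code.

    // Hypothetical, simplified sketch of the reworked hash builder (not the mod's real class).
    import java.util.concurrent.CountDownLatch;

    class LatchedHashBuilder {
        private final Object lock = new Object();
        private final CountDownLatch latch;
        private long hash = 0, filesTotalSize = 0;

        LatchedHashBuilder(int filesToProcess) {
            latch = new CountDownLatch(filesToProcess);
        }

        // Called once per file, from any worker thread; also releases one latch permit,
        // which is why failure paths can simply report update(0, 0).
        void update(long fileHash, long fileSize) {
            latch.countDown();
            synchronized (lock) {
                hash ^= fileHash;          // XOR keeps the result independent of completion order
                filesTotalSize += fileSize;
            }
        }

        // block == true: wait until every expected file has reported in.
        // block == false: return immediately, only warning about stragglers.
        long getValue(boolean block) throws InterruptedException {
            if (block) latch.await();
            else if (latch.getCount() != 0)
                System.err.println("Finishing with " + latch.getCount() + " files unprocessed!");
            synchronized (lock) {
                return hash ^ filesTotalSize;   // placeholder for the real checksum combination
            }
        }
    }

XOR-combining the per-file hashes is what allows worker threads to finish in any order without changing the final tree hash.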

FileInputStreamSupplier.java

@@ -29,19 +29,18 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-public record FileInputStreamSupplier(Path path, String name, FileTreeHashBuilder hashTreeBuilder, BrokenFileHandler brokenFileHandler, CountDownLatch latch) implements InputSupplier {
+public record FileInputStreamSupplier(Path path, String name, FileTreeHashBuilder hashTreeBuilder, BrokenFileHandler brokenFileHandler) implements InputSupplier {
     private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
     @Override
     public InputStream getInputStream() throws IOException {
         try {
-            return new HashingInputStream(Files.newInputStream(path), path, hashTreeBuilder, brokenFileHandler, latch);
+            return new HashingInputStream(Files.newInputStream(path), path, hashTreeBuilder, brokenFileHandler);
         } catch (IOException e) {
             //Probably good idea to just put it here. In the case an exception is thrown here, it could be possble
             //The latch would have never been lifted
-            latch.countDown();
+            hashTreeBuilder.update(path, 0);
             brokenFileHandler.handle(path, e);
             throw e;
         }

MakeBackupRunnable.java

@@ -30,10 +30,8 @@ import net.szum123321.textile_backup.core.create.compressors.ZipCompressor;
 import net.szum123321.textile_backup.core.create.compressors.tar.AbstractTarArchiver;
 import net.szum123321.textile_backup.core.create.compressors.tar.ParallelBZip2Compressor;
 import net.szum123321.textile_backup.core.create.compressors.tar.ParallelGzipCompressor;
-import org.apache.commons.compress.compressors.lzma.LZMACompressorOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.concurrent.Callable;
@@ -94,11 +92,11 @@ public class MakeBackupRunnable implements Callable<Void> {
             }
             case BZIP2 -> ParallelBZip2Compressor.getInstance().createArchive(world, outFile, context, coreCount);
             case GZIP -> ParallelGzipCompressor.getInstance().createArchive(world, outFile, context, coreCount);
-            case LZMA -> new AbstractTarArchiver() {
-                protected OutputStream getCompressorOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) throws IOException {
-                    return new LZMACompressorOutputStream(stream);
-                }
-            }.createArchive(world, outFile, context, coreCount);
+            /* case LZMA -> new AbstractTarArchiver() {
+                protected OutputStream getCompressorOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) throws IOException {
+                    return new LZMACompressorOutputStream(stream);
+                }
+            }.createArchive(world, outFile, context, coreCount);*/
             case TAR -> new AbstractTarArchiver().createArchive(world, outFile, context, coreCount);
         }

AbstractCompressor.java

@@ -35,7 +35,6 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Stream;
@@ -48,7 +47,6 @@ public abstract class AbstractCompressor {
     public void createArchive(Path inputFile, Path outputFile, BackupContext ctx, int coreLimit) throws IOException, ExecutionException, InterruptedException {
         Instant start = Instant.now();
-        FileTreeHashBuilder fileHashBuilder = new FileTreeHashBuilder();
         BrokenFileHandler brokenFileHandler = new BrokenFileHandler(); //Basically a hashmap storing files and their respective exceptions
         try (OutputStream outStream = Files.newOutputStream(outputFile);
@@ -61,8 +59,7 @@ public abstract class AbstractCompressor {
                     .filter(Files::isRegularFile)
                     .toList();
-            //will be used in conjunction with ParallelZip to avoid race condition
-            CountDownLatch latch = new CountDownLatch(fileList.size());
+            FileTreeHashBuilder fileHashBuilder = new FileTreeHashBuilder(fileList.size());
             for (Path file : fileList) {
                 try {
@@ -71,12 +68,12 @@ public abstract class AbstractCompressor {
                             file,
                             inputFile.relativize(file).toString(),
                             fileHashBuilder,
-                            brokenFileHandler,
-                            latch),
+                            brokenFileHandler),
                             arc
                     );
                 } catch (IOException e) {
                     brokenFileHandler.handle(file, e);
+                    fileHashBuilder.update(file, 0);
                     //In Permissive mode we allow partial backups
                     if (ConfigHelper.INSTANCE.get().integrityVerificationMode.isStrict()) throw e;
                     else log.sendErrorAL(ctx, "An exception occurred while trying to compress: {}",
@@ -85,13 +82,15 @@ public abstract class AbstractCompressor {
                 }
             }
+            arc.flush();
             //wait for all the InputStreams to close/fail with InputSupplier
-            latch.await();
             Instant now = Instant.now();
+            long treeHash = fileHashBuilder.getValue(true);
             CompressionStatus status = new CompressionStatus (
-                    fileHashBuilder.getValue(),
+                    treeHash,
                     brokenFileHandler.get(),
                     ctx.startDate(), start.toEpochMilli(), now.toEpochMilli(),
                     Globals.INSTANCE.getCombinedVersionString()

FileTreeHashBuilder.java

@@ -26,6 +26,7 @@ import net.szum123321.textile_backup.core.CompressionStatus;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 /**
@@ -35,35 +36,44 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class FileTreeHashBuilder {
     private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
     private final Object lock = new Object();
-    private long hash = 0, filesProcessed = 0, filesTotalSize = 0;
+    private long hash = 0, filesToProcess, filesTotalSize = 0;
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final CountDownLatch latch;
+    public FileTreeHashBuilder(int filesToProcess) {
+        this.filesToProcess = filesToProcess;
+        latch = new CountDownLatch(filesToProcess);
+    }
     public void update(Path path, long newHash) throws IOException {
-        if(closed.get()) throw new RuntimeException("Hash Builder already closed!");
         if(path.getFileName().toString().equals(CompressionStatus.DATA_FILENAME)) return;
+        latch.countDown();
         long size = Files.size(path);
         synchronized (lock) {
+            //This way, the exact order of files processed doesn't matter.
             this.hash ^= newHash;
-            filesProcessed++;
+            filesToProcess--;
             filesTotalSize += size;
         }
     }
-    public long getValue() {
+    public int getRemaining() { return (int) latch.getCount(); }
+    synchronized public long getValue(boolean lock) throws InterruptedException {
+        long leftover = latch.getCount();
+        if(lock) latch.await();
+        else if(leftover != 0) log.warn("Finishing with {} files unprocessed!", leftover);
         var hasher = Globals.CHECKSUM_SUPPLIER.get();
-        closed.set(true);
-        synchronized (lock) {
-            log.debug("Closing: files: {}, bytes {}, raw hash {}", filesProcessed, filesTotalSize, hash);
-            hasher.update(hash);
-            hasher.update(filesProcessed);
-            hasher.update(filesTotalSize);
-            return hasher.getValue();
-        }
+        log.debug("Closing: files: {}, bytes {}, raw hash {}", filesToProcess, filesTotalSize, hash);
+        hasher.update(hash);
+        hasher.update(filesToProcess);
+        hasher.update(filesTotalSize);
+        return hasher.getValue();
     }
 }
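
For context, a hypothetical usage sketch of the pattern this class now enables, reusing the simplified LatchedHashBuilder from the sketch near the top of this page (the demo class name and the hashCode()-based placeholder hash are illustrative only, not code from this commit): every file is reported exactly once, even on failure, so a blocking getValue(true) cannot hang.

    // Hypothetical demo: workers hash files concurrently, the backup thread blocks on getValue(true).
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class LatchedHashBuilderDemo {
        public static void main(String[] args) throws Exception {
            List<Path> files;
            try (var stream = Files.list(Path.of("."))) {
                files = stream.filter(Files::isRegularFile).toList();
            }
            LatchedHashBuilder builder = new LatchedHashBuilder(files.size());
            ExecutorService pool = Executors.newFixedThreadPool(4);
            for (Path file : files) {
                pool.submit(() -> {
                    try {
                        // placeholder per-file "hash"; the mod uses its own checksum supplier
                        builder.update(file.getFileName().toString().hashCode(), Files.size(file));
                    } catch (Exception e) {
                        builder.update(0, 0); // still count the file so getValue(true) cannot hang
                    }
                });
            }
            long treeHash = builder.getValue(true); // waits for every file, like AbstractCompressor
            pool.shutdown();
            System.out.println("tree hash: " + treeHash);
        }
    }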

HashingInputStream.java

@@ -19,6 +19,8 @@
 package net.szum123321.textile_backup.core.digest;
 import net.szum123321.textile_backup.Globals;
+import net.szum123321.textile_backup.TextileBackup;
+import net.szum123321.textile_backup.TextileLogger;
 import net.szum123321.textile_backup.core.DataLeftException;
 import net.szum123321.textile_backup.core.create.BrokenFileHandler;
 import org.jetbrains.annotations.NotNull;
@@ -26,7 +28,6 @@ import org.jetbrains.annotations.NotNull;
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
-import java.util.concurrent.CountDownLatch;
 /**
  * This class calculates a hash of the file on the input stream, submits it to FileTreeHashBuilder.
@@ -37,16 +38,16 @@ import java.util.concurrent.CountDownLatch;
  * That is what CountDownLatch does
  */
 public class HashingInputStream extends FilterInputStream {
+    private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
     private final Path path;
     private final Hash hasher = Globals.CHECKSUM_SUPPLIER.get();
     private final FileTreeHashBuilder hashBuilder;
     private final BrokenFileHandler brokenFileHandler;
-    private final CountDownLatch latch;
-    public HashingInputStream(InputStream in, Path path, FileTreeHashBuilder hashBuilder, BrokenFileHandler brokenFileHandler, CountDownLatch latch) {
+    public HashingInputStream(InputStream in, Path path, FileTreeHashBuilder hashBuilder, BrokenFileHandler brokenFileHandler) {
         super(in);
         this.path = path;
-        this.latch = latch;
         this.hashBuilder = hashBuilder;
         this.brokenFileHandler = brokenFileHandler;
     }
@@ -74,10 +75,9 @@ public class HashingInputStream extends FilterInputStream {
     public void close() throws IOException {
         hasher.update(path.getFileName().toString().getBytes(StandardCharsets.UTF_8));
-        latch.countDown();
-        if(in.available() == 0) hashBuilder.update(path, hasher.getValue());
-        else brokenFileHandler.handle(path, new DataLeftException(in.available()));
+        hashBuilder.update(path, hasher.getValue());
+        if(in.available() != 0) brokenFileHandler.handle(path, new DataLeftException(in.available()));
         super.close();
     }

GenericTarDecompressor.java

@@ -40,7 +40,7 @@ public class GenericTarDecompressor {
     public static long decompress(Path input, Path target) throws IOException {
         Instant start = Instant.now();
-        FileTreeHashBuilder treeBuilder = new FileTreeHashBuilder();
+        FileTreeHashBuilder treeBuilder = new FileTreeHashBuilder(0);
         try (InputStream fileInputStream = Files.newInputStream(input);
              InputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
@@ -70,7 +70,11 @@ public class GenericTarDecompressor {
         log.info("Decompression took {} seconds.", Utilities.formatDuration(Duration.between(start, Instant.now())));
-        return treeBuilder.getValue();
+        try {
+            return treeBuilder.getValue(false);
+        } catch (InterruptedException ignored) {
+            return 0;
+        }
     }
     private static InputStream getCompressorInputStream(InputStream inputStream) throws CompressorException {

ZipDecompressor.java

@@ -40,7 +40,7 @@ public class ZipDecompressor {
     public static long decompress(Path inputFile, Path target) throws IOException {
         Instant start = Instant.now();
-        FileTreeHashBuilder hashBuilder = new FileTreeHashBuilder();
+        FileTreeHashBuilder hashBuilder = new FileTreeHashBuilder(0);
         try(ZipFile zipFile = new ZipFile(inputFile.toFile())) {
             for (Iterator<ZipArchiveEntry> it = zipFile.getEntries().asIterator(); it.hasNext(); ) {
@@ -63,6 +63,10 @@ public class ZipDecompressor {
         log.info("Decompression took: {} seconds.", Utilities.formatDuration(Duration.between(start, Instant.now())));
-        return hashBuilder.getValue();
+        try {
+            return hashBuilder.getValue(false);
+        } catch (InterruptedException ignored) {
+            return 0;
+        }
     }
 }