From 2efe1121579e2c5997b4600afbf663a027f83396 Mon Sep 17 00:00:00 2001
From: Szum123321
Date: Tue, 29 Nov 2022 00:55:18 +0100
Subject: [PATCH] Repaired race condition with ParallelZip. Hashing now works!

---
 .../szum123321/textile_backup/Globals.java    | 32 ++-----------
 .../core/create/BackupContext.java            | 10 +++-
 .../core/create/FileInputStreamSupplier.java  | 12 +++--
 .../core/create/MakeBackupRunnable.java       |  4 +-
 .../compressors/AbstractCompressor.java       | 42 ++++++++--------
 .../core/digest/FileTreeHashBuilder.java      | 22 ++++++---
 .../core/digest/HashingInputStream.java       | 48 +++++++------------
 .../core/restore/RestoreBackupRunnable.java   | 13 +++--
 8 files changed, 80 insertions(+), 103 deletions(-)

diff --git a/src/main/java/net/szum123321/textile_backup/Globals.java b/src/main/java/net/szum123321/textile_backup/Globals.java
index 6e425b7..37d620a 100644
--- a/src/main/java/net/szum123321/textile_backup/Globals.java
+++ b/src/main/java/net/szum123321/textile_backup/Globals.java
@@ -19,16 +19,14 @@
 package net.szum123321.textile_backup;
 
 import net.minecraft.server.MinecraftServer;
 
+import net.szum123321.textile_backup.core.digest.BalticHash;
 import net.szum123321.textile_backup.core.digest.Hash;
 import net.szum123321.textile_backup.core.Utilities;
 import net.szum123321.textile_backup.core.create.MakeBackupRunnable;
 import net.szum123321.textile_backup.core.restore.AwaitThread;
 import org.apache.commons.io.FileUtils;
-import org.tukaani.xz.check.CRC64;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.format.DateTimeFormatter;
@@ -44,33 +42,9 @@ public class Globals {
     public static final Globals INSTANCE = new Globals();
     private static final TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
     public static final DateTimeFormatter defaultDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH.mm.ss");
-    public static final Supplier<Hash> CHECKSUM_SUPPLIER = () -> new Hash() {
-        private final CRC64 crc = new CRC64();
-        @Override
-        public void update(byte b) {
-            crc.update(new byte[]{b});
-        }
+    public static final Supplier<Hash> CHECKSUM_SUPPLIER = BalticHash::new;
 
-        @Override
-        public void update(long b) {
-            ByteBuffer v = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
-            v.putLong(b);
-            crc.update(v.array());
-        }
-
-        @Override
-        public void update(byte[] b, int off, int len) {
-            crc.update(b, off, len);
-        }
-
-        @Override
-        public long getValue() {
-            ByteBuffer b = ByteBuffer.wrap(crc.finish());
-            return b.getLong();
-        }
-    };
-
-    private ExecutorService executorService = null;// = Executors.newSingleThreadExecutor();
+    private ExecutorService executorService = null;//TODO: AAAAAAAAAAAAAAA MEMORY LEAK!!!!!!!!!
     public final AtomicBoolean globalShutdownBackupFlag = new AtomicBoolean(true);
     public boolean disableWatchdog = false;
     private boolean disableTMPFiles = false;
diff --git a/src/main/java/net/szum123321/textile_backup/core/create/BackupContext.java b/src/main/java/net/szum123321/textile_backup/core/create/BackupContext.java
index 6eb6a57..c3469be 100644
--- a/src/main/java/net/szum123321/textile_backup/core/create/BackupContext.java
+++ b/src/main/java/net/szum123321/textile_backup/core/create/BackupContext.java
@@ -30,6 +30,7 @@ public record BackupContext(@NotNull MinecraftServer server,
                             ServerCommandSource commandSource,
                             ActionInitiator initiator,
                             boolean save,
+                            boolean cleanup,
                             String comment,
                             LocalDateTime startDate) {
 
@@ -46,6 +47,7 @@ public record BackupContext(@NotNull MinecraftServer server,
         private ServerCommandSource commandSource;
         private ActionInitiator initiator;
         private boolean save;
+        private boolean cleanup;
         private String comment;
         private boolean guessInitiator;
 
@@ -55,6 +57,7 @@ public record BackupContext(@NotNull MinecraftServer server,
             this.commandSource = null;
             this.initiator = null;
             this.save = false;
+            cleanup = true; //defaults
             this.comment = null;
             guessInitiator = false;
 
@@ -94,6 +97,11 @@ public record BackupContext(@NotNull MinecraftServer server,
             return this;
         }
 
+        public Builder dontCleanup() {
+            this.cleanup = false;
+            return this;
+        }
+
         public BackupContext build() {
             if (guessInitiator) {
                 initiator = commandSource.getEntity() instanceof PlayerEntity ? ActionInitiator.Player : ActionInitiator.ServerConsole;
@@ -106,7 +114,7 @@ public record BackupContext(@NotNull MinecraftServer server,
                 else throw new RuntimeException("Neither MinecraftServer or ServerCommandSource were provided!");
             }
 
-            return new BackupContext(server, commandSource, initiator, save, comment, LocalDateTime.now());
+            return new BackupContext(server, commandSource, initiator, save, cleanup, comment, LocalDateTime.now());
         }
     }
 }
diff --git a/src/main/java/net/szum123321/textile_backup/core/create/FileInputStreamSupplier.java b/src/main/java/net/szum123321/textile_backup/core/create/FileInputStreamSupplier.java
index 30c2556..90ca134 100644
--- a/src/main/java/net/szum123321/textile_backup/core/create/FileInputStreamSupplier.java
+++ b/src/main/java/net/szum123321/textile_backup/core/create/FileInputStreamSupplier.java
@@ -29,15 +29,19 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
 
-public record FileInputStreamSupplier(Path path, String name, FileTreeHashBuilder hashTreeBuilder, BrokenFileHandler brokenFileHandler) implements InputSupplier {
+public record FileInputStreamSupplier(Path path, String name, FileTreeHashBuilder hashTreeBuilder, BrokenFileHandler brokenFileHandler, CountDownLatch latch) implements InputSupplier {
     private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
 
     @Override
     public InputStream getInputStream() throws IOException {
         try {
-            return new HashingInputStream(Files.newInputStream(path), path, hashTreeBuilder, brokenFileHandler);
+            return new HashingInputStream(Files.newInputStream(path), path, hashTreeBuilder, brokenFileHandler, latch);
         } catch (IOException e) {
+            //Probably a good idea to count down here as well: if an exception is thrown,
+            //the latch would otherwise never be released
+            latch.countDown();
             brokenFileHandler.handle(path, e);
             throw e;
         }
@@ -47,9 +51,7 @@ public record FileInputStreamSupplier(Path path, String name, FileTreeHashBuilde
     public Optional<Path> getPath() { return Optional.of(path); }
 
     @Override
-    public long size() throws IOException {
-        return Files.size(path);
-    }
+    public long size() throws IOException { return Files.size(path); }
 
     @Override
     public String getName() {
diff --git a/src/main/java/net/szum123321/textile_backup/core/create/MakeBackupRunnable.java b/src/main/java/net/szum123321/textile_backup/core/create/MakeBackupRunnable.java
index c9d1528..c07e9fc 100644
--- a/src/main/java/net/szum123321/textile_backup/core/create/MakeBackupRunnable.java
+++ b/src/main/java/net/szum123321/textile_backup/core/create/MakeBackupRunnable.java
@@ -102,8 +102,8 @@ public class MakeBackupRunnable implements Callable<Void> {
                     case TAR -> new AbstractTarArchiver().createArchive(world, outFile, context, coreCount);
                 }
 
-                if(!Globals.INSTANCE.getQueueExecutor().isShutdown())
-                    Globals.INSTANCE.getQueueExecutor().submit(new Cleanup(context.commandSource(), Utilities.getLevelName(context.server())));
+                if(context.cleanup())
+                    new Cleanup(context.commandSource(), Utilities.getLevelName(context.server())).call();
 
                 if (config.get().broadcastBackupDone) Utilities.notifyPlayers(context.server(), "Done!");
                 else log.sendInfoAL(context, "Done!");
diff --git a/src/main/java/net/szum123321/textile_backup/core/create/compressors/AbstractCompressor.java b/src/main/java/net/szum123321/textile_backup/core/create/compressors/AbstractCompressor.java
index 596ec48..87f91d3 100644
--- a/src/main/java/net/szum123321/textile_backup/core/create/compressors/AbstractCompressor.java
+++ b/src/main/java/net/szum123321/textile_backup/core/create/compressors/AbstractCompressor.java
@@ -34,7 +34,9 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
 /**
@@ -53,9 +55,18 @@ public abstract class AbstractCompressor {
             BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outStream);
             OutputStream arc = createArchiveOutputStream(bufferedOutputStream, ctx, coreLimit);
             Stream<Path> fileStream = Files.walk(inputFile)) {
+
+            AtomicInteger fileCounter = new AtomicInteger(0);
+
             var it = fileStream
                     .filter(path -> !Utilities.isBlacklisted(inputFile.relativize(path)))
-                    .filter(Files::isRegularFile).iterator();
+                    .filter(Files::isRegularFile)
+                    .peek(x -> fileCounter.incrementAndGet()).toList()
+                    .iterator();
+
+            log.info("File count: {}", fileCounter.get());
+
+            CountDownLatch latch = new CountDownLatch(fileCounter.get());
 
             while(it.hasNext()) {
                 Path file = it.next();
@@ -66,7 +77,8 @@ public abstract class AbstractCompressor {
                             file,
                             inputFile.relativize(file).toString(),
                             fileHashBuilder,
-                            brokenFileHandler),
+                            brokenFileHandler,
+                            latch),
                             arc
                     );
                 } catch (IOException e) {
@@ -78,37 +90,21 @@ public abstract class AbstractCompressor {
                 }
             }
 
+            latch.await();
+
             Instant now = Instant.now();
 
             CompressionStatus status = new CompressionStatus (
                     fileHashBuilder.getValue(),
+                    brokenFileHandler.get(),
                     ctx.startDate(),
                     start.toEpochMilli(),
                     now.toEpochMilli(),
-                    brokenFileHandler.get()
+                    TextileBackup.VERSION
            );
 
            addEntry(new StatusFileInputSupplier(status.serialize()), arc);
            finish(arc);
-        } /*catch(NoSpaceLeftOnDeviceException e) {
-            log.error("""
-                    CRITICAL ERROR OCCURRED!
-                    The backup is corrupt!
-                    Don't panic! This is a known issue!
-                    For help see: https://github.com/Szum123321/textile_backup/wiki/ZIP-Problems
-                    In case this isn't it here's also the exception itself""", e);
-
-            if(ctx.initiator() == ActionInitiator.Player) {
-                log.sendError(ctx, "Backup failed. The file is corrupt.");
-                log.error("For help see: https://github.com/Szum123321/textile_backup/wiki/ZIP-Problems");
-            }
-            if(ConfigHelper.INSTANCE.get().errorErrorHandlingMode.isStrict()) keep = false;
-        } catch (IOException | InterruptedException | ExecutionException e) {
-            log.error("An exception occurred!", e);
-            if(ctx.initiator() == ActionInitiator.Player)
-                log.sendError(ctx, "Something went wrong while compressing files!");
-            if(ConfigHelper.INSTANCE.get().errorErrorHandlingMode.isStrict()) keep = false;
-
-        } */finally {
+        } finally {
             close();
         }
diff --git a/src/main/java/net/szum123321/textile_backup/core/digest/FileTreeHashBuilder.java b/src/main/java/net/szum123321/textile_backup/core/digest/FileTreeHashBuilder.java
index d735ece..5722d7b 100644
--- a/src/main/java/net/szum123321/textile_backup/core/digest/FileTreeHashBuilder.java
+++ b/src/main/java/net/szum123321/textile_backup/core/digest/FileTreeHashBuilder.java
@@ -27,19 +27,23 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 
 public class FileTreeHashBuilder {
     private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
     private final Object lock = new Object();
     private long hash = 0, filesProcessed = 0, filesTotalSize = 0;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     public void update(Path path, long newHash) throws IOException {
-        if(path.getFileName().toString().equals(CompressionStatus.DATA_FILENAME)) return;
-        //log.info("Putting: {}, {}", path, newHash);
+        if(closed.get()) throw new RuntimeException("Hash Builder already closed!");
 
-        var hasher = Globals.CHECKSUM_SUPPLIER.get();
+        if(path.getFileName().toString().equals(CompressionStatus.DATA_FILENAME)) return;
 
         long size = Files.size(path);
 
+        var hasher = Globals.CHECKSUM_SUPPLIER.get();
+
         hasher.update(path.getFileName().toString().getBytes(StandardCharsets.UTF_8));
         hasher.update(newHash);
@@ -53,11 +57,15 @@ public class FileTreeHashBuilder {
     public long getValue() {
         var hasher = Globals.CHECKSUM_SUPPLIER.get();
+        closed.set(true);
 
-        hasher.update(hash);
-        hasher.update(filesProcessed);
-        hasher.update(filesTotalSize);
+        synchronized (lock) {
+            log.debug("Closing: files: {}, bytes {}, raw hash {}", filesProcessed, filesTotalSize, hash);
+            hasher.update(hash);
+            hasher.update(filesProcessed);
+            hasher.update(filesTotalSize);
 
-        return hasher.getValue();
+            return hasher.getValue();
+        }
     }
 }
diff --git a/src/main/java/net/szum123321/textile_backup/core/digest/HashingInputStream.java b/src/main/java/net/szum123321/textile_backup/core/digest/HashingInputStream.java
index 8c8d2e9..501935f 100644
--- a/src/main/java/net/szum123321/textile_backup/core/digest/HashingInputStream.java
+++ b/src/main/java/net/szum123321/textile_backup/core/digest/HashingInputStream.java
@@ -19,39 +19,28 @@
 package net.szum123321.textile_backup.core.digest;
 
 import net.szum123321.textile_backup.Globals;
-import net.szum123321.textile_backup.TextileBackup;
-import net.szum123321.textile_backup.TextileLogger;
 import net.szum123321.textile_backup.core.DataLeftException;
 import net.szum123321.textile_backup.core.create.BrokenFileHandler;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.*;
 import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
 
 //This class calculates a hash of the file on the input stream and submits it to FileTreeHashBuilder.
 //In case the underlying stream hasn't been read completely, the file is put into BrokenFileHandler
 public class HashingInputStream extends FilterInputStream {
-    private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
     private final Path path;
     private final Hash hasher = Globals.CHECKSUM_SUPPLIER.get();
     private final FileTreeHashBuilder hashBuilder;
     private final BrokenFileHandler brokenFileHandler;
+    private final CountDownLatch latch;
 
-    private int cnt = 0;
-    @Override
-    public synchronized void reset() throws IOException {
-        log.info("Called reset! {}", path);
-    }
-
-    @Override
-    public boolean markSupported() {
-        return false;
-    }
-
-    public HashingInputStream(InputStream in, Path path, FileTreeHashBuilder hashBuilder, BrokenFileHandler brokenFileHandler) {
+    public HashingInputStream(InputStream in, Path path, FileTreeHashBuilder hashBuilder, BrokenFileHandler brokenFileHandler, CountDownLatch latch) {
         super(in);
         this.path = path;
+        this.latch = latch;
         this.hashBuilder = hashBuilder;
         this.brokenFileHandler = brokenFileHandler;
     }
@@ -59,34 +48,29 @@ public class HashingInputStream extends FilterInputStream {
     @Override
     public int read(byte @NotNull [] b, int off, int len) throws IOException {
         int i = in.read(b, off, len);
-        if(i > -1) {
-            hasher.update(b, off, i);
-            cnt += i;
-        }
+        if(i != -1) hasher.update(b, off, i);
         return i;
     }
 
     @Override
     public int read() throws IOException {
         int i = in.read();
-        if(i > -1) {
-            hasher.update(i);
-            cnt++;
-        }
+        if(i != -1) hasher.update(i);
         return i;
     }
 
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
     @Override
     public void close() throws IOException {
-        if(in.available() == 0) {
-            long val = hasher.getValue();
-            hashBuilder.update(path, val);
-            log.info("Read in {}, of {}, with hash {}", path, cnt, val);
-        }
-        else {
-            brokenFileHandler.handle(path, new DataLeftException(in.available()));
-            //log.info("bad file {} {}",path, cnt);
-        }
+        if(in.available() == 0) hashBuilder.update(path, hasher.getValue());
+        else brokenFileHandler.handle(path, new DataLeftException(in.available()));
+
+        latch.countDown();
+
         super.close();
     }
 }
diff --git a/src/main/java/net/szum123321/textile_backup/core/restore/RestoreBackupRunnable.java b/src/main/java/net/szum123321/textile_backup/core/restore/RestoreBackupRunnable.java
index 8caa282..806a4f5 100644
--- a/src/main/java/net/szum123321/textile_backup/core/restore/RestoreBackupRunnable.java
+++ b/src/main/java/net/szum123321/textile_backup/core/restore/RestoreBackupRunnable.java
@@ -67,6 +67,8 @@ public class RestoreBackupRunnable implements Runnable {
             return;
         }
 
+        //By making a separate thread we can start unpacking an old backup instantly
+        //Let the server shut down gracefully, and wait for the old world backup to complete
         FutureTask<Void> waitForShutdown = new FutureTask<>(() -> {
             ctx.server().getThread().join(); //wait for server to die and save all its state
             if(config.get().backupOldWorlds) {
@@ -75,6 +77,7 @@ public class RestoreBackupRunnable implements Runnable {
                                 .newBackupContextBuilder()
                                 .setServer(ctx.server())
                                 .setInitiator(ActionInitiator.Restore)
+                                .dontCleanup()
                                 .setComment("Old_World" + (ctx.comment() != null ? "_" + ctx.comment() : ""))
                                 .build()
                 ).call();
@@ -82,7 +85,7 @@ public class RestoreBackupRunnable implements Runnable {
             return null;
         });
 
-        new Thread(waitForShutdown).start();
+        new Thread(waitForShutdown, "Server shutdown wait thread").start();
 
         try {
             log.info("Starting decompression...");
@@ -97,11 +100,14 @@ public class RestoreBackupRunnable implements Runnable {
             CompressionStatus status = CompressionStatus.readFromFile(tmp);
             Files.delete(tmp.resolve(CompressionStatus.DATA_FILENAME));
 
+            log.info("Waiting for server to fully terminate...");
+            //blocks until the old world backup is finished
            waitForShutdown.get();
 
            log.info("Status: {}", status);
 
+            //TODO: check broken file array
             boolean valid = status.isValid(hash);
             if(valid || !config.get().errorErrorHandlingMode.verify()) {
                 if(valid) log.info("Backup valid. Restoring");
@@ -121,12 +127,11 @@ public class RestoreBackupRunnable implements Runnable {
             log.error("An exception occurred while trying to restore a backup!", e);
         } finally {
             //Regardless of what happened, we should still clean up
-            /* if(Files.exists(tmp)) {
+            if(Files.exists(tmp)) {
                 try {
                     Utilities.deleteDirectory(tmp);
                 } catch (IOException ignored) {}
-            }*/
-            //TODO: uncomment
+            }
         }
 
         //in case we're playing on client