Repaired race condition with ParallelZip. Hashing now works!

This commit is contained in:
Szum123321 2022-11-29 00:55:18 +01:00
parent 5367a00cdc
commit 2efe112157
8 changed files with 80 additions and 103 deletions

View File

@ -19,16 +19,14 @@
package net.szum123321.textile_backup; package net.szum123321.textile_backup;
import net.minecraft.server.MinecraftServer; import net.minecraft.server.MinecraftServer;
import net.szum123321.textile_backup.core.digest.BalticHash;
import net.szum123321.textile_backup.core.digest.Hash; import net.szum123321.textile_backup.core.digest.Hash;
import net.szum123321.textile_backup.core.Utilities; import net.szum123321.textile_backup.core.Utilities;
import net.szum123321.textile_backup.core.create.MakeBackupRunnable; import net.szum123321.textile_backup.core.create.MakeBackupRunnable;
import net.szum123321.textile_backup.core.restore.AwaitThread; import net.szum123321.textile_backup.core.restore.AwaitThread;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.tukaani.xz.check.CRC64;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@ -44,33 +42,9 @@ public class Globals {
public static final Globals INSTANCE = new Globals(); public static final Globals INSTANCE = new Globals();
private static final TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME); private static final TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
public static final DateTimeFormatter defaultDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH.mm.ss"); public static final DateTimeFormatter defaultDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH.mm.ss");
public static final Supplier<Hash> CHECKSUM_SUPPLIER = () -> new Hash() { public static final Supplier<Hash> CHECKSUM_SUPPLIER = BalticHash::new;
private final CRC64 crc = new CRC64();
@Override
public void update(byte b) {
crc.update(new byte[]{b});
}
@Override private ExecutorService executorService = null;//TODO: AAAAAAAAAAAAAAA MEMORY LEAK!!!!!!!!!
public void update(long b) {
ByteBuffer v = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
v.putLong(b);
crc.update(v.array());
}
@Override
public void update(byte[] b, int off, int len) {
crc.update(b, off, len);
}
@Override
public long getValue() {
ByteBuffer b = ByteBuffer.wrap(crc.finish());
return b.getLong();
}
};
private ExecutorService executorService = null;// = Executors.newSingleThreadExecutor();
public final AtomicBoolean globalShutdownBackupFlag = new AtomicBoolean(true); public final AtomicBoolean globalShutdownBackupFlag = new AtomicBoolean(true);
public boolean disableWatchdog = false; public boolean disableWatchdog = false;
private boolean disableTMPFiles = false; private boolean disableTMPFiles = false;

View File

@ -30,6 +30,7 @@ public record BackupContext(@NotNull MinecraftServer server,
ServerCommandSource commandSource, ServerCommandSource commandSource,
ActionInitiator initiator, ActionInitiator initiator,
boolean save, boolean save,
boolean cleanup,
String comment, String comment,
LocalDateTime startDate) { LocalDateTime startDate) {
@ -46,6 +47,7 @@ public record BackupContext(@NotNull MinecraftServer server,
private ServerCommandSource commandSource; private ServerCommandSource commandSource;
private ActionInitiator initiator; private ActionInitiator initiator;
private boolean save; private boolean save;
private boolean cleanup;
private String comment; private String comment;
private boolean guessInitiator; private boolean guessInitiator;
@ -55,6 +57,7 @@ public record BackupContext(@NotNull MinecraftServer server,
this.commandSource = null; this.commandSource = null;
this.initiator = null; this.initiator = null;
this.save = false; this.save = false;
cleanup = true; //defaults
this.comment = null; this.comment = null;
guessInitiator = false; guessInitiator = false;
@ -94,6 +97,11 @@ public record BackupContext(@NotNull MinecraftServer server,
return this; return this;
} }
public Builder dontCleanup() {
this.cleanup = false;
return this;
}
public BackupContext build() { public BackupContext build() {
if (guessInitiator) { if (guessInitiator) {
initiator = commandSource.getEntity() instanceof PlayerEntity ? ActionInitiator.Player : ActionInitiator.ServerConsole; initiator = commandSource.getEntity() instanceof PlayerEntity ? ActionInitiator.Player : ActionInitiator.ServerConsole;
@ -106,7 +114,7 @@ public record BackupContext(@NotNull MinecraftServer server,
else throw new RuntimeException("Neither MinecraftServer or ServerCommandSource were provided!"); else throw new RuntimeException("Neither MinecraftServer or ServerCommandSource were provided!");
} }
return new BackupContext(server, commandSource, initiator, save, comment, LocalDateTime.now()); return new BackupContext(server, commandSource, initiator, save, cleanup, comment, LocalDateTime.now());
} }
} }
} }

View File

@ -29,15 +29,19 @@ import java.io.InputStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch;
public record FileInputStreamSupplier(Path path, String name, FileTreeHashBuilder hashTreeBuilder, BrokenFileHandler brokenFileHandler) implements InputSupplier { public record FileInputStreamSupplier(Path path, String name, FileTreeHashBuilder hashTreeBuilder, BrokenFileHandler brokenFileHandler, CountDownLatch latch) implements InputSupplier {
private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME); private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
@Override @Override
public InputStream getInputStream() throws IOException { public InputStream getInputStream() throws IOException {
try { try {
return new HashingInputStream(Files.newInputStream(path), path, hashTreeBuilder, brokenFileHandler); return new HashingInputStream(Files.newInputStream(path), path, hashTreeBuilder, brokenFileHandler, latch);
} catch (IOException e) { } catch (IOException e) {
//Probably good idea to just put it here. In the case an exception is thrown here, it would be probable
//The latch would never be lifted
latch.countDown();
brokenFileHandler.handle(path, e); brokenFileHandler.handle(path, e);
throw e; throw e;
} }
@ -47,9 +51,7 @@ public record FileInputStreamSupplier(Path path, String name, FileTreeHashBuilde
public Optional<Path> getPath() { return Optional.of(path); } public Optional<Path> getPath() { return Optional.of(path); }
@Override @Override
public long size() throws IOException { public long size() throws IOException { return Files.size(path); }
return Files.size(path);
}
@Override @Override
public String getName() { public String getName() {

View File

@ -102,8 +102,8 @@ public class MakeBackupRunnable implements Callable<Void> {
case TAR -> new AbstractTarArchiver().createArchive(world, outFile, context, coreCount); case TAR -> new AbstractTarArchiver().createArchive(world, outFile, context, coreCount);
} }
if(!Globals.INSTANCE.getQueueExecutor().isShutdown()) if(context.cleanup())
Globals.INSTANCE.getQueueExecutor().submit(new Cleanup(context.commandSource(), Utilities.getLevelName(context.server()))); new Cleanup(context.commandSource(), Utilities.getLevelName(context.server())).call();
if (config.get().broadcastBackupDone) Utilities.notifyPlayers(context.server(), "Done!"); if (config.get().broadcastBackupDone) Utilities.notifyPlayers(context.server(), "Done!");
else log.sendInfoAL(context, "Done!"); else log.sendInfoAL(context, "Done!");

View File

@ -34,7 +34,9 @@ import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
@ -53,9 +55,18 @@ public abstract class AbstractCompressor {
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outStream); BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outStream);
OutputStream arc = createArchiveOutputStream(bufferedOutputStream, ctx, coreLimit); OutputStream arc = createArchiveOutputStream(bufferedOutputStream, ctx, coreLimit);
Stream<Path> fileStream = Files.walk(inputFile)) { Stream<Path> fileStream = Files.walk(inputFile)) {
AtomicInteger fileCounter = new AtomicInteger(0);
var it = fileStream var it = fileStream
.filter(path -> !Utilities.isBlacklisted(inputFile.relativize(path))) .filter(path -> !Utilities.isBlacklisted(inputFile.relativize(path)))
.filter(Files::isRegularFile).iterator(); .filter(Files::isRegularFile)
.peek(x -> fileCounter.incrementAndGet()).toList()
.iterator();
log.info("File count: {}", fileCounter.get());
CountDownLatch latch = new CountDownLatch(fileCounter.get());
while(it.hasNext()) { while(it.hasNext()) {
Path file = it.next(); Path file = it.next();
@ -66,7 +77,8 @@ public abstract class AbstractCompressor {
file, file,
inputFile.relativize(file).toString(), inputFile.relativize(file).toString(),
fileHashBuilder, fileHashBuilder,
brokenFileHandler), brokenFileHandler,
latch),
arc arc
); );
} catch (IOException e) { } catch (IOException e) {
@ -78,37 +90,21 @@ public abstract class AbstractCompressor {
} }
} }
latch.await();
Instant now = Instant.now(); Instant now = Instant.now();
CompressionStatus status = new CompressionStatus ( CompressionStatus status = new CompressionStatus (
fileHashBuilder.getValue(), fileHashBuilder.getValue(),
brokenFileHandler.get(),
ctx.startDate(), start.toEpochMilli(), now.toEpochMilli(), ctx.startDate(), start.toEpochMilli(), now.toEpochMilli(),
brokenFileHandler.get() TextileBackup.VERSION
); );
addEntry(new StatusFileInputSupplier(status.serialize()), arc); addEntry(new StatusFileInputSupplier(status.serialize()), arc);
finish(arc); finish(arc);
} /*catch(NoSpaceLeftOnDeviceException e) { } finally {
log.error("""
CRITICAL ERROR OCCURRED!
The backup is corrupt!
Don't panic! This is a known issue!
For help see: https://github.com/Szum123321/textile_backup/wiki/ZIP-Problems
In case this isn't it here's also the exception itself""", e);
if(ctx.initiator() == ActionInitiator.Player) {
log.sendError(ctx, "Backup failed. The file is corrupt.");
log.error("For help see: https://github.com/Szum123321/textile_backup/wiki/ZIP-Problems");
}
if(ConfigHelper.INSTANCE.get().errorErrorHandlingMode.isStrict()) keep = false;
} catch (IOException | InterruptedException | ExecutionException e) {
log.error("An exception occurred!", e);
if(ctx.initiator() == ActionInitiator.Player)
log.sendError(ctx, "Something went wrong while compressing files!");
if(ConfigHelper.INSTANCE.get().errorErrorHandlingMode.isStrict()) keep = false;
} */finally {
close(); close();
} }

View File

@ -27,19 +27,23 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;
public class FileTreeHashBuilder { public class FileTreeHashBuilder {
private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME); private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
private final Object lock = new Object(); private final Object lock = new Object();
private long hash = 0, filesProcessed = 0, filesTotalSize = 0; private long hash = 0, filesProcessed = 0, filesTotalSize = 0;
private final AtomicBoolean closed = new AtomicBoolean(false);
public void update(Path path, long newHash) throws IOException { public void update(Path path, long newHash) throws IOException {
if(path.getFileName().toString().equals(CompressionStatus.DATA_FILENAME)) return; if(closed.get()) throw new RuntimeException("Hash Builder already closed!");
//log.info("Putting: {}, {}", path, newHash);
var hasher = Globals.CHECKSUM_SUPPLIER.get(); if(path.getFileName().toString().equals(CompressionStatus.DATA_FILENAME)) return;
long size = Files.size(path); long size = Files.size(path);
var hasher = Globals.CHECKSUM_SUPPLIER.get();
hasher.update(path.getFileName().toString().getBytes(StandardCharsets.UTF_8)); hasher.update(path.getFileName().toString().getBytes(StandardCharsets.UTF_8));
hasher.update(newHash); hasher.update(newHash);
@ -53,11 +57,15 @@ public class FileTreeHashBuilder {
public long getValue() { public long getValue() {
var hasher = Globals.CHECKSUM_SUPPLIER.get(); var hasher = Globals.CHECKSUM_SUPPLIER.get();
closed.set(true);
hasher.update(hash); synchronized (lock) {
hasher.update(filesProcessed); log.debug("Closing: files: {}, bytes {}, raw hash {}", filesProcessed, filesTotalSize, hash);
hasher.update(filesTotalSize); hasher.update(hash);
hasher.update(filesProcessed);
hasher.update(filesTotalSize);
return hasher.getValue(); return hasher.getValue();
}
} }
} }

View File

@ -19,39 +19,28 @@
package net.szum123321.textile_backup.core.digest; package net.szum123321.textile_backup.core.digest;
import net.szum123321.textile_backup.Globals; import net.szum123321.textile_backup.Globals;
import net.szum123321.textile_backup.TextileBackup;
import net.szum123321.textile_backup.TextileLogger;
import net.szum123321.textile_backup.core.DataLeftException; import net.szum123321.textile_backup.core.DataLeftException;
import net.szum123321.textile_backup.core.create.BrokenFileHandler; import net.szum123321.textile_backup.core.create.BrokenFileHandler;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.io.*; import java.io.*;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
//This class calculates a hash of the file on the input stream, submits it to FileTreeHashBuilder. //This class calculates a hash of the file on the input stream, submits it to FileTreeHashBuilder.
//In case the underlying stream hasn't been read completely in, puts it into BrokeFileHandler //In case the underlying stream hasn't been read completely in, puts it into BrokeFileHandler
public class HashingInputStream extends FilterInputStream { public class HashingInputStream extends FilterInputStream {
private final static TextileLogger log = new TextileLogger(TextileBackup.MOD_NAME);
private final Path path; private final Path path;
private final Hash hasher = Globals.CHECKSUM_SUPPLIER.get(); private final Hash hasher = Globals.CHECKSUM_SUPPLIER.get();
private final FileTreeHashBuilder hashBuilder; private final FileTreeHashBuilder hashBuilder;
private final BrokenFileHandler brokenFileHandler; private final BrokenFileHandler brokenFileHandler;
private final CountDownLatch latch;
private int cnt = 0;
@Override public HashingInputStream(InputStream in, Path path, FileTreeHashBuilder hashBuilder, BrokenFileHandler brokenFileHandler, CountDownLatch latch) {
public synchronized void reset() throws IOException {
log.info("Called reset! {}", path);
}
@Override
public boolean markSupported() {
return false;
}
public HashingInputStream(InputStream in, Path path, FileTreeHashBuilder hashBuilder, BrokenFileHandler brokenFileHandler) {
super(in); super(in);
this.path = path; this.path = path;
this.latch = latch;
this.hashBuilder = hashBuilder; this.hashBuilder = hashBuilder;
this.brokenFileHandler = brokenFileHandler; this.brokenFileHandler = brokenFileHandler;
} }
@ -59,34 +48,29 @@ public class HashingInputStream extends FilterInputStream {
@Override @Override
public int read(byte @NotNull [] b, int off, int len) throws IOException { public int read(byte @NotNull [] b, int off, int len) throws IOException {
int i = in.read(b, off, len); int i = in.read(b, off, len);
if(i > -1) { if(i != -1) hasher.update(b, off, i);
hasher.update(b, off, i);
cnt += i;
}
return i; return i;
} }
@Override @Override
public int read() throws IOException { public int read() throws IOException {
int i = in.read(); int i = in.read();
if(i > -1) { if(i != -1) hasher.update(i);
hasher.update(i);
cnt++;
}
return i; return i;
} }
@Override
public boolean markSupported() {
return false;
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if(in.available() == 0) { if(in.available() == 0) hashBuilder.update(path, hasher.getValue());
long val = hasher.getValue(); else brokenFileHandler.handle(path, new DataLeftException(in.available()));
hashBuilder.update(path, val);
log.info("Read in {}, of {}, with hash {}", path, cnt, val); latch.countDown();
}
else {
brokenFileHandler.handle(path, new DataLeftException(in.available()));
//log.info("bad file {} {}",path, cnt);
}
super.close(); super.close();
} }
} }

View File

@ -67,6 +67,8 @@ public class RestoreBackupRunnable implements Runnable {
return; return;
} }
//By making a separate thread we can start unpacking an old backup instantly
//Let the server shut down gracefully, and wait for the old world backup to complete
FutureTask<Void> waitForShutdown = new FutureTask<>(() -> { FutureTask<Void> waitForShutdown = new FutureTask<>(() -> {
ctx.server().getThread().join(); //wait for server to die and save all its state ctx.server().getThread().join(); //wait for server to die and save all its state
if(config.get().backupOldWorlds) { if(config.get().backupOldWorlds) {
@ -75,6 +77,7 @@ public class RestoreBackupRunnable implements Runnable {
.newBackupContextBuilder() .newBackupContextBuilder()
.setServer(ctx.server()) .setServer(ctx.server())
.setInitiator(ActionInitiator.Restore) .setInitiator(ActionInitiator.Restore)
.dontCleanup()
.setComment("Old_World" + (ctx.comment() != null ? "_" + ctx.comment() : "")) .setComment("Old_World" + (ctx.comment() != null ? "_" + ctx.comment() : ""))
.build() .build()
).call(); ).call();
@ -82,7 +85,7 @@ public class RestoreBackupRunnable implements Runnable {
return null; return null;
}); });
new Thread(waitForShutdown).start(); new Thread(waitForShutdown, "Server shutdown wait thread").start();
try { try {
log.info("Starting decompression..."); log.info("Starting decompression...");
@ -97,11 +100,14 @@ public class RestoreBackupRunnable implements Runnable {
CompressionStatus status = CompressionStatus.readFromFile(tmp); CompressionStatus status = CompressionStatus.readFromFile(tmp);
Files.delete(tmp.resolve(CompressionStatus.DATA_FILENAME)); Files.delete(tmp.resolve(CompressionStatus.DATA_FILENAME));
log.info("Waiting for server to fully terminate...");
//locks until the backup is finished //locks until the backup is finished
waitForShutdown.get(); waitForShutdown.get();
log.info("Status: {}", status); log.info("Status: {}", status);
//TODO: check broken file array
boolean valid = status.isValid(hash); boolean valid = status.isValid(hash);
if(valid || !config.get().errorErrorHandlingMode.verify()) { if(valid || !config.get().errorErrorHandlingMode.verify()) {
if(valid) log.info("Backup valid. Restoring"); if(valid) log.info("Backup valid. Restoring");
@ -121,12 +127,11 @@ public class RestoreBackupRunnable implements Runnable {
log.error("An exception occurred while trying to restore a backup!", e); log.error("An exception occurred while trying to restore a backup!", e);
} finally { } finally {
//Regardless of what happened, we should still clean up //Regardless of what happened, we should still clean up
/* if(Files.exists(tmp)) { if(Files.exists(tmp)) {
try { try {
Utilities.deleteDirectory(tmp); Utilities.deleteDirectory(tmp);
} catch (IOException ignored) {} } catch (IOException ignored) {}
}*/ }
//TODO: uncomment
} }
//in case we're playing on client //in case we're playing on client