Redesign of Compression an archival logic.

Added ZipCompressor
Permanently fixed java.util.concurrent.ExecutionException: java.io.IOException: No space left on device.
This commit is contained in:
szymon 2020-11-30 10:32:22 +01:00
parent a5114b49cc
commit 3b522e2881
11 changed files with 246 additions and 89 deletions

View File

@ -42,4 +42,6 @@ public class Statics {
public static final AtomicBoolean globalShutdownBackupFlag = new AtomicBoolean(true); public static final AtomicBoolean globalShutdownBackupFlag = new AtomicBoolean(true);
public static AwaitThread restoreAwaitThread = null; public static AwaitThread restoreAwaitThread = null;
public static File untouchableFile; public static File untouchableFile;
public static boolean tmpAvailable;
} }

View File

@ -52,6 +52,15 @@ public class TextileBackup implements ModInitializer {
System.exit(1); System.exit(1);
} }
if(Statics.CONFIG.format == ConfigHandler.ArchiveFormat.ZIP) {
Statics.tmpAvailable = Utilities.isTmpAvailable();
if(!Statics.tmpAvailable) {
Statics.LOGGER.warn("WARNING! It seems like the temporary folder is not accessible on this system!\n" +
"This will cause problems with multithreaded zip compression, so a normal one will be used instead.\n" +
"For more info please read: https://github.com/Szum123321/textile_backup/wiki/ZIP-Problems");
}
}
if(Statics.CONFIG.backupInterval > 0) if(Statics.CONFIG.backupInterval > 0)
ServerTickEvents.END_SERVER_TICK.register(Statics.scheduler::tick); ServerTickEvents.END_SERVER_TICK.register(Statics.scheduler::tick);

View File

@ -46,6 +46,28 @@ public class Utilities {
.getSession() .getSession()
.getWorldDirectory(RegistryKey.of(Registry.DIMENSION, DimensionType.OVERWORLD_REGISTRY_KEY.getValue())); .getWorldDirectory(RegistryKey.of(Registry.DIMENSION, DimensionType.OVERWORLD_REGISTRY_KEY.getValue()));
} }
public static File getBackupRootPath(String worldName) {
File path = new File(Statics.CONFIG.path).getAbsoluteFile();
if (Statics.CONFIG.perWorldBackup)
path = path.toPath().resolve(worldName).toFile();
if (!path.exists()) {
path.mkdirs();
}
return path;
}
public static boolean isTmpAvailable() {
try {
File tmp = File.createTempFile("textile_backup_tmp_test", String.valueOf(Instant.now().getEpochSecond()));
return tmp.delete();
} catch (IOException ignored) {}
return false;
}
public static void disableWorldSaving(MinecraftServer server) { public static void disableWorldSaving(MinecraftServer server) {
for (ServerWorld serverWorld : server.getWorlds()) { for (ServerWorld serverWorld : server.getWorlds()) {

View File

@ -21,6 +21,10 @@ package net.szum123321.textile_backup.core.create;
import net.szum123321.textile_backup.Statics; import net.szum123321.textile_backup.Statics;
import net.szum123321.textile_backup.core.create.compressors.*; import net.szum123321.textile_backup.core.create.compressors.*;
import net.szum123321.textile_backup.core.Utilities; import net.szum123321.textile_backup.core.Utilities;
import net.szum123321.textile_backup.core.create.compressors.tar.LZMACompressor;
import net.szum123321.textile_backup.core.create.compressors.tar.ParallelBZip2Compressor;
import net.szum123321.textile_backup.core.create.compressors.tar.ParallelGzipCompressor;
import net.szum123321.textile_backup.core.create.compressors.ParallelZipCompressor;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -70,12 +74,16 @@ public class MakeBackupRunnable implements Runnable {
coreCount = Math.min(Statics.CONFIG.compressionCoreCountLimit, Runtime.getRuntime().availableProcessors()); coreCount = Math.min(Statics.CONFIG.compressionCoreCountLimit, Runtime.getRuntime().availableProcessors());
} }
Statics.LOGGER.trace("Running compression on {} threads. Available cores = {}", coreCount, Runtime.getRuntime().availableProcessors()); Statics.LOGGER.trace("Running compression on {} threads. Available cores: {}", coreCount, Runtime.getRuntime().availableProcessors());
switch (Statics.CONFIG.format) { switch (Statics.CONFIG.format) {
case ZIP: case ZIP: {
ParallelZipCompressor.createArchive(world, outFile, context, coreCount); if(Statics.tmpAvailable && coreCount > 1)
ParallelZipCompressor.getInstance().createArchive(world, outFile, context, coreCount);
else
ZipCompressor.getInstance().createArchive(world, outFile, context, coreCount);
break; break;
}
case BZIP2: case BZIP2:
ParallelBZip2Compressor.getInstance().createArchive(world, outFile, context, coreCount); ParallelBZip2Compressor.getInstance().createArchive(world, outFile, context, coreCount);
@ -93,7 +101,7 @@ public class MakeBackupRunnable implements Runnable {
Statics.LOGGER.warn("Specified compressor ({}) is not supported! Zip will be used instead!", Statics.CONFIG.format); Statics.LOGGER.warn("Specified compressor ({}) is not supported! Zip will be used instead!", Statics.CONFIG.format);
Statics.LOGGER.sendError(context.getCommandSource(), "Error! No correct compression format specified! Using default compressor!"); Statics.LOGGER.sendError(context.getCommandSource(), "Error! No correct compression format specified! Using default compressor!");
ParallelZipCompressor.createArchive(world, outFile, context, coreCount); ZipCompressor.getInstance().createArchive(world, outFile, context, coreCount);
break; break;
} }

View File

@ -21,53 +21,54 @@ package net.szum123321.textile_backup.core.create.compressors;
import net.szum123321.textile_backup.Statics; import net.szum123321.textile_backup.Statics;
import net.szum123321.textile_backup.core.Utilities; import net.szum123321.textile_backup.core.Utilities;
import net.szum123321.textile_backup.core.create.BackupContext; import net.szum123321.textile_backup.core.create.BackupContext;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import java.io.*; import java.io.*;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.ExecutionException;
public abstract class AbstractTarCompressor { public abstract class AbstractCompressor {
protected abstract OutputStream openCompressorStream(OutputStream outputStream, int coreCountLimit) throws IOException;
public void createArchive(File inputFile, File outputFile, BackupContext ctx, int coreLimit) { public void createArchive(File inputFile, File outputFile, BackupContext ctx, int coreLimit) {
Statics.LOGGER.sendInfo(ctx, "Starting compression...");
Instant start = Instant.now(); Instant start = Instant.now();
try (FileOutputStream outStream = new FileOutputStream(outputFile); try (FileOutputStream outStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outStream); BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outStream);
OutputStream compressorOutputStream = openCompressorStream(bufferedOutputStream, coreLimit); OutputStream arc = createArchiveOutputStream(bufferedOutputStream, ctx, coreLimit)) {
TarArchiveOutputStream arc = new TarArchiveOutputStream(compressorOutputStream)) {
arc.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
arc.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
Files.walk(inputFile.toPath()) Files.walk(inputFile.toPath())
.filter(path -> !Utilities.isBlacklisted(inputFile.toPath().relativize(path))) .filter(path -> !Utilities.isBlacklisted(inputFile.toPath().relativize(path)))
.map(Path::toFile) .map(Path::toFile)
.filter(File::isFile) .filter(File::isFile)
.forEach(file -> { .forEach(file -> {
try (FileInputStream fileInputStream = new FileInputStream(file)){ try {
ArchiveEntry entry = arc.createArchiveEntry(file, inputFile.toPath().relativize(file.toPath()).toString()); addEntry(file, inputFile.toPath().relativize(file.toPath()).toString(), arc);
arc.putArchiveEntry(entry);
IOUtils.copy(fileInputStream, arc);
arc.closeArchiveEntry();
} catch (IOException e) { } catch (IOException e) {
Statics.LOGGER.error("An exception occurred while trying to compress: {}", file.getName(), e); Statics.LOGGER.error("An exception occurred while trying to compress: {}", file.getName(), e);
Statics.LOGGER.sendError(ctx, "Something went wrong while compressing files!"); Statics.LOGGER.sendError(ctx, "Something went wrong while compressing files!");
} }
}); });
} catch (IOException e) {
finish(arc);
} catch (IOException | InterruptedException | ExecutionException e) {
Statics.LOGGER.error("An exception occurred!", e); Statics.LOGGER.error("An exception occurred!", e);
Statics.LOGGER.sendError(ctx, "Something went wrong while compressing files!"); Statics.LOGGER.sendError(ctx, "Something went wrong while compressing files!");
} }
close();
Statics.LOGGER.sendInfo(ctx, "Compression took: {} seconds.", Utilities.formatDuration(Duration.between(start, Instant.now()))); Statics.LOGGER.sendInfo(ctx, "Compression took: {} seconds.", Utilities.formatDuration(Duration.between(start, Instant.now())));
} }
protected abstract OutputStream createArchiveOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) throws IOException;
protected abstract void addEntry(File file, String entryName, OutputStream arc) throws IOException;
protected void finish(OutputStream arc) throws InterruptedException, ExecutionException, IOException {
;//Basically this function is only needed for the ParallelZipCompressor to write out ParallelScatterZipCreator
}
protected void close() {
;//Same as above, just for ParallelGzipCompressor to shutdown ExecutorService
}
} }

View File

@ -19,17 +19,11 @@
package net.szum123321.textile_backup.core.create.compressors; package net.szum123321.textile_backup.core.create.compressors;
import net.szum123321.textile_backup.Statics; import net.szum123321.textile_backup.Statics;
import net.szum123321.textile_backup.core.Utilities;
import net.szum123321.textile_backup.core.create.BackupContext; import net.szum123321.textile_backup.core.create.BackupContext;
import org.apache.commons.compress.archivers.zip.*; import org.apache.commons.compress.archivers.zip.*;
import org.apache.commons.compress.parallel.InputStreamSupplier; import org.apache.commons.compress.parallel.InputStreamSupplier;
import java.io.*; import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
@ -39,53 +33,40 @@ import java.util.zip.ZipEntry;
answer by: answer by:
https://stackoverflow.com/users/2987755/dkb https://stackoverflow.com/users/2987755/dkb
*/ */
public class ParallelZipCompressor { public class ParallelZipCompressor extends ZipCompressor {
public static void createArchive(File inputFile, File outputFile, BackupContext ctx, int coreLimit) { private ParallelScatterZipCreator scatterZipCreator;
Statics.LOGGER.sendInfo(ctx, "Starting compression...");
Instant start = Instant.now(); public static ParallelZipCompressor getInstance() {
return new ParallelZipCompressor();
}
Path rootPath = inputFile.toPath(); @Override
protected OutputStream createArchiveOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) {
scatterZipCreator = new ParallelScatterZipCreator(Executors.newFixedThreadPool(coreLimit));
return super.createArchiveOutputStream(stream, ctx, coreLimit);
}
try (FileOutputStream fileOutputStream = new FileOutputStream(outputFile); @Override
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream); protected void addEntry(File file, String entryName, OutputStream arc) throws IOException {
ZipArchiveOutputStream arc = new ZipArchiveOutputStream(bufferedOutputStream)) { ZipArchiveEntry entry = (ZipArchiveEntry)((ZipArchiveOutputStream)arc).createArchiveEntry(file, entryName);
ParallelScatterZipCreator scatterZipCreator = new ParallelScatterZipCreator(Executors.newFixedThreadPool(coreLimit)); if(ZipCompressor.isDotDat(file.getName()))
entry.setMethod(ZipEntry.STORED);
else
entry.setMethod(ZipEntry.DEFLATED);
arc.setMethod(ZipArchiveOutputStream.DEFLATED); entry.setTime(System.currentTimeMillis());
arc.setUseZip64(Zip64Mode.AsNeeded);
arc.setLevel(Statics.CONFIG.compression);
arc.setComment("Created on: " + Utilities.getDateTimeFormatter().format(LocalDateTime.now()));
Files.walk(inputFile.toPath()) scatterZipCreator.addArchiveEntry(entry, new FileInputStreamSupplier(file));
.filter(path -> !Utilities.isBlacklisted(inputFile.toPath().relativize(path))) }
.map(Path::toFile)
.filter(File::isFile)
.forEach(file -> {
try { //IOException gets thrown only when arc is closed
ZipArchiveEntry entry = (ZipArchiveEntry)arc.createArchiveEntry(file, rootPath.relativize(file.toPath()).toString());
entry.setMethod(ZipEntry.DEFLATED); @Override
scatterZipCreator.addArchiveEntry(entry, new FileInputStreamSupplier(file)); protected void finish(OutputStream arc) throws InterruptedException, ExecutionException, IOException {
} catch (IOException e) { scatterZipCreator.writeTo((ZipArchiveOutputStream) arc);
Statics.LOGGER.error("An exception occurred while trying to compress: {}", file.getName(), e);
Statics.LOGGER.sendError(ctx, "Something went wrong while compressing files!");
}
});
scatterZipCreator.writeTo(arc);
} catch (IOException | InterruptedException | ExecutionException e) {
Statics.LOGGER.error("An exception occurred!", e);
Statics.LOGGER.sendError(ctx, "Something went wrong while compressing files!");
}
Statics.LOGGER.sendInfo(ctx, "Compression took: {} seconds.", Utilities.formatDuration(Duration.between(start, Instant.now())));
} }
static class FileInputStreamSupplier implements InputStreamSupplier { static class FileInputStreamSupplier implements InputStreamSupplier {
private final File sourceFile; private final File sourceFile;
private InputStream stream;
FileInputStreamSupplier(File sourceFile) { FileInputStreamSupplier(File sourceFile) {
this.sourceFile = sourceFile; this.sourceFile = sourceFile;
@ -93,12 +74,12 @@ public class ParallelZipCompressor {
public InputStream get() { public InputStream get() {
try { try {
stream = new BufferedInputStream(new FileInputStream(sourceFile)); return new FileInputStream(sourceFile);
} catch (IOException e) { } catch (IOException e) {
Statics.LOGGER.error("An exception occurred while trying to create input stream!", e); Statics.LOGGER.error("An exception occurred while trying to create input stream from file: {}!", sourceFile.getName(), e);
} }
return stream; return null;
} }
} }
} }

View File

@ -0,0 +1,70 @@
/*
* A simple backup mod for Fabric
* Copyright (C) 2020 Szum123321
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package net.szum123321.textile_backup.core.create.compressors;
import net.szum123321.textile_backup.Statics;
import net.szum123321.textile_backup.core.Utilities;
import net.szum123321.textile_backup.core.create.BackupContext;
import org.apache.commons.compress.archivers.zip.Zip64Mode;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import java.io.*;
import java.time.LocalDateTime;
public class ZipCompressor extends AbstractCompressor {
public static ZipCompressor getInstance() {
return new ZipCompressor();
}
@Override
protected OutputStream createArchiveOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) {
ZipArchiveOutputStream arc = new ZipArchiveOutputStream(stream);
arc.setMethod(ZipArchiveOutputStream.DEFLATED);
arc.setUseZip64(Zip64Mode.AsNeeded);
arc.setLevel(Statics.CONFIG.compression);
arc.setComment("Created on: " + Utilities.getDateTimeFormatter().format(LocalDateTime.now()));
return arc;
}
@Override
protected void addEntry(File file, String entryName, OutputStream arc) throws IOException {
try (FileInputStream fileInputStream = new FileInputStream(file)){
ZipArchiveEntry entry = (ZipArchiveEntry)((ZipArchiveOutputStream)arc).createArchiveEntry(file, entryName);
if(isDotDat(file.getName()))
entry.setMethod(ZipArchiveOutputStream.STORED);
((ZipArchiveOutputStream)arc).putArchiveEntry(entry);
IOUtils.copy(fileInputStream, arc);
((ZipArchiveOutputStream)arc).closeArchiveEntry();
}
}
protected static boolean isDotDat(String filename) {
String[] arr = filename.split("\\.");
return arr[arr.length - 1].contains("dat"); //includes dat_old
}
}

View File

@ -0,0 +1,56 @@
/*
* A simple backup mod for Fabric
* Copyright (C) 2020 Szum123321
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package net.szum123321.textile_backup.core.create.compressors.tar;
import net.szum123321.textile_backup.core.create.BackupContext;
import net.szum123321.textile_backup.core.create.compressors.AbstractCompressor;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
public abstract class AbstractTarArchiver extends AbstractCompressor {
protected abstract OutputStream getCompressorOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) throws IOException;
@Override
protected OutputStream createArchiveOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) throws IOException {
TarArchiveOutputStream tar = new TarArchiveOutputStream(getCompressorOutputStream(stream, ctx, coreLimit));
tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
tar.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
return tar;
}
@Override
protected void addEntry(File file, String entryName, OutputStream arc) throws IOException {
try (FileInputStream fileInputStream = new FileInputStream(file)){
TarArchiveEntry entry = (TarArchiveEntry)((TarArchiveOutputStream) arc).createArchiveEntry(file, entryName);
((TarArchiveOutputStream)arc).putArchiveEntry(entry);
IOUtils.copy(fileInputStream, arc);
((TarArchiveOutputStream)arc).closeArchiveEntry();
}
}
}

View File

@ -16,21 +16,20 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package net.szum123321.textile_backup.core.create.compressors; package net.szum123321.textile_backup.core.create.compressors.tar;
import net.szum123321.textile_backup.core.create.BackupContext;
import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
import java.io.*; import java.io.*;
public class LZMACompressor extends AbstractTarCompressor { public class LZMACompressor extends AbstractTarArchiver {
private static final LZMACompressor INSTANCE = new LZMACompressor();
public static LZMACompressor getInstance() { public static LZMACompressor getInstance() {
return INSTANCE; return new LZMACompressor();
} }
@Override @Override
protected OutputStream openCompressorStream(OutputStream outputStream, int coreCountLimit) throws IOException { protected OutputStream getCompressorOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) throws IOException {
return new XZCompressorOutputStream(outputStream); return new XZCompressorOutputStream(stream);
} }
} }

View File

@ -16,22 +16,21 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package net.szum123321.textile_backup.core.create.compressors; package net.szum123321.textile_backup.core.create.compressors.tar;
import net.szum123321.textile_backup.core.create.BackupContext;
import org.at4j.comp.bzip2.BZip2OutputStream; import org.at4j.comp.bzip2.BZip2OutputStream;
import org.at4j.comp.bzip2.BZip2OutputStreamSettings; import org.at4j.comp.bzip2.BZip2OutputStreamSettings;
import java.io.*; import java.io.*;
public class ParallelBZip2Compressor extends AbstractTarCompressor { public class ParallelBZip2Compressor extends AbstractTarArchiver {
private static final ParallelBZip2Compressor INSTANCE = new ParallelBZip2Compressor();
public static ParallelBZip2Compressor getInstance() { public static ParallelBZip2Compressor getInstance() {
return INSTANCE; return new ParallelBZip2Compressor();
} }
@Override @Override
protected OutputStream openCompressorStream(OutputStream outputStream, int coreCountLimit) throws IOException { protected OutputStream getCompressorOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) throws IOException {
return new BZip2OutputStream(outputStream, new BZip2OutputStreamSettings().setNumberOfEncoderThreads(coreCountLimit)); return new BZip2OutputStream(stream, new BZip2OutputStreamSettings().setNumberOfEncoderThreads(coreLimit));
} }
} }

View File

@ -16,22 +16,32 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package net.szum123321.textile_backup.core.create.compressors; package net.szum123321.textile_backup.core.create.compressors.tar;
import net.szum123321.textile_backup.core.create.BackupContext;
import org.anarres.parallelgzip.ParallelGZIPOutputStream; import org.anarres.parallelgzip.ParallelGZIPOutputStream;
import java.io.*; import java.io.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public class ParallelGzipCompressor extends AbstractTarCompressor { public class ParallelGzipCompressor extends AbstractTarArchiver {
private static final ParallelGzipCompressor INSTANCE = new ParallelGzipCompressor(); private ExecutorService executorService;
public static ParallelGzipCompressor getInstance() { public static ParallelGzipCompressor getInstance() {
return INSTANCE; return new ParallelGzipCompressor();
} }
@Override @Override
protected OutputStream openCompressorStream(OutputStream outputStream, int coreCountLimit) throws IOException { protected OutputStream getCompressorOutputStream(OutputStream stream, BackupContext ctx, int coreLimit) throws IOException {
return new ParallelGZIPOutputStream(outputStream, Executors.newFixedThreadPool(coreCountLimit)); executorService = Executors.newFixedThreadPool(coreLimit);
return new ParallelGZIPOutputStream(stream, executorService);
}
@Override
protected void close() {
//it seems like ParallelGZIPOutputStream doesn't shut down its ExecutorService, so to not leave garbage I shutdown it
executorService.shutdown();
} }
} }