From 505c917e56672d210885bcb3c30441c5758c7caf Mon Sep 17 00:00:00 2001 From: carbotaniuman <41451839+carbotaniuman@users.noreply.github.com> Date: Fri, 12 Jun 2020 22:35:08 -0500 Subject: [PATCH] Make Statistics immutable --- src/main/java/mdnet/base/MangaDexClient.java | 145 ++++++++++++------ src/main/java/mdnet/base/Statistics.java | 58 ------- .../mdnet/base/settings/ClientSettings.java | 6 - src/main/java/mdnet/cache/DiskLruCache.java | 2 +- src/main/kotlin/mdnet/base/Netty.kt | 14 +- src/main/kotlin/mdnet/base/Statistics.kt | 11 ++ src/main/kotlin/mdnet/base/web/Application.kt | 26 ++-- 7 files changed, 129 insertions(+), 133 deletions(-) delete mode 100644 src/main/java/mdnet/base/Statistics.java create mode 100644 src/main/kotlin/mdnet/base/Statistics.kt diff --git a/src/main/java/mdnet/base/MangaDexClient.java b/src/main/java/mdnet/base/MangaDexClient.java index 3a09e17..928fc8e 100644 --- a/src/main/java/mdnet/base/MangaDexClient.java +++ b/src/main/java/mdnet/base/MangaDexClient.java @@ -2,7 +2,9 @@ package mdnet.base; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; import mdnet.base.settings.ClientSettings; +import mdnet.base.settings.WebSettings; import mdnet.base.web.ApplicationKt; import mdnet.base.web.WebUiKt; import mdnet.cache.DiskLruCache; @@ -12,13 +14,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; +import java.util.ArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; public class MangaDexClient { + private final static Gson GSON = new GsonBuilder().setPrettyPrinting().create(); private final static Logger LOGGER = LoggerFactory.getLogger(MangaDexClient.class); // This lock protects the Http4kServer from concurrent restart attempts @@ -27,13 +31,16 @@ public class MangaDexClient { private final ServerHandler serverHandler; private final ClientSettings clientSettings; private final AtomicReference statistics; - private ServerSettings serverSettings; - // if this is null, then the server has shutdown - private Http4kServer engine; + private ServerSettings serverSettings; + private Http4kServer engine; // if this is null, then the server has shutdown private Http4kServer webUi; private DiskLruCache cache; + // these variables are for runLoop(); + private int counter = 0; + private long lastBytesSent = 0; + public MangaDexClient(ClientSettings clientSettings) { this.clientSettings = clientSettings; this.serverHandler = new ServerHandler(clientSettings); @@ -42,14 +49,23 @@ public class MangaDexClient { try { cache = DiskLruCache.open(new File("cache"), 3, 3, clientSettings.getMaxCacheSizeMib() * 1024 * 1024 /* MiB to bytes */); + + DiskLruCache.Snapshot snapshot = cache.get("statistics"); + if (snapshot != null) { + String json = snapshot.getString(0); + snapshot.close(); + statistics.set(GSON.fromJson(json, new TypeToken>() { + }.getType())); + } else { + statistics.set(new Statistics()); + } + lastBytesSent = statistics.get().getBytesSent(); } catch (IOException e) { MangaDexClient.dieWithError(e); } } - // This function also does most of the program initialization. public void runLoop() { - statistics.set(new Statistics(0)); loginAndStartServer(); if (serverSettings.getLatestBuild() > Constants.CLIENT_BUILD) { if (LOGGER.isWarnEnabled()) { @@ -58,26 +74,19 @@ public class MangaDexClient { } } + if (clientSettings.getWebSettings() != null) { + webUi = WebUiKt.getUiServer(clientSettings.getWebSettings(), statistics); + webUi.start(); + } + if (LOGGER.isInfoEnabled()) { LOGGER.info("MDNet initialization completed successfully. Starting normal operation."); } - webUi = WebUiKt.getUiServer(clientSettings.getWebSettings(), statistics); - webUi.start(); - - // we don't really care about the Atomic part here - AtomicInteger counter = new AtomicInteger(); - // ping keep-alive every 45 seconds executorService.scheduleAtFixedRate(() -> { - int num = counter.get(); - if (num == 80) { - counter.set(0); - - // if server is stopped due to egress limits, restart it - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Hourly update: refreshing statistics"); - } - statistics.set(new Statistics(statistics.get().getSequenceNumber() + 1)); + if (counter == 80) { + counter = 0; + lastBytesSent = statistics.get().getBytesSent(); if (engine == null) { if (LOGGER.isInfoEnabled()) { @@ -87,7 +96,7 @@ public class MangaDexClient { loginAndStartServer(); } } else { - counter.set(num + 1); + counter++; } // if the server is offline then don't try and refresh certs @@ -95,8 +104,9 @@ public class MangaDexClient { return; } - if (clientSettings.getMaxBandwidthMibPerHour() != 0 && clientSettings.getMaxBandwidthMibPerHour() * 1024 - * 1024 /* MiB to bytes */ < statistics.get().getBytesSent().get()) { + long currentBytesSent = statistics.get().getBytesSent() - lastBytesSent; + if (clientSettings.getMaxBandwidthMibPerHour() != 0 + && clientSettings.getMaxBandwidthMibPerHour() * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Shutting down server as hourly bandwidth limit reached"); } @@ -170,6 +180,12 @@ public class MangaDexClient { logoutAndStopServer(); } + webUi.close(); + try { + cache.close(); + } catch (IOException e) { + LOGGER.error("Cache failed to close", e); + } } public static void main(String[] args) { @@ -184,47 +200,28 @@ public class MangaDexClient { MangaDexClient.dieWithError("Expected one argument: path to config file, or nothing"); } - Gson gson = new GsonBuilder().setPrettyPrinting().create(); ClientSettings settings; try { - settings = gson.fromJson(new FileReader(file), ClientSettings.class); + settings = GSON.fromJson(new FileReader(file), ClientSettings.class); } catch (FileNotFoundException ignored) { settings = new ClientSettings(); LOGGER.warn("Settings file {} not found, generating file", file); try (FileWriter writer = new FileWriter(file)) { - writer.write(gson.toJson(settings)); + writer.write(GSON.toJson(settings)); } catch (IOException e) { MangaDexClient.dieWithError(e); } } - if (!ClientSettings.isSecretValid(settings.getClientSecret())) - MangaDexClient.dieWithError("Config Error: API Secret is invalid, must be 52 alphanumeric characters"); - - if (settings.getClientPort() == 0) { - MangaDexClient.dieWithError("Config Error: Invalid port number"); - } - - if (settings.getMaxCacheSizeMib() < 1024) { - MangaDexClient.dieWithError("Config Error: Invalid max cache size, must be >= 1024 MiB (1GiB)"); - } - - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Client settings loaded: {}", settings); - } - - MangaDexClient client = new MangaDexClient(settings); - Runtime.getRuntime().addShutdownHook(new Thread(client::shutdown)); - client.runLoop(); + validateSettings(settings); if (settings.getWebSettings() != null) { - // java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); - // System.setOut(new java.io.PrintStream(out)); + WebSettings webSettings = settings.getWebSettings(); + // TODO: system.out redirect - ClientSettings finalSettings = settings; new Thread(() -> { - WebConsole webConsole = new WebConsole(finalSettings.getWebSettings().getUiWebsocketPort()) { + WebConsole webConsole = new WebConsole(webSettings.getUiWebsocketPort()) { @Override protected void parseMessage(String message) { System.out.println(message); @@ -235,6 +232,14 @@ public class MangaDexClient { // TODO: webConsole.sendMessage(t,m) whenever system.out is written to }).start(); } + + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Client settings loaded: {}", settings); + } + + MangaDexClient client = new MangaDexClient(settings); + Runtime.getRuntime().addShutdownHook(new Thread(client::shutdown)); + client.runLoop(); } public static void dieWithError(Throwable e) { @@ -246,8 +251,48 @@ public class MangaDexClient { public static void dieWithError(String error) { if (LOGGER.isErrorEnabled()) { - LOGGER.error("Critical Error: " + error); + LOGGER.error("Critical Error: {}", error); } System.exit(1); } + + public static void validateSettings(ClientSettings settings) { + if (!isSecretValid(settings.getClientSecret())) + MangaDexClient.dieWithError("Config Error: API Secret is invalid, must be 52 alphanumeric characters"); + + if (settings.getClientPort() == 0) { + MangaDexClient.dieWithError("Config Error: Invalid port number"); + } + + if (settings.getMaxCacheSizeMib() < 1024) { + MangaDexClient.dieWithError("Config Error: Invalid max cache size, must be >= 1024 MiB (1GiB)"); + } + + if (settings.getThreads() < 4) { + MangaDexClient.dieWithError("Config Error: Invalid number of threads, must be >= 8"); + } + + if (settings.getMaxBandwidthMibPerHour() < 0) { + MangaDexClient.dieWithError("Config Error: Max bandwidth must be >= 0"); + } + + if (settings.getMaxBurstRateKibPerSecond() < 0) { + MangaDexClient.dieWithError("Config Error: Max burst rate must be >= 0"); + } + + if (settings.getWebSettings() != null) { + if (settings.getWebSettings().getUiPort() == 0) { + MangaDexClient.dieWithError("Config Error: Invalid UI port number"); + } + + if (settings.getWebSettings().getUiWebsocketPort() == 0) { + MangaDexClient.dieWithError("Config Error: Invalid websocket port number"); + } + } + } + + public static boolean isSecretValid(String clientSecret) { + final int CLIENT_KEY_LENGTH = 52; + return Pattern.matches("^[a-zA-Z0-9]{" + CLIENT_KEY_LENGTH + "}$", clientSecret); + } } diff --git a/src/main/java/mdnet/base/Statistics.java b/src/main/java/mdnet/base/Statistics.java deleted file mode 100644 index 1da1fee..0000000 --- a/src/main/java/mdnet/base/Statistics.java +++ /dev/null @@ -1,58 +0,0 @@ -package mdnet.base; - -import com.google.gson.annotations.SerializedName; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -public class Statistics { - @SerializedName("requests_served") - private final AtomicInteger requestsServed; - @SerializedName("cache_hits") - private final AtomicInteger cacheHits; - @SerializedName("cache_misses") - private final AtomicInteger cacheMisses; - @SerializedName("bytes_sent") - private final AtomicLong bytesSent; - @SerializedName("sequence_number") - private final int sequenceNumber; - - public Statistics(int sequenceNumber) { - requestsServed = new AtomicInteger(); - cacheHits = new AtomicInteger(); - cacheMisses = new AtomicInteger(); - bytesSent = new AtomicLong(); - this.sequenceNumber = sequenceNumber; - } - - public AtomicInteger getRequestsServed() { - return requestsServed; - } - - public AtomicInteger getCacheHits() { - return cacheHits; - } - - public AtomicInteger getCacheMisses() { - return cacheMisses; - } - - public AtomicLong getBytesSent() { - return bytesSent; - } - - public int getSequenceNumber() { - return sequenceNumber; - } - - @Override - public String toString() { - return "Statistics{" + - "requestsServed=" + requestsServed + - ", cacheHits=" + cacheHits + - ", cacheMisses=" + cacheMisses + - ", bytesSent=" + bytesSent + - ", sequenceNumber=" + sequenceNumber + - '}'; - } -} diff --git a/src/main/java/mdnet/base/settings/ClientSettings.java b/src/main/java/mdnet/base/settings/ClientSettings.java index 8e854c5..9c628a7 100644 --- a/src/main/java/mdnet/base/settings/ClientSettings.java +++ b/src/main/java/mdnet/base/settings/ClientSettings.java @@ -3,7 +3,6 @@ package mdnet.base.settings; import com.google.gson.annotations.SerializedName; import java.util.Objects; -import java.util.regex.Pattern; public final class ClientSettings { @SerializedName("max_cache_size_mib") @@ -75,9 +74,4 @@ public final class ClientSettings { + maxBandwidthMibPerHour + ", maxBurstRateKibPerSecond=" + maxBurstRateKibPerSecond + ", clientPort=" + clientPort + ", clientSecret='" + "" + '\'' + ", threads=" + getThreads() + '}'; } - - public static boolean isSecretValid(String clientSecret) { - final int CLIENT_KEY_LENGTH = 52; - return Pattern.matches("^[a-zA-Z0-9]{" + CLIENT_KEY_LENGTH + "}$", clientSecret); - } } diff --git a/src/main/java/mdnet/cache/DiskLruCache.java b/src/main/java/mdnet/cache/DiskLruCache.java index e8c0556..8a9da44 100644 --- a/src/main/java/mdnet/cache/DiskLruCache.java +++ b/src/main/java/mdnet/cache/DiskLruCache.java @@ -411,7 +411,7 @@ public final class DiskLruCache implements Closeable { return getImpl(key); } - public synchronized Snapshot getImpl(String key) throws IOException { + private synchronized Snapshot getImpl(String key) throws IOException { checkNotClosed(); Entry entry = lruEntries.get(key); if (entry == null) { diff --git a/src/main/kotlin/mdnet/base/Netty.kt b/src/main/kotlin/mdnet/base/Netty.kt index ccb0490..3b8d9c1 100644 --- a/src/main/kotlin/mdnet/base/Netty.kt +++ b/src/main/kotlin/mdnet/base/Netty.kt @@ -37,26 +37,26 @@ import javax.net.ssl.SSLException private val LOGGER = LoggerFactory.getLogger("Application") -class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference) : ServerConfig { - private val threadsToAllocate = clientSettings.getThreads() - +class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val statistics: AtomicReference) : ServerConfig { override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { - private val masterGroup = NioEventLoopGroup(threadsToAllocate) - private val workerGroup = NioEventLoopGroup(threadsToAllocate) + private val masterGroup = NioEventLoopGroup(clientSettings.threads) + private val workerGroup = NioEventLoopGroup(clientSettings.threads) private lateinit var closeFuture: ChannelFuture private lateinit var address: InetSocketAddress private val burstLimiter = object : GlobalTrafficShapingHandler( workerGroup, 1024 * clientSettings.maxBurstRateKibPerSecond, 0, 50) { override fun doAccounting(counter: TrafficCounter) { - stats.get().bytesSent.getAndAdd(counter.cumulativeWrittenBytes()) + statistics.getAndUpdate { + it.copy(bytesSent = it.bytesSent + counter.cumulativeWrittenBytes()) + } counter.resetCumulativeTime() } } override fun start(): Http4kServer = apply { if (LOGGER.isInfoEnabled) { - LOGGER.info("Starting webserver with {} threads", threadsToAllocate) + LOGGER.info("Starting webserver with {} threads", clientSettings.threads) } val (mainCert, chainCert) = getX509Certs(tls.certificate) diff --git a/src/main/kotlin/mdnet/base/Statistics.kt b/src/main/kotlin/mdnet/base/Statistics.kt new file mode 100644 index 0000000..8c725b5 --- /dev/null +++ b/src/main/kotlin/mdnet/base/Statistics.kt @@ -0,0 +1,11 @@ +package mdnet.base + +import com.google.gson.annotations.SerializedName + +data class Statistics( + @field:SerializedName("requests_served") val requestsServed: Int = 0, + @field:SerializedName("cache_hits") val cacheHits: Int = 0, + @field:SerializedName("cache_misses") val cacheMisses: Int = 0, + @field:SerializedName("browser_cached") val browserCached: Int = 0, + @field:SerializedName("bytes_sent") val bytesSent: Long = 0 +) diff --git a/src/main/kotlin/mdnet/base/web/Application.kt b/src/main/kotlin/mdnet/base/web/Application.kt index 3daead4..985132f 100644 --- a/src/main/kotlin/mdnet/base/web/Application.kt +++ b/src/main/kotlin/mdnet/base/web/Application.kt @@ -31,8 +31,6 @@ import java.io.BufferedInputStream import java.io.BufferedOutputStream import java.io.InputStream import java.security.MessageDigest -import java.time.format.DateTimeFormatter -import java.util.* import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicReference import javax.crypto.Cipher @@ -59,7 +57,6 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting .build()) .setMaxConnTotal(THREADS_TO_ALLOCATE) .setMaxConnPerRoute(THREADS_TO_ALLOCATE) - // Have it at the maximum open sockets a user can have in most modern OSes. No reason to limit this, just limit it at the Netty side. .build()) val app = { dataSaver: Boolean -> @@ -83,8 +80,9 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting md5Bytes("$chapterHash.$fileName") } val cacheId = printHexString(rc4Bytes) - - statistics.get().requestsServed.incrementAndGet() + statistics.getAndUpdate { + it.copy(requestsServed = it.requestsServed + 1) + } // Netty doesn't do Content-Length or Content-Type, so we have the pleasure of doing that ourselves fun respondWithImage(input: InputStream, length: String?, type: String, lastModified: String?): Response = @@ -113,10 +111,12 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting val snapshot = cache.get(cacheId) if (snapshot != null) { - statistics.get().cacheHits.incrementAndGet() - // our files never change, so it's safe to use the browser cache if (request.header("If-Modified-Since") != null) { + statistics.getAndUpdate { + it.copy(browserCached = it.browserCached + 1) + } + if (LOGGER.isInfoEnabled) { LOGGER.info("Request for $sanitizedUri cached by browser") } @@ -127,6 +127,10 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting Response(Status.NOT_MODIFIED) .header("Last-Modified", lastModified) } else { + statistics.getAndUpdate { + it.copy(cacheHits = it.cacheHits + 1) + } + if (LOGGER.isInfoEnabled) { LOGGER.info("Request for $sanitizedUri hit cache") } @@ -137,7 +141,10 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting ) } } else { - statistics.get().cacheMisses.incrementAndGet() + statistics.getAndUpdate { + it.copy(cacheMisses = it.cacheMisses + 1) + } + if (LOGGER.isInfoEnabled) { LOGGER.info("Request for $sanitizedUri missed cache") } @@ -225,9 +232,6 @@ private fun getRc4(key: ByteArray): Cipher { return rc4 } -private val HTTP_TIME_FORMATTER = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss O", Locale.ENGLISH) - - private fun md5Bytes(stringToHash: String): ByteArray { val digest = MessageDigest.getInstance("MD5") return digest.digest(stringToHash.toByteArray())