From e9e139c3647b9127405e3ed4b691373bbf6e0655 Mon Sep 17 00:00:00 2001 From: Eladash Date: Fri, 11 Nov 2022 18:35:52 +0200 Subject: [PATCH] logs improvements * Do not use unsafe pointer arithmetics exceeding existing memory. (exceeding m_fptr) * Properly use the enire 32MB of the buffer, previously only 16MB were used for cuncurrent access. * Fix a bug occurring after attempting to push 1TB (40 bits). * Flush the log before hitting a debugging breakpoint. --- Utilities/Thread.cpp | 6 +++ rpcs3/util/logs.cpp | 112 +++++++++++++++++++++++++++++++++---------- rpcs3/util/logs.hpp | 6 +++ 3 files changed, 98 insertions(+), 26 deletions(-) diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index 7498354d6b..5f5330ec81 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -1927,7 +1927,10 @@ const bool s_terminate_handler_set = []() -> bool std::set_terminate([]() { if (IsDebuggerPresent()) + { + logs::listener::sync_all(); utils::trap(); + } report_fatal_error("RPCS3 has abnormally terminated."); }); @@ -2650,7 +2653,10 @@ void thread_base::exec() sig_log.fatal("Thread terminated due to fatal error: %s", reason); if (IsDebuggerPresent()) + { + logs::listener::sync_all(); utils::trap(); + } if (const auto _this = g_tls_this_thread) { diff --git a/rpcs3/util/logs.cpp b/rpcs3/util/logs.cpp index f9f11bd6f7..011e4207a9 100644 --- a/rpcs3/util/logs.cpp +++ b/rpcs3/util/logs.cpp @@ -75,6 +75,7 @@ namespace logs // Memory-mapped buffer size constexpr u64 s_log_size = 32 * 1024 * 1024; + static_assert(s_log_size * s_log_size > s_log_size && (s_log_size & (s_log_size - 1)) == 0); // Assert on an overflowing value class file_writer { @@ -87,8 +88,8 @@ namespace logs z_stream m_zs{}; shared_mutex m_m{}; - alignas(128) atomic_t m_buf{0}; // MSB (40 bit): push begin, LSB (24 bis): push size - alignas(128) atomic_t m_out{0}; // Amount of bytes written to file + atomic_t m_buf{0}; // MSB (39 bits): push begin, LSB (25 bis): push size + atomic_t m_out{0}; // Amount of bytes written to file uchar m_zout[65536]{}; @@ -102,6 +103,9 @@ namespace logs // Append raw data void log(const char* text, usz size); + + // Ensure written to disk + void sync(); }; struct file_listener final : file_writer, public listener @@ -111,6 +115,11 @@ namespace logs ~file_listener() override = default; void log(u64 stamp, const message& msg, const std::string& prefix, const std::string& text) override; + + void sync() override + { + file_writer::sync(); + } }; struct root_listener final : public listener @@ -321,6 +330,18 @@ void logs::listener::broadcast(const logs::stored_message& msg) const } } +void logs::listener::sync() +{ +} + +void logs::listener::sync_all() +{ + for (listener* lis = get_logger(); lis; lis = lis->m_next) + { + lis->sync(); + } +} + logs::registerer::registerer(channel& _ch) { std::lock_guard lock(g_mutex); @@ -441,7 +462,7 @@ logs::file_writer::file_writer(const std::string& name, u64 max_size) { const u64 bufv = m_buf; - if (bufv & 0xffffff) + if (bufv % s_log_size) { // Wait if threads are writing logs std::this_thread::yield(); @@ -469,10 +490,7 @@ logs::file_writer::~file_writer() } // Stop writer thread - while (m_out << 24 < m_buf) - { - std::this_thread::yield(); - } + file_writer::sync(); m_out = -1; m_writer.join(); @@ -512,25 +530,27 @@ bool logs::file_writer::flush(u64 bufv) { std::lock_guard lock(m_m); - const u64 st = +m_out; - const u64 end = std::min((st + s_log_size) & ~(s_log_size - 1), bufv >> 24); + const u64 read_pos = m_out; + const u64 out_index = read_pos % s_log_size; + const u64 pushed = (bufv / s_log_size) % s_log_size; + const u64 end = std::min(out_index < pushed ? read_pos - out_index + pushed : (read_pos + s_log_size) & ~(s_log_size - 1), m_max_size); - if (end > st) + if (end > read_pos) { // Avoid writing too big fragments - const u64 size = std::min(end - st, sizeof(m_zout) / 2); + const u64 size = std::min(end - read_pos, sizeof(m_zout) / 2); // Write uncompressed - if (m_fout && st < m_max_size && m_fout.write(m_fptr.get() + st % s_log_size, size) != size) + if (m_fout && m_fout.write(m_fptr.get() + out_index, size) != size) { m_fout.close(); } // Write compressed - if (m_fout2 && st < m_max_size) + if (m_fout2) { m_zs.avail_in = static_cast(size); - m_zs.next_in = m_fptr.get() + st % s_log_size; + m_zs.next_in = m_fptr.get() + out_index; do { @@ -562,18 +582,16 @@ void logs::file_writer::log(const char* text, usz size) } // TODO: write bigger fragment directly in blocking manner - while (size && size <= 0xffffff) + while (size && size < s_log_size) { - u64 bufv = 0; - - const auto pos = m_buf.atomic_op([&](u64& v) -> uchar* + const auto [bufv, pos] = m_buf.fetch_op([&](u64& v) -> uchar* { - const u64 v1 = v >> 24; - const u64 v2 = v & 0xffffff; + const u64 out = m_out % s_log_size; + const u64 v1 = (v / s_log_size) % s_log_size; + const u64 v2 = v % s_log_size; - if (v2 + size > 0xffffff || v1 + v2 + size >= m_out + s_log_size) [[unlikely]] + if (v1 + v2 + size > (out < v1 ? out + s_log_size : out)) [[unlikely]] { - bufv = v; return nullptr; } @@ -583,22 +601,34 @@ void logs::file_writer::log(const char* text, usz size) if (!pos) [[unlikely]] { - if ((bufv & 0xffffff) + size > 0xffffff || bufv & 0xffffff) + if (m_out >= m_max_size || (!m_fout && !m_fout2)) + { + // Logging is inactive + return; + } + + if ((bufv % s_log_size) + size >= s_log_size || bufv % s_log_size) { // Concurrency limit reached std::this_thread::yield(); } + else if (!m_m.is_free()) + { + // Wait for another flush call to complete + m_m.lock_unlock(); + } else { // Queue is full, need to write out flush(bufv); } + continue; } - if (pos + size > m_fptr.get() + s_log_size) + if (pos - m_fptr.get() + size > s_log_size) { - const auto frag = m_fptr.get() + s_log_size - pos; + const auto frag = s_log_size - (pos - m_fptr.get()); std::memcpy(pos, text, frag); std::memcpy(m_fptr.get(), text + frag, size - frag); } @@ -607,11 +637,41 @@ void logs::file_writer::log(const char* text, usz size) std::memcpy(pos, text, size); } - m_buf += (u64{size} << 24) - size; + m_buf += (size * s_log_size) - size; break; } } +void logs::file_writer::sync() +{ + if (!m_fptr || (!m_fout && !m_fout2)) + { + return; + } + + // Wait for the writer thread + while ((m_out % s_log_size) * s_log_size < m_buf) + { + if (m_out >= m_max_size) + { + break; + } + + std::this_thread::yield(); + } + + // Ensure written to disk + if (m_fout) + { + m_fout.sync(); + } + + if (m_fout2) + { + m_fout2.sync(); + } +} + logs::file_listener::file_listener(const std::string& path, u64 max_size) : file_writer(path, max_size) , listener() diff --git a/rpcs3/util/logs.hpp b/rpcs3/util/logs.hpp index 5f9a2118aa..e174233ade 100644 --- a/rpcs3/util/logs.hpp +++ b/rpcs3/util/logs.hpp @@ -81,11 +81,17 @@ namespace logs // Process log message virtual void log(u64 stamp, const message& msg, const std::string& prefix, const std::string& text) = 0; + // Flush contents (file writer) + virtual void sync(); + // Add new listener static void add(listener*); // Special purpose void broadcast(const stored_message&) const; + + // Flush log to disk + static void sync_all(); }; struct alignas(16) channel : private message