From ab365fe494a67000bfb97c97d688c90137868e5c Mon Sep 17 00:00:00 2001
From: Nekotekina
Date: Fri, 13 Nov 2020 11:32:47 +0300
Subject: [PATCH] Fixed thread pool a bit

Use 128-bit allocator instead of queue.
When pool is full (128), threads just terminate as before.
---
 Utilities/Thread.cpp        | 128 ++++++++++++++++++++----------------
 Utilities/Thread.h          |  15 +++--
 rpcs3/Emu/CPU/CPUThread.cpp |   2 +
 rpcs3/util/asm.hpp          |  48 ++++++++++++++
 4 files changed, 134 insertions(+), 59 deletions(-)

diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp
index 59f69664dc..510c53acc8 100644
--- a/Utilities/Thread.cpp
+++ b/Utilities/Thread.cpp
@@ -75,6 +75,7 @@
 #include "sync.h"
 #include "util/vm.hpp"
 #include "util/logs.hpp"
+#include "util/asm.hpp"
 #include "Emu/Memory/vm_locking.h"
 
 LOG_CHANNEL(sig_log, "SIG");
@@ -1842,66 +1843,40 @@ thread_local DECLARE(thread_ctrl::g_tls_error_callback) = nullptr;
 
 DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined };
 
-static lf_fifo<atomic_t<thread_base**>, 240> s_thread_pool;
+static atomic_t<u128> s_thread_bits{0};
 
-static shared_mutex s_pool_lock;
+static atomic_t<thread_base**> s_thread_pool[128]{};
 
 void thread_base::start(native_entry entry)
 {
-	while (u32 size = s_thread_pool.size())
+	for (u128 bits = s_thread_bits.load(); bits; bits &= bits - 1)
 	{
-		u32 pos = s_thread_pool.peek();
-		thread_base** tls = nullptr;
+		const u32 pos = utils::ctz128(bits);
 
-		for (u32 i = pos; i < pos + size; i++)
+		if (!s_thread_pool[pos])
 		{
-			auto val = s_thread_pool[i].load();
-
-			if (val && s_thread_pool[i].compare_and_swap_test(val, 0))
-			{
-				tls = val;
-				pos = i;
-				break;
-			}
+			continue;
 		}
 
-		if (tls)
+		thread_base** tls = s_thread_pool[pos].exchange(nullptr);
+
+		if (!tls)
 		{
-			// Send "this" and entry point
-			m_thread = reinterpret_cast<u64>(entry);
-			atomic_storage<thread_base*>::store(*tls, this);
-			s_thread_pool[pos].notify_one();
-
-			{
-				// Using it in MPMC manner is a bit tricky, since it's originally MPSC
-				std::lock_guard lock(s_pool_lock);
-
-				u32 count = 0;
-
-				while (!s_thread_pool[s_thread_pool.peek() + count])
-				{
-					count++;
-
-					if (count >= s_thread_pool.size())
-					{
-						break;
-					}
-				}
-
-				if (count)
-				{
-					s_thread_pool.pop_end(count);
-				}
-			}
-
-			// Wait for actual "m_thread" in return
-			while (m_thread == reinterpret_cast<u64>(entry))
-			{
-				busy_wait(300);
-			}
-
-			return;
+			continue;
 		}
+
+		// Send "this" and entry point
+		m_thread = reinterpret_cast<u64>(entry);
+		atomic_storage<thread_base*>::release(*tls, this);
+		s_thread_pool[pos].notify_all();
+
+		// Wait for actual "m_thread" in return
+		while (m_thread == reinterpret_cast<u64>(entry))
+		{
+			busy_wait(300);
+		}
+
+		return;
 	}
 
 #ifdef _WIN32
@@ -1990,7 +1965,7 @@ void thread_base::notify_abort() noexcept
 	}
 }
 
-bool thread_base::finalize(thread_state result_state) noexcept
+u64 thread_base::finalize(thread_state result_state) noexcept
 {
 	// Report pending errors
 	error_code::error_report(0, 0, 0, 0);
@@ -2036,6 +2011,9 @@ bool thread_base::finalize(thread_state result_state) noexcept
 		g_tls_fault_spu,
 		fsoft, fhard, ctxvol, ctxinv);
 
+	const u64 _self = m_thread;
+	m_thread.release(0);
+
 	// Return true if need to delete thread object
 	const bool ok = m_state.exchange(result_state) <= thread_state::aborting;
 
@@ -2043,7 +2021,7 @@ bool thread_base::finalize(thread_state result_state) noexcept
 	m_state.notify_all();
 
 	// No detached thread supported atm
-	return !ok;
+	return _self;
 }
 
 void thread_base::finalize(u64 _self) noexcept
@@ -2056,23 +2034,54 @@ void thread_base::finalize(u64 _self) noexcept
 	atomic_wait_engine::set_wait_callback(nullptr);
 	g_tls_log_prefix = []() -> std::string { return {}; };
-	set_name("..pool");
 	thread_ctrl::g_tls_this_thread = nullptr;
 
 	// Try to add self to thread pool
-	const u32 pos = s_thread_pool.push_begin();
+	const auto [bits, ok] = s_thread_bits.fetch_op([](u128& bits)
+	{
+		if (~bits) [[likely]]
+		{
+			// Set lowest clear bit
+			bits |= bits + 1;
+			return true;
+		}
+
+		return false;
+	});
+
+	if (!ok)
+	{
+#ifdef _WIN32
+		CloseHandle(reinterpret_cast<HANDLE>(_self));
+#else
+		pthread_detach(reinterpret_cast<pthread_t>(_self));
+#endif
+		return;
+	}
+
+	set_name("..pool");
+
+	// Obtain id from atomic op
+	const u32 pos = utils::ctz128(~bits);
 	const auto tls = &thread_ctrl::g_tls_this_thread;
 	s_thread_pool[pos] = tls;
 
-	while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(thread_ctrl::g_tls_this_thread))
+	while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(*tls))
 	{
 		s_thread_pool[pos].wait(tls);
 	}
 
+	// Free thread pool slot
+	s_thread_bits.atomic_op([pos](u128& val)
+	{
+		val &= ~(u128(1) << pos);
+	});
+
 	// Restore thread id
-	const auto _this = thread_ctrl::g_tls_this_thread;
+	const auto _this = atomic_storage<thread_base*>::load(*tls);
 	const auto entry = _this->m_thread.exchange(_self);
 	_this->m_thread.notify_one();
+
 	reinterpret_cast<native_entry>(entry)(_this);
 }
 
@@ -2159,6 +2168,15 @@ thread_base::thread_base(std::string_view name)
 
 thread_base::~thread_base()
 {
+	if (m_thread)
+	{
+#ifdef _WIN32
+		CloseHandle(reinterpret_cast<HANDLE>(m_thread.raw()));
+#else
+		pthread_detach(reinterpret_cast<pthread_t>(m_thread.raw()));
+#endif
+	}
+
 }
 
 bool thread_base::join() const
diff --git a/Utilities/Thread.h b/Utilities/Thread.h
index 6e290cb6ac..5b1d5cc515 100644
--- a/Utilities/Thread.h
+++ b/Utilities/Thread.h
@@ -128,7 +128,7 @@ private:
 	void notify_abort() noexcept;
 
 	// Called at the thread end, returns true if needs destruction
-	bool finalize(thread_state result) noexcept;
+	u64 finalize(thread_state result) noexcept;
 
 	// Cleanup after possibly deleting the thread instance
 	static void finalize(u64 _self) noexcept;
@@ -286,21 +286,28 @@ class named_thread final : public Context, result_storage_t<Context>, thread_base
 	static inline void* entry_point(void* arg)
 #endif
 	{
+		if (auto _this = thread_ctrl::get_current())
+		{
+			arg = _this;
+		}
+
 		const auto _this = static_cast<named_thread*>(static_cast<thread*>(arg));
 
 		// Perform self-cleanup if necessary
-		if (_this->entry_point())
+		u64 _self = _this->entry_point();
+
+		if (!_self)
 		{
 			delete _this;
 			thread::finalize(0);
 			return 0;
 		}
 
-		thread::finalize(_this->thread::m_thread);
+		thread::finalize(_self);
 		return 0;
 	}
 
-	bool entry_point()
+	u64 entry_point()
 	{
 		auto tls_error_cb = []()
 		{
diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp
index 256be6c015..7a4c86eae1 100644
--- a/rpcs3/Emu/CPU/CPUThread.cpp
+++ b/rpcs3/Emu/CPU/CPUThread.cpp
@@ -522,6 +522,8 @@ void cpu_thread::operator()()
 
 	s_tls_thread_slot = -1;
 
+	g_tls_current_cpu_thread = nullptr;
+
 	_this = nullptr;
 }
diff --git a/rpcs3/util/asm.hpp b/rpcs3/util/asm.hpp
index 86f7f363cd..a2291137ea 100644
--- a/rpcs3/util/asm.hpp
+++ b/rpcs3/util/asm.hpp
@@ -206,6 +206,30 @@ namespace utils
 		return r;
 	}
 
+	inline u32 ctz128(u128 arg)
+	{
+		if (u64 lo = static_cast<u64>(arg))
+		{
+			return std::countr_zero(lo);
+		}
+		else
+		{
+			return std::countr_zero<u64>(arg >> 64) + 64;
+		}
+	}
+
+	inline u32 clz128(u128 arg)
+	{
+		if (u64 hi = static_cast<u64>(arg >> 64))
+		{
+			return std::countl_zero(hi);
+		}
+		else
+		{
+			return std::countl_zero<u64>(arg) + 64;
+		}
+	}
+
 #elif defined(_MSC_VER)
 	inline void prefetch_read(const void* ptr)
 	{
@@ -287,5 +311,29 @@ namespace utils
 		return r;
 	}
+
+	inline u32 ctz128(u128 arg)
+	{
+		if (!arg.lo)
+		{
+			return std::countr_zero(arg.hi) + 64u;
+		}
+		else
+		{
+			return std::countr_zero(arg.lo);
+		}
+	}
+
+	inline u32 clz128(u128 arg)
+	{
+		if (arg.hi)
+		{
+			return std::countl_zero(arg.hi);
+		}
+		else
+		{
+			return std::countl_zero(arg.lo) + 64;
+		}
+	}
 
 #endif
 } // namespace utils
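
For reference, the allocator this patch introduces is a 128-bit occupancy mask plus three bit tricks: bits | (bits + 1) sets the lowest clear bit (done inside the fetch_op when a pooled thread registers itself), ctz128(~bits) yields the index of that bit, and bits &= bits - 1 walks the set bits (used by thread_base::start() to scan occupied slots). The sketch below is not part of the patch and only illustrates the idea: it shrinks the mask to 64 bits so it needs nothing beyond C++20 <atomic> and <bit>, and the names acquire_slot and release_slot are invented for the example; rpcs3's atomic_t<>::fetch_op wraps roughly the same compare-exchange loop that is spelled out here.

// Standalone illustration of the bitmap slot allocator, reduced to 64 bits.
// Builds as plain C++20; identifiers below are examples, not rpcs3 code.
#include <atomic>
#include <bit>
#include <cstdint>
#include <cstdio>
#include <optional>

// Claim the lowest clear bit of the mask; returns its index, or nullopt when full.
std::optional<unsigned> acquire_slot(std::atomic<std::uint64_t>& bits)
{
	std::uint64_t old = bits.load();

	do
	{
		if (old == ~std::uint64_t{0})
		{
			return std::nullopt; // every slot taken: caller gives up (thread just terminates)
		}
	}
	while (!bits.compare_exchange_weak(old, old | (old + 1))); // old | (old + 1) sets the lowest clear bit

	return std::countr_zero(~old); // index of the bit that was just set
}

// Release a previously acquired slot by clearing its bit.
void release_slot(std::atomic<std::uint64_t>& bits, unsigned pos)
{
	bits.fetch_and(~(std::uint64_t{1} << pos));
}

int main()
{
	std::atomic<std::uint64_t> bits{0};

	const auto a = acquire_slot(bits); // slot 0
	const auto b = acquire_slot(bits); // slot 1
	release_slot(bits, *a);
	const auto c = acquire_slot(bits); // slot 0 again: lowest clear bit is reused first

	// Scan occupied slots the way thread_base::start() does:
	// visit the lowest set bit, then clear it with m &= m - 1.
	for (std::uint64_t m = bits.load(); m; m &= m - 1)
	{
		std::printf("occupied slot: %d\n", std::countr_zero(m));
	}

	std::printf("a=%u b=%u c=%u\n", *a, *b, *c);
	return 0;
}

Because both the acquire path and the scan are single atomic read-modify-write operations on one word, no separate lock is needed, which is what lets the patch drop s_pool_lock and the MPMC workaround around the old lf_fifo queue.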