From 67785a918cc96708b64d28ed72da2a996dd647ca Mon Sep 17 00:00:00 2001 From: Nekotekina Date: Thu, 12 Nov 2020 23:24:35 +0300 Subject: [PATCH] Implement simple thread pool --- Utilities/Thread.cpp | 100 ++++++++++++++++++++++++++++++++---- Utilities/Thread.h | 11 +++- rpcs3/Emu/CPU/CPUThread.cpp | 23 +++++---- 3 files changed, 112 insertions(+), 22 deletions(-) diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index c93860db66..59f69664dc 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -1842,8 +1842,68 @@ thread_local DECLARE(thread_ctrl::g_tls_error_callback) = nullptr; DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined }; +static lf_fifo, 240> s_thread_pool; + +static shared_mutex s_pool_lock; + void thread_base::start(native_entry entry) { + while (u32 size = s_thread_pool.size()) + { + u32 pos = s_thread_pool.peek(); + thread_base** tls = nullptr; + + for (u32 i = pos; i < pos + size; i++) + { + auto val = s_thread_pool[i].load(); + + if (val && s_thread_pool[i].compare_and_swap_test(val, 0)) + { + tls = val; + pos = i; + break; + } + } + + if (tls) + { + // Send "this" and entry point + m_thread = reinterpret_cast(entry); + atomic_storage::store(*tls, this); + s_thread_pool[pos].notify_one(); + + { + // Using it in MPMC manner is a bit tricky, since it's originally MPSC + std::lock_guard lock(s_pool_lock); + + u32 count = 0; + + while (!s_thread_pool[s_thread_pool.peek() + count]) + { + count++; + + if (count >= s_thread_pool.size()) + { + break; + } + } + + if (count) + { + s_thread_pool.pop_end(count); + } + } + + // Wait for actual "m_thread" in return + while (m_thread == reinterpret_cast(entry)) + { + busy_wait(300); + } + + return; + } + } + #ifdef _WIN32 m_thread = ::_beginthreadex(nullptr, 0, entry, this, CREATE_SUSPENDED, nullptr); verify("thread_ctrl::start" HERE), m_thread, ::ResumeThread(reinterpret_cast(+m_thread)) != -1; @@ -1867,8 +1927,11 @@ void thread_base::initialize(void (*error_cb)(), bool(*wait_cb)(const void*)) return thread_ctrl::get_name_cached(); }; - std::string name = thread_ctrl::get_name_cached(); + set_name(thread_ctrl::get_name_cached()); +} +void thread_base::set_name(std::string name) +{ #ifdef _MSC_VER struct THREADNAME_INFO { @@ -1983,11 +2046,34 @@ bool thread_base::finalize(thread_state result_state) noexcept return !ok; } -void thread_base::finalize() noexcept +void thread_base::finalize(u64 _self) noexcept { + if (!_self) + { + // Don't even need to clean these values for detached threads + return; + } + atomic_wait_engine::set_wait_callback(nullptr); g_tls_log_prefix = []() -> std::string { return {}; }; + set_name("..pool"); thread_ctrl::g_tls_this_thread = nullptr; + + // Try to add self to thread pool + const u32 pos = s_thread_pool.push_begin(); + const auto tls = &thread_ctrl::g_tls_this_thread; + s_thread_pool[pos] = tls; + + while (s_thread_pool[pos] == tls || !atomic_storage::load(thread_ctrl::g_tls_this_thread)) + { + s_thread_pool[pos].wait(tls); + } + + // Restore thread id + const auto _this = thread_ctrl::g_tls_this_thread; + const auto entry = _this->m_thread.exchange(_self); + _this->m_thread.notify_one(); + reinterpret_cast(entry)(_this); } void thread_ctrl::_wait_for(u64 usec, bool alert /* true */) @@ -2073,14 +2159,6 @@ thread_base::thread_base(std::string_view name) thread_base::~thread_base() { - if (m_thread) - { -#ifdef _WIN32 - CloseHandle(reinterpret_cast(m_thread.raw())); -#else - pthread_detach(reinterpret_cast(m_thread.raw())); -#endif - } } bool thread_base::join() const @@ -2169,7 +2247,7 @@ void thread_ctrl::emergency_exit(std::string_view reason) delete _this; } - thread_base::finalize(); + thread_base::finalize(0); #ifdef _WIN32 _endthreadex(0); diff --git a/Utilities/Thread.h b/Utilities/Thread.h index 97063fc7ca..6e290cb6ac 100644 --- a/Utilities/Thread.h +++ b/Utilities/Thread.h @@ -91,6 +91,7 @@ struct thread_thread_name::thread_name)> // Thread base class class thread_base { +public: // Native thread entry point function type #ifdef _WIN32 using native_entry = uint(__stdcall*)(void* arg); @@ -98,6 +99,7 @@ class thread_base using native_entry = void*(*)(void* arg); #endif +private: // Thread handle (platform-specific) atomic_t m_thread{0}; @@ -129,7 +131,10 @@ class thread_base bool finalize(thread_state result) noexcept; // Cleanup after possibly deleting the thread instance - static void finalize() noexcept; + static void finalize(u64 _self) noexcept; + + // Set name for debugger + static void set_name(std::string); friend class thread_ctrl; @@ -287,9 +292,11 @@ class named_thread final : public Context, result_storage_t, thread_bas if (_this->entry_point()) { delete _this; + thread::finalize(0); + return 0; } - thread::finalize(); + thread::finalize(_this->thread::m_thread); return 0; } diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp index c96e9807ff..256be6c015 100644 --- a/rpcs3/Emu/CPU/CPUThread.cpp +++ b/rpcs3/Emu/CPU/CPUThread.cpp @@ -324,6 +324,12 @@ struct cpu_counter // Unregister and wait if necessary _this->state += cpu_flag::wait; + if (slot >= std::size(cpu_array)) + { + sys_log.fatal("Index out of bounds (%u)." HERE, slot); + return; + } + std::lock_guard lock(cpu_suspend_lock); if (!cpu_array[slot].compare_and_swap_test(_this, nullptr)) @@ -344,7 +350,7 @@ struct cpu_counter if (index >= std::size(cpu_array)) { - sys_log.fatal("Index out of bounds (%u).", index); + sys_log.fatal("Index out of bounds (%u)." HERE, index); return; } @@ -493,15 +499,9 @@ void cpu_thread::operator()() static thread_local struct thread_cleanup_t { - cpu_thread* _this; + cpu_thread* _this = nullptr; std::string name; - thread_cleanup_t(cpu_thread* _this) - : _this(_this) - , name(thread_ctrl::get_name()) - { - } - void cleanup() { if (_this == nullptr) @@ -520,6 +520,8 @@ void cpu_thread::operator()() g_fxo->get()->remove(_this, s_tls_thread_slot); + s_tls_thread_slot = -1; + _this = nullptr; } @@ -531,7 +533,10 @@ void cpu_thread::operator()() cleanup(); } } - } cleanup{this}; + } cleanup; + + cleanup._this = this; + cleanup.name = thread_ctrl::get_name(); // Check thread status while (!(state & (cpu_flag::exit + cpu_flag::dbg_global_stop)) && thread_ctrl::state() != thread_state::aborting)