diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index a44dc08c6e..f85f0ef136 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -86,6 +86,8 @@ LOG_CHANNEL(vm_log, "VM"); thread_local u64 g_tls_fault_all = 0; thread_local u64 g_tls_fault_rsx = 0; thread_local u64 g_tls_fault_spu = 0; +thread_local u64 g_tls_wait_time = 0; +thread_local u64 g_tls_wait_fail = 0; extern thread_local std::string(*g_tls_log_prefix)(); template <> @@ -1888,21 +1890,32 @@ void thread_base::start(native_entry entry, void(*trampoline)()) #endif } -void thread_base::initialize(void (*error_cb)(), bool(*wait_cb)(const void*)) +void thread_base::initialize(void (*error_cb)()) { // Initialize TLS variables thread_ctrl::g_tls_this_thread = this; thread_ctrl::g_tls_error_callback = error_cb; - // Initialize atomic wait callback - atomic_wait_engine::set_wait_callback(wait_cb); - g_tls_log_prefix = [] { return thread_ctrl::get_name_cached(); }; + atomic_wait_engine::set_wait_callback([](const void*, u64 attempts, u64 stamp0) -> bool + { + if (attempts == umax) + { + g_tls_wait_time += __rdtsc() - stamp0; + } + else if (attempts > 1) + { + g_tls_wait_fail += attempts - 1; + } + + return true; + }); + set_name(thread_ctrl::get_name_cached()); } @@ -1949,23 +1962,6 @@ void thread_base::set_name(std::string name) #endif } -void thread_base::notify_abort() noexcept -{ - u64 tid = m_thread.load(); -#ifdef _WIN32 - tid = GetThreadId(reinterpret_cast(tid)); -#endif - - while (auto ptr = m_state_notifier.load()) - { - // Since this function is not perfectly implemented, run it in a loop - if (atomic_wait_engine::raw_notify(ptr, tid)) - { - break; - } - } -} - u64 thread_base::finalize(thread_state result_state) noexcept { // Report pending errors @@ -2004,22 +2000,30 @@ u64 thread_base::finalize(thread_state result_state) noexcept return thread_ctrl::get_name_cached(); }; - sig_log.notice("Thread time: %fs (%fGc); Faults: %u [rsx:%u, spu:%u]; [soft:%u hard:%u]; Switches:[vol:%u unvol:%u]", + sig_log.notice("Thread time: %fs (%fGc); Faults: %u [rsx:%u, spu:%u]; [soft:%u hard:%u]; Switches:[vol:%u unvol:%u]; Wait:[%.3fs, spur:%u]", time / 1000000000., cycles / 1000000000., g_tls_fault_all, g_tls_fault_rsx, g_tls_fault_spu, - fsoft, fhard, ctxvol, ctxinv); + fsoft, fhard, ctxvol, ctxinv, + g_tls_wait_time / (utils::get_tsc_freq() / 1.), + g_tls_wait_fail); + + atomic_wait_engine::set_wait_callback(nullptr); const u64 _self = m_thread; m_thread.release(0); - // Return true if need to delete thread object - const bool ok = m_state.exchange(result_state) <= thread_state::aborting; + // Return true if need to delete thread object (no) + const bool ok = 0 == (3 & ~m_sync.fetch_op([&](u64& v) + { + v &= -4; + v |= static_cast(result_state); + })); // Signal waiting threads - m_state.notify_all(); + m_sync.notify_all(2); // No detached thread supported atm return _self; @@ -2154,12 +2158,13 @@ void thread_ctrl::_wait_for(u64 usec, bool alert /* true */) } #endif - if (_this->m_signal && _this->m_signal.exchange(0)) + if (_this->m_sync.btr(2)) { return; } - _this->m_signal.wait(0, atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff}); + // Wait for signal and thread state abort + _this->m_sync.wait(0, 4 + 1, atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff}); } std::string thread_ctrl::get_name_cached() @@ -2200,23 +2205,52 @@ thread_base::~thread_base() bool thread_base::join() const { - for (auto state = m_state.load(); state != thread_state::finished && state != thread_state::errored;) + // Hacked for too sleepy threads (1ms) TODO: make sure it's unneeded and remove + const auto timeout = Emu.IsStopped() ? atomic_wait_timeout{1'000'000} : atomic_wait_timeout::inf; + + bool warn = false; + auto stamp0 = __rdtsc(); + + for (u64 i = 0; (m_sync & 3) <= 1; i++) { - m_state.wait(state); - state = m_state; + m_sync.wait(0, 2, timeout); + + if (m_sync & 2) + { + break; + } + + if (i > 20 && Emu.IsStopped()) + { + stamp0 = __rdtsc(); + atomic_wait_engine::raw_notify(0, get_native_id()); + stamp0 = __rdtsc() - stamp0; + warn = true; + } } - return m_state.load() == thread_state::finished; + if (warn) + { + sig_log.error("Thread [%s] is too sleepy. Took %.3fµs to wake it up!", *m_tname.load(), stamp0 / (utils::get_tsc_freq() / 1000000.)); + } + + return (m_sync & 3) == 3; } void thread_base::notify() { - // Increment with saturation - if (m_signal.try_inc()) - { - // Considered impossible to have a situation when not notified - m_signal.notify_all(); - } + // Set notification + m_sync |= 4; + m_sync.notify_one(4); +} + +u64 thread_base::get_native_id() const +{ +#ifdef _WIN32 + return GetThreadId(reinterpret_cast(m_thread.load())); +#else + return m_thread.load(); +#endif } u64 thread_base::get_cycles() @@ -2242,7 +2276,7 @@ u64 thread_base::get_cycles() { cycles = static_cast(thread_time.tv_sec) * 1'000'000'000 + thread_time.tv_nsec; #endif - if (const u64 old_cycles = m_cycles.exchange(cycles)) + if (const u64 old_cycles = m_sync.fetch_op([&](u64& v){ v &= 7; v |= (cycles << 3); }) >> 3) { return cycles - old_cycles; } @@ -2252,7 +2286,7 @@ u64 thread_base::get_cycles() } else { - return m_cycles; + return m_sync >> 3; } } diff --git a/Utilities/Thread.h b/Utilities/Thread.h index 4356f1fdcf..c4f84f8c6a 100644 --- a/Utilities/Thread.h +++ b/Utilities/Thread.h @@ -33,10 +33,11 @@ enum class thread_class : u32 enum class thread_state : u32 { - created, // Initial state - aborting, // The thread has been joined in the destructor or explicitly aborted - errored, // Set after the emergency_exit call - finished // Final state, always set at the end of thread execution + created = 0, // Initial state + aborting = 1, // The thread has been joined in the destructor or explicitly aborted + errored = 2, // Set after the emergency_exit call + finished = 3, // Final state, always set at the end of thread execution + mask = 3 }; class need_wakeup {}; @@ -101,31 +102,19 @@ public: private: // Thread handle (platform-specific) - atomic_t m_thread{0}; + atomic_t m_thread{0}; - // Thread playtoy, that shouldn't be used - atomic_t m_signal{0}; - - // Thread state - atomic_t m_state = thread_state::created; - - // Thread state notification info - atomic_t m_state_notifier{nullptr}; + // Thread state and cycles + atomic_t m_sync{0}; // Thread name stx::atomic_cptr m_tname; - // - atomic_t m_cycles = 0; - // Start thread void start(native_entry, void(*)()); // Called at the thread start - void initialize(void (*error_cb)(), bool(*wait_cb)(const void*)); - - // May be called in destructor - void notify_abort() noexcept; + void initialize(void (*error_cb)()); // Called at the thread end, returns true if needs destruction u64 finalize(thread_state result) noexcept; @@ -158,6 +147,9 @@ public: // Notify the thread void notify(); + + // Get thread id + u64 get_native_id() const; }; // Collection of global function for current thread @@ -220,15 +212,15 @@ public: } template - static void raw_notify(named_thread& thread) + static u64 get_native_id(named_thread& thread) { - static_cast(thread).notify_abort(); + return static_cast(thread).get_native_id(); } // Read current state static inline thread_state state() { - return g_tls_this_thread->m_state; + return static_cast(g_tls_this_thread->m_sync & 3); } // Wait once with timeout. May spuriously return false. @@ -312,40 +304,13 @@ class named_thread final : public Context, result_storage_t, thread_bas u64 entry_point() { - auto tls_error_cb = []() + thread::initialize([]() { if constexpr (!result::empty) { // Construct using default constructor in the case of failure new (static_cast(static_cast(thread_ctrl::get_current()))->get()) typename result::type(); } - }; - - thread::initialize(tls_error_cb, [](const void* data) - { - const auto _this = thread_ctrl::get_current(); - - if (_this->m_state >= thread_state::aborting) - { - _this->m_state_notifier.store(data); - return false; - } - - if (!data) - { - _this->m_state_notifier.release(data); - return true; - } - - _this->m_state_notifier.store(data); - - if (_this->m_state >= thread_state::aborting) - { - _this->m_state_notifier.release(nullptr); - return false; - } - - return true; }); if constexpr (result::empty) @@ -422,7 +387,7 @@ public: // Access thread state operator thread_state() const { - return thread::m_state.load(); + return static_cast(thread::m_sync.load() & 3); } // Try to abort by assigning thread_state::aborting (UB if assigning different state) @@ -430,11 +395,11 @@ public: { ASSUME(s == thread_state::aborting); - if (s == thread_state::aborting && thread::m_state.compare_and_swap_test(thread_state::created, s)) + if (s == thread_state::aborting && thread::m_sync.fetch_op([](u64& v){ return !(v & 3) && (v |= 1); }).second) { if (s == thread_state::aborting) { - thread::notify_abort(); + thread::m_sync.notify_one(1); } if constexpr (std::is_base_of_v) diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp index ca6a1e8341..5559b7231f 100644 --- a/rpcs3/Emu/CPU/CPUThread.cpp +++ b/rpcs3/Emu/CPU/CPUThread.cpp @@ -1015,8 +1015,6 @@ void cpu_thread::stop_all() noexcept } else { - std::lock_guard lock(g_fxo->get()->cpu_suspend_lock); - auto on_stop = [](u32, cpu_thread& cpu) { cpu.state += cpu_flag::dbg_global_stop; diff --git a/rpcs3/Emu/Cell/SPUThread.cpp b/rpcs3/Emu/Cell/SPUThread.cpp index 153af38325..8bdc2e9f91 100644 --- a/rpcs3/Emu/Cell/SPUThread.cpp +++ b/rpcs3/Emu/Cell/SPUThread.cpp @@ -4385,8 +4385,12 @@ bool spu_thread::stop_and_signal(u32 code) return true; }); - if (thread.get() != this) - thread_ctrl::raw_notify(*thread); + while (thread.get() != this && thread->state & cpu_flag::wait) + { + // TODO: replace with proper solution + if (atomic_wait_engine::raw_notify(nullptr, thread_ctrl::get_native_id(*thread))) + break; + } } } diff --git a/rpcs3/Emu/Cell/lv2/sys_spu.cpp b/rpcs3/Emu/Cell/lv2/sys_spu.cpp index 5a1d525a40..dea049def1 100644 --- a/rpcs3/Emu/Cell/lv2/sys_spu.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_spu.cpp @@ -1021,9 +1021,11 @@ error_code sys_spu_thread_group_terminate(ppu_thread& ppu, u32 id, s32 value) for (auto& thread : group->threads) { - if (thread && group->running) + while (thread && group->running && thread->state & cpu_flag::wait) { - thread_ctrl::raw_notify(*thread); + // TODO: replace with proper solution + if (atomic_wait_engine::raw_notify(nullptr, thread_ctrl::get_native_id(*thread))) + break; } } diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp index 83e92acad6..79d47b1fd8 100644 --- a/rpcs3/util/atomic.cpp +++ b/rpcs3/util/atomic.cpp @@ -29,13 +29,13 @@ static constexpr std::size_t s_hashtable_size = 1u << 17; static constexpr std::uintptr_t s_ref_mask = (1u << 17) - 1; // Fix for silly on-first-use initializer -static constexpr auto s_null_wait_cb = [](const void*){ return true; }; +static bool s_null_wait_cb(const void*, u64, u64){ return true; }; // Callback for wait() function, returns false if wait should return -static thread_local bool(*s_tls_wait_cb)(const void* data) = s_null_wait_cb; +static thread_local bool(*s_tls_wait_cb)(const void* data, u64 attempts, u64 stamp0) = s_null_wait_cb; // Fix for silly on-first-use initializer -static constexpr auto s_null_notify_cb = [](const void*, u64){}; +static void s_null_notify_cb(const void*, u64){}; // Callback for notification functions for optimizations static thread_local void(*s_tls_notify_cb)(const void* data, u64 progress) = s_null_notify_cb; @@ -46,21 +46,12 @@ static inline bool operator &(atomic_wait::op lhs, atomic_wait::op_flag rhs) } // Compare data in memory with old value, and return true if they are equal -template static NEVER_INLINE bool #ifdef _WIN32 __vectorcall #endif ptr_cmp(const void* data, u32 _size, __m128i old128, __m128i mask128, atomic_wait::info* ext = nullptr) { - if constexpr (CheckCb) - { - if (!s_tls_wait_cb(data)) - { - return false; - } - } - using atomic_wait::op; using atomic_wait::op_flag; @@ -210,7 +201,7 @@ ptr_cmp(const void* data, u32 _size, __m128i old128, __m128i mask128, atomic_wai { for (auto e = ext; e->data; e++) { - if (!ptr_cmp(e->data, e->size, e->old, e->mask)) + if (!ptr_cmp(e->data, e->size, e->old, e->mask)) { return false; } @@ -1021,6 +1012,11 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time { const auto stamp0 = atomic_wait::get_unique_tsc(); + if (!s_tls_wait_cb(data, 0, stamp0)) + { + return; + } + const std::uintptr_t iptr = reinterpret_cast(data) & (~s_ref_mask >> 17); const auto root = &s_hashtable[iptr % s_hashtable_size]; @@ -1126,6 +1122,8 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time bool fallback = false; #endif + u64 attempts = 0; + while (ptr_cmp(data, size, old_value, mask, ext)) { #ifdef USE_FUTEX @@ -1213,6 +1211,11 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time // TODO: reduce timeout instead break; } + + if (!s_tls_wait_cb(data, ++attempts, stamp0)) + { + break; + } } while (!fallback) @@ -1261,7 +1264,7 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time verify(HERE), root == root->slot_free(iptr, slot); - s_tls_wait_cb(nullptr); + s_tls_wait_cb(data, -1, stamp0); } template @@ -1326,7 +1329,7 @@ alert_sema(u32 cond_id, const void* data, u64 tid, u32 size, __m128i mask, __m12 return ok; } -void atomic_wait_engine::set_wait_callback(bool(*cb)(const void* data)) +void atomic_wait_engine::set_wait_callback(bool(*cb)(const void*, u64, u64)) { if (cb) { @@ -1334,7 +1337,7 @@ void atomic_wait_engine::set_wait_callback(bool(*cb)(const void* data)) } else { - s_tls_wait_cb = [](const void*){ return true; }; + s_tls_wait_cb = s_null_wait_cb; } } @@ -1346,7 +1349,7 @@ void atomic_wait_engine::set_notify_callback(void(*cb)(const void*, u64)) } else { - s_tls_notify_cb = [](const void*, u64){}; + s_tls_notify_cb = s_null_notify_cb; } } diff --git a/rpcs3/util/atomic.hpp b/rpcs3/util/atomic.hpp index c1746214be..24c277d830 100644 --- a/rpcs3/util/atomic.hpp +++ b/rpcs3/util/atomic.hpp @@ -239,7 +239,7 @@ private: notify_all(const void* data, u32 size, __m128i mask128, __m128i val128); public: - static void set_wait_callback(bool(*cb)(const void* data)); + static void set_wait_callback(bool(*cb)(const void* data, u64 attempts, u64 stamp0)); static void set_notify_callback(void(*cb)(const void* data, u64 progress)); static bool raw_notify(const void* data, u64 thread_id = 0); };