From a5a2d43d7cae6af3721dc6100bcb8990d8a23a21 Mon Sep 17 00:00:00 2001 From: Nekotekina Date: Wed, 7 Sep 2016 01:38:52 +0300 Subject: [PATCH] Thread.cpp refinement Hide thread mutex Safe notify() method Other refactoring --- Utilities/Thread.cpp | 437 ++++++++++++--------- Utilities/Thread.h | 297 +++++--------- Utilities/cond.h | 6 +- Utilities/mutex.h | 20 +- Utilities/sema.h | 33 ++ rpcs3/Emu/CPU/CPUThread.cpp | 22 +- rpcs3/Emu/Cell/Modules/cellFs.cpp | 4 +- rpcs3/Emu/Cell/Modules/cellSpursSpu.cpp | 4 - rpcs3/Emu/Cell/Modules/cellVdec.cpp | 17 +- rpcs3/Emu/Cell/Modules/sys_ppu_thread_.cpp | 2 +- rpcs3/Emu/Cell/PPUThread.cpp | 15 +- rpcs3/Emu/Cell/SPUThread.cpp | 42 +- rpcs3/Emu/Cell/SPUThread.h | 8 +- rpcs3/Emu/Cell/lv2/lv2.cpp | 2 +- rpcs3/Emu/Cell/lv2/sys_cond.cpp | 4 +- rpcs3/Emu/Cell/lv2/sys_event.cpp | 4 +- rpcs3/Emu/Cell/lv2/sys_event_flag.cpp | 4 +- rpcs3/Emu/Cell/lv2/sys_interrupt.cpp | 8 +- rpcs3/Emu/Cell/lv2/sys_lwcond.cpp | 4 +- rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp | 4 +- rpcs3/Emu/Cell/lv2/sys_mutex.cpp | 4 +- rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp | 2 +- rpcs3/Emu/Cell/lv2/sys_rwlock.cpp | 8 +- rpcs3/Emu/Cell/lv2/sys_semaphore.cpp | 4 +- rpcs3/Emu/Cell/lv2/sys_spu.cpp | 6 +- rpcs3/Emu/Cell/lv2/sys_spu.h | 2 +- rpcs3/Emu/Cell/lv2/sys_sync.h | 33 +- rpcs3/Emu/Cell/lv2/sys_timer.cpp | 11 +- rpcs3/Emu/Memory/wait_engine.cpp | 37 +- rpcs3/Emu/Memory/wait_engine.h | 3 +- rpcs3/Emu/PSP2/Modules/sceLibKernel.cpp | 46 ++- rpcs3/Emu/RSX/RSXThread.cpp | 4 +- rpcs3/Emu/RSX/rsx_methods.cpp | 4 +- rpcs3/Emu/System.cpp | 18 +- rpcs3/Gui/InterpreterDisAsm.cpp | 4 +- 35 files changed, 532 insertions(+), 591 deletions(-) diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index 34249ade2d..67c016f322 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -8,6 +8,7 @@ #ifdef _WIN32 #include #include +#include #else #ifdef __APPLE__ #define _XOPEN_SOURCE @@ -16,8 +17,16 @@ #include #include #include +#include +#include +#include #endif +#include "sync.h" + +thread_local u64 g_tls_fault_rsx = 0; +thread_local u64 g_tls_fault_spu = 0; + static void report_fatal_error(const std::string& msg) { std::string _msg = msg + "\n" @@ -1009,6 +1018,7 @@ bool handle_access_violation(u32 addr, bool is_writing, x64_context* context) { if (rsx::g_access_violation_handler && rsx::g_access_violation_handler(addr, is_writing)) { + g_tls_fault_rsx++; return true; } @@ -1139,6 +1149,7 @@ bool handle_access_violation(u32 addr, bool is_writing, x64_context* context) // skip processed instruction RIP(context) += i_size; + g_tls_fault_spu++; return true; } @@ -1878,103 +1889,65 @@ const bool s_self_test = []() -> bool return true; }(); -#include -#include -#include -#include -#include - -thread_local DECLARE(thread_ctrl::g_tls_this_thread) = nullptr; - -struct thread_ctrl::internal -{ - std::mutex mutex; - std::condition_variable cond; - std::condition_variable jcv; // Allows simultaneous joining - std::condition_variable icv; - - task_stack atexit; - - std::exception_ptr exception; // Stored exception - - std::chrono::high_resolution_clock::time_point time_limit; - -#ifdef _WIN32 - DWORD thread_id = 0; -#endif - - x64_context _context{}; - x64_context* const thread_ctx = &this->_context; - - atomic_t interrupt{}; // Interrupt function -}; - -thread_local thread_ctrl::internal* g_tls_internal = nullptr; - -extern std::condition_variable& get_current_thread_cv() -{ - return g_tls_internal->cond; -} - // TODO extern atomic_t g_thread_count(0); +thread_local DECLARE(thread_ctrl::g_tls_this_thread) = nullptr; + extern thread_local std::string(*g_tls_log_prefix)(); void thread_ctrl::start(const std::shared_ptr& ctrl, task_stack task) { - reinterpret_cast(ctrl->m_thread) = std::thread([ctrl, task = std::move(task)] +#ifdef _WIN32 + using thread_result = uint; + using thread_type = thread_result(__stdcall*)(void* arg); +#else + using thread_result = void*; + using thread_type = thread_result(*)(void* arg); +#endif + + // Thread entry point + const thread_type entry = [](void* arg) -> thread_result { + // Recover shared_ptr from short-circuited thread_ctrl object pointer + const std::shared_ptr ctrl = static_cast(arg)->m_self; + try { ctrl->initialize(); - task.exec(); + task_stack{std::move(ctrl->m_task)}.invoke(); } catch (...) { - ctrl->m_data->exception = std::current_exception(); + // Capture exception + ctrl->finalize(std::current_exception()); + return 0; } - ctrl->finalize(); - }); -} + ctrl->finalize(nullptr); + return 0; + }; -void thread_ctrl::wait_start(u64 timeout) -{ - m_data->time_limit = std::chrono::high_resolution_clock::now() + std::chrono::microseconds(timeout); -} + ctrl->m_self = ctrl; + ctrl->m_task = std::move(task); -bool thread_ctrl::wait_wait(u64 timeout) -{ - std::unique_lock lock(m_data->mutex, std::adopt_lock); + // TODO: implement simple thread pool +#ifdef _WIN32 + std::uintptr_t thread = _beginthreadex(nullptr, 0, entry, ctrl.get(), 0, nullptr); + verify("thread_ctrl::start" HERE), thread != 0; +#else + pthread_t thread; + verify("thread_ctrl::start" HERE), pthread_create(&thread, nullptr, entry, ctrl.get()); +#endif - if (timeout && m_data->cond.wait_until(lock, m_data->time_limit) == std::cv_status::timeout) - { - lock.release(); - return false; - } - - m_data->cond.wait(lock); - lock.release(); - return true; -} - -void thread_ctrl::test() -{ - if (m_data && m_data->exception) - { - std::rethrow_exception(m_data->exception); - } + // TODO: this is unsafe and must be duplicated in thread_ctrl::initialize + ctrl->m_thread = thread; } void thread_ctrl::initialize() { // Initialize TLS variable g_tls_this_thread = this; - g_tls_internal = this->m_data; -#ifdef _WIN32 - m_data->thread_id = GetCurrentThreadId(); -#endif g_tls_log_prefix = [] { @@ -1983,8 +1956,7 @@ void thread_ctrl::initialize() ++g_thread_count; -#if defined(_MSC_VER) - +#ifdef _MSC_VER struct THREADNAME_INFO { DWORD dwType; @@ -2010,11 +1982,10 @@ void thread_ctrl::initialize() { } } - #endif } -void thread_ctrl::finalize() noexcept +void thread_ctrl::finalize(std::exception_ptr eptr) noexcept { // Disable and discard possible interrupts interrupt_disable(); @@ -2023,135 +1994,213 @@ void thread_ctrl::finalize() noexcept // TODO vm::reservation_free(); - // Call atexit functions - if (m_data) m_data->atexit.exec(); + // Run atexit functions + m_task.invoke(); + m_task.reset(); + +#ifdef _WIN32 + ULONG64 cycles{}; + QueryThreadCycleTime(GetCurrentThread(), &cycles); + FILETIME ctime, etime, ktime, utime; + GetThreadTimes(GetCurrentThread(), &ctime, &etime, &ktime, &utime); + const u64 time = ((ktime.dwLowDateTime | (u64)ktime.dwHighDateTime << 32) + (utime.dwLowDateTime | (u64)utime.dwHighDateTime << 32)) * 100ull; +#elif __linux__ + const u64 cycles = 0; // Not supported + struct ::rusage stats{}; + ::getrusage(RUSAGE_THREAD, &stats); + const u64 time = (stats.ru_utime.tv_sec + stats.ru_stime.tv_sec) * 1000000000ull + (stats.ru_utime.tv_usec + stats.ru_stime.tv_usec) * 1000ull; +#else + const u64 cycles = 0; + const u64 time = 0; +#endif + + LOG_NOTICE(GENERAL, "Thread time: %fs (%fGc); Faults: %u [rsx:%u, spu:%u];", + time / 1000000000., + cycles / 1000000000., + vm::g_tls_fault_count, + g_tls_fault_rsx, + g_tls_fault_spu); --g_thread_count; -#ifdef _WIN32 - ULONG64 time; - QueryThreadCycleTime(GetCurrentThread(), &time); - LOG_NOTICE(GENERAL, "Thread time: %f Gc", time / 1000000000.); -#endif + // Untangle circular reference, set exception + semaphore_lock{m_mutex}, m_self.reset(), m_exception = eptr; + + // Signal joining waiters + m_jcv.notify_all(); } -void thread_ctrl::push_atexit(task_stack task) +void thread_ctrl::_push(task_stack task) { - m_data->atexit.push(std::move(task)); + g_tls_this_thread->m_task.push(std::move(task)); +} + +bool thread_ctrl::_wait_for(u64 usec) +{ + auto _this = g_tls_this_thread; + + struct half_lock + { + semaphore<>& ref; + + void lock() + { + // Used to avoid additional lock + unlock + } + + void unlock() + { + ref.post(); + } + } + _lock{_this->m_mutex}; + + if (u32 sig = _this->m_signal.load()) + { + thread_ctrl::test(); + + if (sig & 1) + { + _this->m_signal &= ~1; + return true; + } + } + + _this->m_mutex.wait(); + + while (_this->m_cond.wait(_lock, usec)) + { + if (u32 sig = _this->m_signal.load()) + { + thread_ctrl::test(); + + if (sig & 1) + { + _this->m_signal &= ~1; + return true; + } + } + + if (usec != -1) + { + return false; + } + + _this->m_mutex.wait(); + + if (u32 sig = _this->m_signal.load()) + { + if (sig & 2 && _this->m_exception) + { + _this->_throw(); + } + + if (sig & 1) + { + _this->m_signal &= ~1; + _this->m_mutex.post(); + return true; + } + } + } + + // Timeout + return false; +} + +[[noreturn]] void thread_ctrl::_throw() +{ + std::exception_ptr ex = std::exchange(m_exception, std::exception_ptr{}); + m_signal &= ~3; + m_mutex.post(); + std::rethrow_exception(std::move(ex)); +} + +void thread_ctrl::_notify(cond_variable thread_ctrl::* ptr) +{ + // Optimized lock + unlock + if (!m_mutex.get()) + { + m_mutex.wait(); + m_mutex.post(); + } + + (this->*ptr).notify_one(); } thread_ctrl::thread_ctrl(std::string&& name) : m_name(std::move(name)) { - static_assert(sizeof(std::thread) <= sizeof(m_thread), "Small storage"); - -#pragma push_macro("new") -#undef new - new (&m_thread) std::thread; -#pragma pop_macro("new") - - initialize_once(); } thread_ctrl::~thread_ctrl() { - if (reinterpret_cast(m_thread).joinable()) + if (m_thread) { - reinterpret_cast(m_thread).detach(); +#ifdef _WIN32 + CloseHandle((HANDLE)m_thread.raw()); +#else + pthread_detach(m_thread.raw()); +#endif } - - delete m_data; - - reinterpret_cast(m_thread).~thread(); } -void thread_ctrl::initialize_once() +std::exception_ptr thread_ctrl::get_exception() const { - if (UNLIKELY(!m_data)) - { - auto ptr = new thread_ctrl::internal; + semaphore_lock lock(m_mutex); + return m_exception; +} - if (!m_data.compare_and_swap_test(nullptr, ptr)) - { - delete ptr; - } +void thread_ctrl::set_exception(std::exception_ptr ptr) +{ + semaphore_lock lock(m_mutex); + m_exception = ptr; + + if (m_exception) + { + m_signal |= 2; + m_cond.notify_one(); + } + else + { + m_signal &= ~2; } } void thread_ctrl::join() { - // Increase contention counter - const u32 _j = m_joining++; +#ifdef _WIN32 + //verify("thread_ctrl::join" HERE), WaitForSingleObjectEx((HANDLE)m_thread.load(), -1, false) == WAIT_OBJECT_0; +#endif - if (LIKELY(_j >= 0x80000000)) - { - // Already joined (signal condition) - m_joining = 0x80000000; - } - else if (LIKELY(_j == 0)) - { - // Winner joins the thread - reinterpret_cast(m_thread).join(); + semaphore_lock lock(m_mutex); - // Notify others if necessary - if (UNLIKELY(m_joining.exchange(0x80000000) != 1)) - { - // Serialize for reliable notification - m_data->mutex.lock(); - m_data->mutex.unlock(); - m_data->jcv.notify_all(); - } - } - else + while (m_self) { - // Hard way - std::unique_lock lock(m_data->mutex); - m_data->jcv.wait(lock, [&] { return m_joining >= 0x80000000; }); + m_jcv.wait(lock); } - if (UNLIKELY(m_data && m_data->exception && !std::uncaught_exception())) + if (UNLIKELY(m_exception && !std::uncaught_exception())) { - std::rethrow_exception(m_data->exception); + std::rethrow_exception(m_exception); } } -void thread_ctrl::lock() -{ - m_data->mutex.lock(); -} - -void thread_ctrl::unlock() -{ - m_data->mutex.unlock(); -} - -void thread_ctrl::lock_notify() -{ - if (UNLIKELY(g_tls_this_thread == this)) - { - return; - } - - // Serialize for reliable notification, condition is assumed to be changed externally - m_data->mutex.lock(); - m_data->mutex.unlock(); - m_data->cond.notify_one(); -} - void thread_ctrl::notify() { - m_data->cond.notify_one(); + if (!(m_signal & 1)) + { + m_signal |= 1; + _notify(&thread_ctrl::m_cond); + } } -void thread_ctrl::set_exception(std::exception_ptr e) -{ - m_data->exception = e; -} +static thread_local x64_context s_tls_context{}; static void _handle_interrupt(x64_context* ctx) { // Copy context for further use (TODO: is it safe on all platforms?) - g_tls_internal->_context = *ctx; + s_tls_context = *ctx; thread_ctrl::handle_interrupt(); } @@ -2166,7 +2215,7 @@ static thread_local void(*s_tls_handler)() = nullptr; s_tls_handler(); // Restore context in the case of return - const auto ctx = g_tls_internal->thread_ctx; + const auto ctx = &s_tls_context; if (s_tls_ret_pos) { @@ -2188,26 +2237,22 @@ static thread_local void(*s_tls_handler)() = nullptr; void thread_ctrl::handle_interrupt() { const auto _this = g_tls_this_thread; - const auto ctx = g_tls_internal->thread_ctx; + const auto ctx = &s_tls_context; if (_this->m_guard & 0x80000000) { // Discard interrupt if interrupts are disabled - if (g_tls_internal->interrupt.exchange(nullptr)) + if (_this->m_iptr.exchange(nullptr)) { - _this->lock(); - _this->unlock(); - g_tls_internal->icv.notify_one(); + _this->_notify(&thread_ctrl::m_icv); } } else if (_this->m_guard == 0) { // Set interrupt immediately if no guard set - if (const auto handler = g_tls_internal->interrupt.exchange(nullptr)) + if (const auto handler = _this->m_iptr.exchange(nullptr)) { - _this->lock(); - _this->unlock(); - g_tls_internal->icv.notify_one(); + _this->_notify(&thread_ctrl::m_icv); #ifdef _WIN32 // Install function call @@ -2234,13 +2279,15 @@ void thread_ctrl::handle_interrupt() void thread_ctrl::interrupt(void(*handler)()) { + semaphore_lock lock(m_mutex); + verify(HERE), this != g_tls_this_thread; // TODO: self-interrupt - verify(HERE), m_data->interrupt.compare_and_swap_test(nullptr, handler); // TODO: multiple interrupts + verify(HERE), m_iptr.compare_and_swap_test(nullptr, handler); // TODO: multiple interrupts #ifdef _WIN32 - const auto ctx = m_data->thread_ctx; + const auto ctx = &s_tls_context; - const HANDLE nt = OpenThread(THREAD_ALL_ACCESS, FALSE, m_data->thread_id); + const HANDLE nt = (HANDLE)m_thread.load();//OpenThread(THREAD_ALL_ACCESS, FALSE, m_data->thread_id); verify(HERE), nt; verify(HERE), SuspendThread(nt) != -1; @@ -2254,28 +2301,24 @@ void thread_ctrl::interrupt(void(*handler)()) RIP(ctx) = _rip; verify(HERE), ResumeThread(nt) != -1; - CloseHandle(nt); + //CloseHandle(nt); #else - pthread_kill(reinterpret_cast(m_thread).native_handle(), SIGUSR1); + pthread_kill(m_thread.load(), SIGUSR1); #endif - std::unique_lock lock(m_data->mutex, std::adopt_lock); - - while (m_data->interrupt) + while (m_iptr) { - m_data->icv.wait(lock); + m_icv.wait(lock); } - - lock.release(); } void thread_ctrl::test_interrupt() { if (m_guard & 0x80000000) { - if (m_data->interrupt.exchange(nullptr)) + if (m_iptr.exchange(nullptr)) { - lock(), unlock(), m_data->icv.notify_one(); + _notify(&thread_ctrl::m_icv); } return; @@ -2286,18 +2329,30 @@ void thread_ctrl::test_interrupt() m_guard = 0; // Execute delayed interrupt handler - if (const auto handler = m_data->interrupt.exchange(nullptr)) + if (const auto handler = m_iptr.exchange(nullptr)) { - lock(), unlock(), m_data->icv.notify_one(); + _notify(&thread_ctrl::m_icv); return handler(); } } } -void thread_ctrl::sleep(u64 useconds) +void thread_ctrl::test() { - std::this_thread::sleep_for(std::chrono::microseconds(useconds)); + const auto _this = g_tls_this_thread; + + if (_this->m_signal & 2) + { + _this->m_mutex.wait(); + + if (_this->m_exception) + { + _this->_throw(); + } + + _this->m_mutex.post(); + } } @@ -2341,3 +2396,7 @@ void named_thread::start_thread(const std::shared_ptr& _this) on_exit(); }); } + +task_stack::task_base::~task_base() +{ +} diff --git a/Utilities/Thread.h b/Utilities/Thread.h index 8c61cbf974..f265b49773 100644 --- a/Utilities/Thread.h +++ b/Utilities/Thread.h @@ -7,6 +7,9 @@ #include #include +#include "sema.h" +#include "cond.h" + // Will report exception and call std::abort() if put in catch(...) [[noreturn]] void catch_all_exceptions(); @@ -17,19 +20,19 @@ class task_stack { std::unique_ptr next; - virtual ~task_base() = default; + virtual ~task_base(); - virtual void exec() + virtual void invoke() { if (next) { - next->exec(); + next->invoke(); } } }; - template - struct task_type : task_base + template + struct task_type final : task_base { std::remove_reference_t func; @@ -38,10 +41,10 @@ class task_stack { } - void exec() override + void invoke() final override { func(); - task_base::exec(); + task_base::invoke(); } }; @@ -50,7 +53,7 @@ class task_stack public: task_stack() = default; - template + template task_stack(F&& func) : m_stack(new task_type(std::forward(func))) { @@ -70,11 +73,11 @@ public: m_stack.reset(); } - void exec() const + void invoke() const { if (m_stack) { - m_stack->exec(); + m_stack->invoke(); } } }; @@ -82,23 +85,41 @@ public: // Thread control class class thread_ctrl final { -public: // TODO - struct internal; - -private: + // Current thread static thread_local thread_ctrl* g_tls_this_thread; - // Thread handle storage - std::aligned_storage_t<16> m_thread; + // Self pointer + std::shared_ptr m_self; - // Thread join contention counter - atomic_t m_joining{}; + // Thread handle (platform-specific) + atomic_t m_thread{0}; + + // Thread mutex + mutable semaphore<> m_mutex; + + // Thread condition variable + cond_variable m_cond; + + // Thread flags + atomic_t m_signal{0}; + + // Thread joining condition variable + cond_variable m_jcv; + + // Remotely set or caught exception + std::exception_ptr m_exception; + + // Thread initial task or atexit task + task_stack m_task; // Thread interrupt guard counter volatile u32 m_guard = 0x80000000; - // Thread internals - atomic_t m_data{}; + // Thread interrupt condition variable + cond_variable m_icv; + + // Interrupt function + atomic_t m_iptr{nullptr}; // Fixed name std::string m_name; @@ -110,19 +131,19 @@ private: void initialize(); // Called at the thread end - void finalize() noexcept; + void finalize(std::exception_ptr) noexcept; - // Get atexit function - void push_atexit(task_stack); + // Add task (atexit) + static void _push(task_stack); - // Start waiting - void wait_start(u64 timeout); + // Internal waiting function, may throw. Infinite value is -1. + static bool _wait_for(u64 usec); - // Proceed waiting - bool wait_wait(u64 timeout); + // Internal throwing function. Mutex must be locked and will be unlocked. + [[noreturn]] void _throw(); - // Check exception - void test(); + // Internal notification function + void _notify(cond_variable thread_ctrl::*); public: thread_ctrl(std::string&& name); @@ -137,63 +158,22 @@ public: return m_name; } - // Initialize internal data - void initialize_once(); + // Get exception + std::exception_ptr get_exception() const; + + // Set exception + void set_exception(std::exception_ptr ptr); // Get thread result (may throw, simultaneous joining allowed) void join(); - // Lock thread mutex - void lock(); - - // Lock conditionally (double-checked) - template - bool lock_if(F&& pred) - { - if (pred()) - { - lock(); - - try - { - if (LIKELY(pred())) - { - return true; - } - else - { - unlock(); - return false; - } - } - catch (...) - { - unlock(); - throw; - } - } - else - { - return false; - } - } - - // Unlock thread mutex (internal data must be initialized) - void unlock(); - - // Lock, unlock, notify the thread (required if the condition changed locklessly) - void lock_notify(); - - // Notify the thread (internal data must be initialized) + // Notify the thread void notify(); - // Set exception (internal data must be initialized, thread mutex must be locked) - void set_exception(std::exception_ptr); - // Internal static void handle_interrupt(); - // Interrupt thread with specified handler call (thread mutex must be locked) + // Interrupt thread with specified handler call void interrupt(void(*handler)()); // Interrupt guard recursive enter @@ -226,90 +206,45 @@ public: // Check interrupt if delayed by guard scope void test_interrupt(); - // Current thread sleeps for specified amount of microseconds. - // Wrapper for std::this_thread::sleep, doesn't require valid thread_ctrl. - [[deprecated]] static void sleep(u64 useconds); - - // Wait until pred(). Abortable, may throw. Thread must be locked. - // Timeout in microseconds (zero means infinite). - template - static inline auto wait_for(u64 useconds, F&& pred) + // Wait once with timeout. Abortable, may throw. May spuriously return false. + static inline bool wait_for(u64 usec) { - if (useconds) - { - g_tls_this_thread->wait_start(useconds); - } - - while (true) - { - g_tls_this_thread->test(); - - if (auto&& result = pred()) - { - return result; - } - else if (!g_tls_this_thread->wait_wait(useconds) && useconds) - { - return result; - } - } + return _wait_for(usec); } - // Wait once. Abortable, may throw. Thread must be locked. - // Timeout in microseconds (zero means infinite). - static inline bool wait_for(u64 useconds = 0) - { - if (useconds) - { - g_tls_this_thread->wait_start(useconds); - } - - g_tls_this_thread->test(); - - if (!g_tls_this_thread->wait_wait(useconds) && useconds) - { - return false; - } - - g_tls_this_thread->test(); - return true; - } - - // Wait until pred(). Abortable, may throw. Thread must be locked. - template - static inline auto wait(F&& pred) - { - while (true) - { - g_tls_this_thread->test(); - - if (auto&& result = pred()) - { - return result; - } - - g_tls_this_thread->wait_wait(0); - } - } - - // Wait once. Abortable, may throw. Thread must be locked. + // Wait. Abortable, may throw. static inline void wait() { - g_tls_this_thread->test(); - g_tls_this_thread->wait_wait(0); - g_tls_this_thread->test(); + _wait_for(-1); } - // Wait eternally. Abortable, may throw. Thread must be locked. + // Wait until pred(). Abortable, may throw. + template> + static inline RT wait(F&& pred) + { + while (true) + { + if (RT result = pred()) + { + return result; + } + + _wait_for(-1); + } + } + + // Wait eternally until aborted. [[noreturn]] static inline void eternalize() { while (true) { - g_tls_this_thread->test(); - g_tls_this_thread->wait_wait(0); + _wait_for(-1); } } + // Test exception (may throw). + static void test(); + // Get current thread (may be nullptr) static thread_ctrl* get_current() { @@ -320,14 +255,14 @@ public: template static inline void atexit(F&& func) { - return g_tls_this_thread->push_atexit(std::forward(func)); + _push(std::forward(func)); } - // Named thread factory + // Create detached named thread template static inline void spawn(N&& name, F&& func) { - auto&& out = std::make_shared(std::forward(name)); + auto out = std::make_shared(std::forward(name)); thread_ctrl::start(out, std::forward(func)); } @@ -382,7 +317,7 @@ public: } // Access thread_ctrl - thread_ctrl* operator->() const + thread_ctrl* get() const { return m_thread.get(); } @@ -392,60 +327,12 @@ public: return m_thread->join(); } - void lock() const - { - return m_thread->lock(); - } - - void unlock() const - { - return m_thread->unlock(); - } - - void lock_notify() const - { - return m_thread->lock_notify(); - } - void notify() const { return m_thread->notify(); } }; -// Simple thread mutex locker -class thread_lock final -{ - thread_ctrl* m_thread; - -public: - thread_lock(const thread_lock&) = delete; - - // Lock specified thread - thread_lock(thread_ctrl* thread) - : m_thread(thread) - { - m_thread->lock(); - } - - // Lock specified named_thread - thread_lock(named_thread& thread) - : thread_lock(thread.operator->()) - { - } - - // Lock current thread - thread_lock() - : thread_lock(thread_ctrl::get_current()) - { - } - - ~thread_lock() - { - m_thread->unlock(); - } -}; - // Interrupt guard scope class thread_guard final { @@ -455,24 +342,24 @@ public: thread_guard(const thread_guard&) = delete; thread_guard(thread_ctrl* thread) - : m_thread(thread) + //: m_thread(thread) { - m_thread->guard_enter(); + //m_thread->guard_enter(); } thread_guard(named_thread& thread) - : thread_guard(thread.operator->()) + //: thread_guard(thread.get()) { } thread_guard() - : thread_guard(thread_ctrl::get_current()) + //: thread_guard(thread_ctrl::get_current()) { } ~thread_guard() noexcept(false) { - m_thread->guard_leave(); + //m_thread->guard_leave(); } }; @@ -498,7 +385,7 @@ public: } // Access thread_ctrl - thread_ctrl* operator->() const + thread_ctrl* get() const { return m_thread.get(); } diff --git a/Utilities/cond.h b/Utilities/cond.h index 00c58f02e8..e63a2331a4 100644 --- a/Utilities/cond.h +++ b/Utilities/cond.h @@ -20,13 +20,13 @@ public: constexpr cond_variable() = default; // Intrusive wait algorithm for lockable objects - template + template explicit_bool_t wait(T& object, u64 usec_timeout = -1) { const u32 _old = m_value.fetch_add(1); // Increment waiter counter - (object.*Unlock)(); + object.unlock(); const bool res = imp_wait(_old, usec_timeout); - (object.*Lock)(); + object.lock(); return res; } diff --git a/Utilities/mutex.h b/Utilities/mutex.h index 05b3c84ab5..f137a3014f 100644 --- a/Utilities/mutex.h +++ b/Utilities/mutex.h @@ -99,27 +99,39 @@ public: } }; -// Simplified shared (reader) lock implementation, std::shared_lock compatible. +// Simplified shared (reader) lock implementation. class reader_lock final { shared_mutex& m_mutex; + void lock() + { + m_mutex.lock_shared(); + } + + void unlock() + { + m_mutex.unlock_shared(); + } + + friend class cond_variable; + public: reader_lock(const reader_lock&) = delete; explicit reader_lock(shared_mutex& mutex) : m_mutex(mutex) { - m_mutex.lock_shared(); + lock(); } ~reader_lock() { - m_mutex.unlock_shared(); + unlock(); } }; -// Simplified exclusive (writer) lock implementation, std::lock_guard compatible. +// Simplified exclusive (writer) lock implementation. class writer_lock final { shared_mutex& m_mutex; diff --git a/Utilities/sema.h b/Utilities/sema.h index 341370f913..d8429bcb5e 100644 --- a/Utilities/sema.h +++ b/Utilities/sema.h @@ -13,6 +13,8 @@ class semaphore_base void imp_post(s32 _old); + friend class semaphore_lock; + protected: explicit constexpr semaphore_base(s32 value) : m_value{value} @@ -108,3 +110,34 @@ public: return Max; } }; + +class semaphore_lock +{ + semaphore_base& m_base; + + void lock() + { + m_base.wait(); + } + + void unlock() + { + m_base.post(INT32_MAX); + } + + friend class cond_variable; + +public: + explicit semaphore_lock(const semaphore_lock&) = delete; + + semaphore_lock(semaphore_base& sema) + : m_base(sema) + { + lock(); + } + + ~semaphore_lock() + { + unlock(); + } +}; diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp index 398b61582d..53464ebf4a 100644 --- a/rpcs3/Emu/CPU/CPUThread.cpp +++ b/rpcs3/Emu/CPU/CPUThread.cpp @@ -43,8 +43,6 @@ void cpu_thread::on_task() Emu.SendDbgCommand(DID_CREATE_THREAD, this); - std::unique_lock lock(*this); - // Check thread status while (!test(state & cpu_flag::exit)) { @@ -53,8 +51,6 @@ void cpu_thread::on_task() // check stop status if (!test(state & cpu_flag::stop)) { - if (lock) lock.unlock(); - try { cpu_task(); @@ -73,12 +69,6 @@ void cpu_thread::on_task() continue; } - if (!lock) - { - lock.lock(); - continue; - } - thread_ctrl::wait(); } } @@ -86,7 +76,7 @@ void cpu_thread::on_task() void cpu_thread::on_stop() { state += cpu_flag::exit; - lock_notify(); + notify(); } cpu_thread::~cpu_thread() @@ -100,8 +90,6 @@ cpu_thread::cpu_thread(u32 id) bool cpu_thread::check_state() { - std::unique_lock lock(*this, std::defer_lock); - while (true) { CHECK_EMU_STATUS; // check at least once @@ -116,12 +104,6 @@ bool cpu_thread::check_state() break; } - if (!lock) - { - lock.lock(); - continue; - } - thread_ctrl::wait(); } @@ -144,7 +126,7 @@ bool cpu_thread::check_state() void cpu_thread::run() { state -= cpu_flag::stop; - lock_notify(); + notify(); } void cpu_thread::set_signal() diff --git a/rpcs3/Emu/Cell/Modules/cellFs.cpp b/rpcs3/Emu/Cell/Modules/cellFs.cpp index 6e898bacf5..5342edaf3d 100644 --- a/rpcs3/Emu/Cell/Modules/cellFs.cpp +++ b/rpcs3/Emu/Cell/Modules/cellFs.cpp @@ -799,7 +799,7 @@ s32 cellFsAioRead(vm::ptr aio, vm::ptr id, fs_aio_cb_t func) { aio, func }, }); - m->thread->lock_notify(); + m->thread->notify(); return CELL_OK; } @@ -825,7 +825,7 @@ s32 cellFsAioWrite(vm::ptr aio, vm::ptr id, fs_aio_cb_t func) { aio, func }, }); - m->thread->lock_notify(); + m->thread->notify(); return CELL_OK; } diff --git a/rpcs3/Emu/Cell/Modules/cellSpursSpu.cpp b/rpcs3/Emu/Cell/Modules/cellSpursSpu.cpp index c0ddb89942..65e8fd1283 100644 --- a/rpcs3/Emu/Cell/Modules/cellSpursSpu.cpp +++ b/rpcs3/Emu/Cell/Modules/cellSpursSpu.cpp @@ -773,8 +773,6 @@ void spursSysServiceIdleHandler(SPUThread& spu, SpursKernelContext* ctxt) { bool shouldExit; - std::unique_lock lock(spu, std::defer_lock); - while (true) { vm::reservation_acquire(vm::base(spu.offset + 0x100), vm::cast(ctxt->spurs.addr(), HERE), 128); @@ -862,8 +860,6 @@ void spursSysServiceIdleHandler(SPUThread& spu, SpursKernelContext* ctxt) if (spuIdling && shouldExit == false && foundReadyWorkload == false) { // The system service blocks by making a reservation and waiting on the lock line reservation lost event. - CHECK_EMU_STATUS; - if (!lock) { lock.lock(); continue; } thread_ctrl::wait_for(1000); continue; } diff --git a/rpcs3/Emu/Cell/Modules/cellVdec.cpp b/rpcs3/Emu/Cell/Modules/cellVdec.cpp index 7ac7963fed..631fa8647e 100644 --- a/rpcs3/Emu/Cell/Modules/cellVdec.cpp +++ b/rpcs3/Emu/Cell/Modules/cellVdec.cpp @@ -75,6 +75,7 @@ struct vdec_thread : ppu_thread u64 next_pts{}; u64 next_dts{}; + std::mutex mutex; std::queue out; std::queue user_data; // TODO @@ -325,7 +326,7 @@ struct vdec_thread : ppu_thread cellVdec.trace("Got picture (pts=0x%llx[0x%llx], dts=0x%llx[0x%llx])", frame.pts, frame->pkt_pts, frame.dts, frame->pkt_dts); - thread_lock{*this}, out.push(std::move(frame)); + std::lock_guard{mutex}, out.push(std::move(frame)); cb_func(*this, id, CELL_VDEC_MSG_TYPE_PICOUT, CELL_OK, cb_arg); } @@ -437,7 +438,7 @@ s32 cellVdecClose(u32 handle) } vdec->cmd_push({vdec_cmd::close, 0}); - vdec->lock_notify(); + vdec->notify(); vdec->join(); idm::remove(handle); return CELL_OK; @@ -455,7 +456,7 @@ s32 cellVdecStartSeq(u32 handle) } vdec->cmd_push({vdec_cmd::start_seq, 0}); - vdec->lock_notify(); + vdec->notify(); return CELL_OK; } @@ -471,7 +472,7 @@ s32 cellVdecEndSeq(u32 handle) } vdec->cmd_push({vdec_cmd::end_seq, 0}); - vdec->lock_notify(); + vdec->notify(); return CELL_OK; } @@ -497,7 +498,7 @@ s32 cellVdecDecodeAu(u32 handle, CellVdecDecodeMode mode, vm::cptrcodecSpecificData, }); - vdec->lock_notify(); + vdec->notify(); return CELL_OK; } @@ -514,7 +515,7 @@ s32 cellVdecGetPicture(u32 handle, vm::cptr format, vm::ptr lock(vdec->mutex); if (vdec->out.empty()) { @@ -639,7 +640,7 @@ s32 cellVdecGetPicItem(u32 handle, vm::pptr picItem) u64 usrd; u32 frc; { - thread_lock lock(*vdec); + std::lock_guard lock(vdec->mutex); if (vdec->out.empty()) { @@ -830,7 +831,7 @@ s32 cellVdecSetFrameRate(u32 handle, CellVdecFrameRate frc) // TODO: check frc value vdec->cmd_push({vdec_cmd::set_frc, frc}); - vdec->lock_notify(); + vdec->notify(); return CELL_OK; } diff --git a/rpcs3/Emu/Cell/Modules/sys_ppu_thread_.cpp b/rpcs3/Emu/Cell/Modules/sys_ppu_thread_.cpp index 015b734c79..7d59ce3ff5 100644 --- a/rpcs3/Emu/Cell/Modules/sys_ppu_thread_.cpp +++ b/rpcs3/Emu/Cell/Modules/sys_ppu_thread_.cpp @@ -50,7 +50,7 @@ s32 sys_ppu_thread_create(vm::ptr thread_id, u32 entry, u64 arg, s32 prio, return eq.name == "_mxr000\0"_u64; })) { - thread_ctrl::sleep(50000); + thread_ctrl::wait_for(50000); } } diff --git a/rpcs3/Emu/Cell/PPUThread.cpp b/rpcs3/Emu/Cell/PPUThread.cpp index f5a9ca7c89..df44b7c043 100644 --- a/rpcs3/Emu/Cell/PPUThread.cpp +++ b/rpcs3/Emu/Cell/PPUThread.cpp @@ -312,33 +312,22 @@ void ppu_thread::cmd_pop(u32 count) cmd64 ppu_thread::cmd_wait() { - std::unique_lock lock(*this, std::defer_lock); - while (true) { if (UNLIKELY(test(state))) { - if (lock) lock.unlock(); - - if (check_state()) // check_status() requires unlocked mutex + if (check_state()) { return cmd64{}; } } - // Lightweight queue doesn't care about mutex state if (cmd64 result = cmd_queue[cmd_queue.peek()].exchange(cmd64{})) { return result; } - if (!lock) - { - lock.lock(); - continue; - } - - thread_ctrl::wait(); // Waiting requires locked mutex + thread_ctrl::wait(); } } diff --git a/rpcs3/Emu/Cell/SPUThread.cpp b/rpcs3/Emu/Cell/SPUThread.cpp index 7da1ed0bb1..1af320b8be 100644 --- a/rpcs3/Emu/Cell/SPUThread.cpp +++ b/rpcs3/Emu/Cell/SPUThread.cpp @@ -503,7 +503,7 @@ void SPUThread::process_mfc_cmd(u32 cmd) u32 SPUThread::get_events(bool waiting) { // check reservation status and set SPU_EVENT_LR if lost - if (last_raddr != 0 && !vm::reservation_test(operator->())) + if (last_raddr != 0 && !vm::reservation_test(this->get())) { ch_event_stat |= SPU_EVENT_LR; @@ -546,7 +546,7 @@ void SPUThread::set_events(u32 mask) // Notify if some events were set if (~old_stat & mask && old_stat & SPU_EVENT_WAITING && ch_event_stat & SPU_EVENT_WAITING) { - lock_notify(); + notify(); } } @@ -600,7 +600,7 @@ bool SPUThread::get_ch_value(u32 ch, u32& out) { if (!channel.try_pop(out)) { - thread_lock{*this}, thread_ctrl::wait([&] { return test(state & cpu_flag::stop) || channel.try_pop(out); }); + thread_ctrl::wait([&] { return test(state & cpu_flag::stop) || channel.try_pop(out); }); return !test(state & cpu_flag::stop); } @@ -615,8 +615,6 @@ bool SPUThread::get_ch_value(u32 ch, u32& out) // break; case SPU_RdInMbox: { - std::unique_lock lock(*this, std::defer_lock); - while (true) { if (const uint old_count = ch_in_mbox.try_pop(out)) @@ -636,12 +634,6 @@ bool SPUThread::get_ch_value(u32 ch, u32& out) return false; } - if (!lock) - { - lock.lock(); - continue; - } - thread_ctrl::wait(); } } @@ -691,8 +683,6 @@ bool SPUThread::get_ch_value(u32 ch, u32& out) case SPU_RdEventStat: { - std::unique_lock lock(*this, std::defer_lock); - // start waiting or return immediately if (u32 res = get_events(true)) { @@ -707,8 +697,6 @@ bool SPUThread::get_ch_value(u32 ch, u32& out) } else { - lock.lock(); - // simple waiting loop otherwise while (!get_events(true) && !test(state & cpu_flag::stop)) { @@ -754,8 +742,6 @@ bool SPUThread::set_ch_value(u32 ch, u32 value) { if (offset >= RAW_SPU_BASE_ADDR) { - std::unique_lock lock(*this, std::defer_lock); - while (!ch_out_intr_mbox.try_push(value)) { CHECK_EMU_STATUS; @@ -765,12 +751,6 @@ bool SPUThread::set_ch_value(u32 ch, u32 value) return false; } - if (!lock) - { - lock.lock(); - continue; - } - thread_ctrl::wait(); } @@ -961,8 +941,6 @@ bool SPUThread::set_ch_value(u32 ch, u32 value) case SPU_WrOutMbox: { - std::unique_lock lock(*this, std::defer_lock); - while (!ch_out_mbox.try_push(value)) { CHECK_EMU_STATUS; @@ -972,12 +950,6 @@ bool SPUThread::set_ch_value(u32 ch, u32 value) return false; } - if (!lock) - { - lock.lock(); - continue; - } - thread_ctrl::wait(); } @@ -1237,7 +1209,7 @@ bool SPUThread::stop_and_signal(u32 code) return false; } - group->cv.wait_for(lv2_lock, 1ms); + group->cv.wait(lv2_lock, 1000); } // change group status @@ -1278,7 +1250,7 @@ bool SPUThread::stop_and_signal(u32 code) return false; } - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } // event data must be set by push() @@ -1303,7 +1275,7 @@ bool SPUThread::stop_and_signal(u32 code) if (thread && thread.get() != this) { thread->state -= cpu_flag::suspend; - thread->lock_notify(); + thread->notify(); } } @@ -1342,7 +1314,7 @@ bool SPUThread::stop_and_signal(u32 code) if (thread && thread.get() != this) { thread->state += cpu_flag::stop; - thread->lock_notify(); + thread->notify(); } } diff --git a/rpcs3/Emu/Cell/SPUThread.h b/rpcs3/Emu/Cell/SPUThread.h index ab072f9363..b5ff1e10bc 100644 --- a/rpcs3/Emu/Cell/SPUThread.h +++ b/rpcs3/Emu/Cell/SPUThread.h @@ -180,7 +180,7 @@ public: data.value |= value; }); - if (old.wait) spu.lock_notify(); + if (old.wait) spu.notify(); } // push unconditionally (overwriting previous value), may require notification @@ -193,7 +193,7 @@ public: data.value = value; }); - if (old.wait) spu.lock_notify(); + if (old.wait) spu.notify(); } // returns true on success @@ -228,7 +228,7 @@ public: // value is not cleared and may be read again }); - if (old.wait) spu.lock_notify(); + if (old.wait) spu.notify(); return old.value; } @@ -295,7 +295,7 @@ public: return false; })) { - spu.lock_notify(); + spu.notify(); } } diff --git a/rpcs3/Emu/Cell/lv2/lv2.cpp b/rpcs3/Emu/Cell/lv2/lv2.cpp index 7dcbcabb9b..70f50c3dfd 100644 --- a/rpcs3/Emu/Cell/lv2/lv2.cpp +++ b/rpcs3/Emu/Cell/lv2/lv2.cpp @@ -1013,4 +1013,4 @@ extern ppu_function_t ppu_get_syscall(u64 code) return nullptr; } -DECLARE(lv2_lock_t::mutex); +DECLARE(lv2_lock_guard::g_sema); diff --git a/rpcs3/Emu/Cell/lv2/sys_cond.cpp b/rpcs3/Emu/Cell/lv2/sys_cond.cpp index 47f8b81266..c7078782bf 100644 --- a/rpcs3/Emu/Cell/lv2/sys_cond.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_cond.cpp @@ -222,11 +222,11 @@ s32 sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout) continue; } - get_current_thread_cv().wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + LV2_UNLOCK, thread_ctrl::wait_for(timeout - passed); } else { - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } } diff --git a/rpcs3/Emu/Cell/lv2/sys_event.cpp b/rpcs3/Emu/Cell/lv2/sys_event.cpp index d4fa9baffb..15200e0f34 100644 --- a/rpcs3/Emu/Cell/lv2/sys_event.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_event.cpp @@ -282,11 +282,11 @@ s32 sys_event_queue_receive(ppu_thread& ppu, u32 equeue_id, vm::ptr return CELL_ETIMEDOUT; } - get_current_thread_cv().wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + LV2_UNLOCK, thread_ctrl::wait_for(timeout - passed); } else { - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } } diff --git a/rpcs3/Emu/Cell/lv2/sys_event_flag.cpp b/rpcs3/Emu/Cell/lv2/sys_event_flag.cpp index a62444cf0a..a8a451531c 100644 --- a/rpcs3/Emu/Cell/lv2/sys_event_flag.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_event_flag.cpp @@ -162,11 +162,11 @@ s32 sys_event_flag_wait(ppu_thread& ppu, u32 id, u64 bitptn, u32 mode, vm::ptrlock_notify(); + thread->notify(); } void lv2_int_serv_t::join(ppu_thread& ppu, lv2_lock_t lv2_lock) @@ -41,14 +41,14 @@ void lv2_int_serv_t::join(ppu_thread& ppu, lv2_lock_t lv2_lock) { ppu_cmd::opcode, ppu_instructions::SC(0) }, }); - thread->lock_notify(); + thread->notify(); // Join thread (TODO) while (!test(thread->state & cpu_flag::exit)) { CHECK_EMU_STATUS; - get_current_thread_cv().wait_for(lv2_lock, 1ms); + LV2_UNLOCK, thread_ctrl::wait_for(1000); } // Cleanup @@ -155,7 +155,7 @@ void sys_interrupt_thread_eoi(ppu_thread& ppu) // Low-level PPU function example if (ppu.lr == 0 || ppu.gpr[11] != 88) { // Low-level function must disable interrupts before throwing (not related to sys_interrupt_*, it's rather coincidence) - ppu->interrupt_disable(); + ppu.get()->interrupt_disable(); throw cpu_flag::ret; } } diff --git a/rpcs3/Emu/Cell/lv2/sys_lwcond.cpp b/rpcs3/Emu/Cell/lv2/sys_lwcond.cpp index 11454ebe27..3b1f6ba95d 100644 --- a/rpcs3/Emu/Cell/lv2/sys_lwcond.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_lwcond.cpp @@ -202,11 +202,11 @@ s32 _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u64 t } } - get_current_thread_cv().wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + LV2_UNLOCK, thread_ctrl::wait_for(timeout - passed); } else { - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } } diff --git a/rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp b/rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp index b35b4e4a64..66f287f56b 100644 --- a/rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_lwmutex.cpp @@ -114,11 +114,11 @@ s32 _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout) return CELL_ETIMEDOUT; } - get_current_thread_cv().wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + LV2_UNLOCK, thread_ctrl::wait_for(timeout - passed); } else { - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } } diff --git a/rpcs3/Emu/Cell/lv2/sys_mutex.cpp b/rpcs3/Emu/Cell/lv2/sys_mutex.cpp index f51bf63e5b..29f098a02a 100644 --- a/rpcs3/Emu/Cell/lv2/sys_mutex.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_mutex.cpp @@ -148,11 +148,11 @@ s32 sys_mutex_lock(ppu_thread& ppu, u32 mutex_id, u64 timeout) return CELL_ETIMEDOUT; } - get_current_thread_cv().wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + LV2_UNLOCK, thread_ctrl::wait_for(timeout - passed); } else { - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } } diff --git a/rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp b/rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp index f3cae43e13..0cca39ae6d 100644 --- a/rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_ppu_thread.cpp @@ -74,7 +74,7 @@ s32 sys_ppu_thread_join(ppu_thread& ppu, u32 thread_id, vm::ptr vptr) { CHECK_EMU_STATUS; - get_current_thread_cv().wait_for(lv2_lock, 1ms); + LV2_UNLOCK, thread_ctrl::wait_for(1000); } // get exit status from the register diff --git a/rpcs3/Emu/Cell/lv2/sys_rwlock.cpp b/rpcs3/Emu/Cell/lv2/sys_rwlock.cpp index 0c5a6d2d45..5424b3df0c 100644 --- a/rpcs3/Emu/Cell/lv2/sys_rwlock.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_rwlock.cpp @@ -130,11 +130,11 @@ s32 sys_rwlock_rlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout) return CELL_ETIMEDOUT; } - get_current_thread_cv().wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + LV2_UNLOCK, thread_ctrl::wait_for(timeout - passed); } else { - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } } @@ -253,11 +253,11 @@ s32 sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout) return CELL_ETIMEDOUT; } - get_current_thread_cv().wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + LV2_UNLOCK, thread_ctrl::wait_for(timeout - passed); } else { - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } } diff --git a/rpcs3/Emu/Cell/lv2/sys_semaphore.cpp b/rpcs3/Emu/Cell/lv2/sys_semaphore.cpp index cead96f74f..a72a221d16 100644 --- a/rpcs3/Emu/Cell/lv2/sys_semaphore.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_semaphore.cpp @@ -108,11 +108,11 @@ s32 sys_semaphore_wait(ppu_thread& ppu, u32 sem_id, u64 timeout) return CELL_ETIMEDOUT; } - get_current_thread_cv().wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + LV2_UNLOCK, thread_ctrl::wait_for(timeout - passed); } else { - get_current_thread_cv().wait(lv2_lock); + LV2_UNLOCK, thread_ctrl::wait(); } } diff --git a/rpcs3/Emu/Cell/lv2/sys_spu.cpp b/rpcs3/Emu/Cell/lv2/sys_spu.cpp index 2bd63950f8..5420062d9f 100644 --- a/rpcs3/Emu/Cell/lv2/sys_spu.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_spu.cpp @@ -421,7 +421,7 @@ s32 sys_spu_thread_group_resume(u32 id) if (thread) { thread->state -= cpu_flag::suspend; - thread->lock_notify(); + thread->notify(); } } @@ -504,7 +504,7 @@ s32 sys_spu_thread_group_terminate(u32 id, s32 value) if (thread) { thread->state += cpu_flag::stop; - thread->lock_notify(); + thread->notify(); } } @@ -563,7 +563,7 @@ s32 sys_spu_thread_group_join(u32 id, vm::ptr cause, vm::ptr status) CHECK_EMU_STATUS; - group->cv.wait_for(lv2_lock, 1ms); + group->cv.wait(lv2_lock, 1000); } switch (group->join_state & ~SPU_TGJSF_IS_JOINING) diff --git a/rpcs3/Emu/Cell/lv2/sys_spu.h b/rpcs3/Emu/Cell/lv2/sys_spu.h index 2b88620151..2e3708b9cc 100644 --- a/rpcs3/Emu/Cell/lv2/sys_spu.h +++ b/rpcs3/Emu/Cell/lv2/sys_spu.h @@ -157,7 +157,7 @@ struct lv2_spu_group_t s32 exit_status; // SPU Thread Group Exit Status atomic_t join_state; // flags used to detect exit cause - std::condition_variable cv; // used to signal waiting PPU thread + cond_variable cv; // used to signal waiting PPU thread std::weak_ptr ep_run; // port for SYS_SPU_THREAD_GROUP_EVENT_RUN events std::weak_ptr ep_exception; // TODO: SYS_SPU_THREAD_GROUP_EVENT_EXCEPTION diff --git a/rpcs3/Emu/Cell/lv2/sys_sync.h b/rpcs3/Emu/Cell/lv2/sys_sync.h index 18ee7341e9..80ae855f62 100644 --- a/rpcs3/Emu/Cell/lv2/sys_sync.h +++ b/rpcs3/Emu/Cell/lv2/sys_sync.h @@ -1,8 +1,10 @@ #pragma once #include "Utilities/SleepQueue.h" -#include -#include +#include "Utilities/Thread.h" +#include "Utilities/mutex.h" +#include "Utilities/sema.h" +#include "Utilities/cond.h" // attr_protocol (waiting scheduling policy) enum @@ -43,27 +45,26 @@ enum SYS_SYNC_NOT_ADAPTIVE = 0x2000, }; -extern std::condition_variable& get_current_thread_cv(); - -// Simple class for global mutex to pass unique_lock and check it -struct lv2_lock_t +// Temporary implementation for LV2_UNLOCK (TODO: remove it) +struct lv2_lock_guard { - using type = std::unique_lock; + static semaphore<> g_sema; - type& ref; + lv2_lock_guard(const lv2_lock_guard&) = delete; - lv2_lock_t(type& lv2_lock) - : ref(lv2_lock) + lv2_lock_guard() { - verify(HERE), ref.owns_lock(), ref.mutex() == &mutex; + g_sema.post(); } - operator type&() const + ~lv2_lock_guard() { - return ref; + g_sema.wait(); } - - static type::mutex_type mutex; }; -#define LV2_LOCK lv2_lock_t::type lv2_lock(lv2_lock_t::mutex) +using lv2_lock_t = semaphore_lock&; + +#define LV2_LOCK semaphore_lock lv2_lock(lv2_lock_guard::g_sema) + +#define LV2_UNLOCK lv2_lock_guard{} diff --git a/rpcs3/Emu/Cell/lv2/sys_timer.cpp b/rpcs3/Emu/Cell/lv2/sys_timer.cpp index e0bfa700ab..d2d9f623fd 100644 --- a/rpcs3/Emu/Cell/lv2/sys_timer.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_timer.cpp @@ -16,7 +16,8 @@ extern u64 get_system_time(); void lv2_timer_t::on_task() { - thread_lock lock(*this); + //thread_lock lock(*this); + LV2_LOCK; while (state <= SYS_TIMER_STATE_RUN) { @@ -24,7 +25,7 @@ void lv2_timer_t::on_task() if (state == SYS_TIMER_STATE_RUN) { - LV2_LOCK; + //LV2_LOCK; while (get_system_time() >= expire) { @@ -52,7 +53,7 @@ void lv2_timer_t::on_task() continue; } - thread_ctrl::wait_for(1000); + LV2_UNLOCK, thread_ctrl::wait_for(1000); } } @@ -65,7 +66,7 @@ void lv2_timer_t::on_stop() { // Signal thread using invalid state and join state = -1; - this->lock_notify(); + this->notify(); named_thread::on_stop(); } @@ -170,7 +171,7 @@ s32 _sys_timer_start(u32 timer_id, u64 base_time, u64 period) timer->expire = base_time ? base_time : start_time + period; timer->period = period; timer->state = SYS_TIMER_STATE_RUN; - timer->lock_notify(); + timer->notify(); return CELL_OK; } diff --git a/rpcs3/Emu/Memory/wait_engine.cpp b/rpcs3/Emu/Memory/wait_engine.cpp index fa96ab1393..35c1340432 100644 --- a/rpcs3/Emu/Memory/wait_engine.cpp +++ b/rpcs3/Emu/Memory/wait_engine.cpp @@ -32,19 +32,18 @@ namespace vm , m_thread(ptr->thread) { // Initialize waiter - writer_lock{s_mutex}, s_waiters.emplace(m_ptr); - - m_thread->lock(); + writer_lock lock(s_mutex); + s_waiters.emplace(m_ptr); } ~waiter() { // Reset thread - atomic_storage::store(m_ptr->thread, nullptr); - m_thread->unlock(); + m_ptr->thread = nullptr; // Remove waiter - writer_lock{s_mutex}, s_waiters.erase(m_ptr); + writer_lock lock(s_mutex); + s_waiters.erase(m_ptr); } }; @@ -54,23 +53,13 @@ namespace vm bool waiter_base::try_notify() { - const auto _t = atomic_storage::load(thread); - - if (UNLIKELY(!_t)) - { - // Return if thread not found - return false; - } - - // Lock the thread - _t->lock(); + const auto _t = thread.load(); try { // Test predicate - if (UNLIKELY(!thread || !test())) + if (UNLIKELY(!_t || !test())) { - _t->unlock(); return false; } } @@ -81,9 +70,11 @@ namespace vm } // Signal the thread with nullptr - atomic_storage::store(thread, nullptr); - _t->unlock(); - _t->notify(); + if (auto _t = thread.exchange(nullptr)) + { + _t->notify(); + } + return true; } @@ -128,10 +119,10 @@ namespace vm // Poll waiters periodically (TODO) while (notify_all() && !Emu.IsPaused() && !Emu.IsStopped()) { - thread_ctrl::sleep(50); + thread_ctrl::wait_for(50); } - thread_ctrl::sleep(1000); + thread_ctrl::wait_for(1000); } }); } diff --git a/rpcs3/Emu/Memory/wait_engine.h b/rpcs3/Emu/Memory/wait_engine.h index 99b2ed7833..a66fa85593 100644 --- a/rpcs3/Emu/Memory/wait_engine.h +++ b/rpcs3/Emu/Memory/wait_engine.h @@ -1,6 +1,7 @@ #pragma once #include "Utilities/types.h" +#include "Utilities/Atomic.h" class thread_ctrl; @@ -10,7 +11,7 @@ namespace vm { u32 addr; u32 mask; - thread_ctrl* thread{}; + atomic_t thread{}; void initialize(u32 addr, u32 size); bool try_notify(); diff --git a/rpcs3/Emu/PSP2/Modules/sceLibKernel.cpp b/rpcs3/Emu/PSP2/Modules/sceLibKernel.cpp index 8d2aa93e5f..fa3cfe037e 100644 --- a/rpcs3/Emu/PSP2/Modules/sceLibKernel.cpp +++ b/rpcs3/Emu/PSP2/Modules/sceLibKernel.cpp @@ -928,7 +928,7 @@ struct psp2_event_flag final idm::check(cmd.arg, [](auto& cpu) { cpu.state += cpu_flag::signal; - cpu.lock_notify(); + cpu.notify(); }); break; @@ -955,7 +955,7 @@ struct psp2_event_flag final { if (!exec(task::signal, cpu.id)) { - thread_lock{cpu}, thread_ctrl::wait([&] { return cpu.state.test_and_reset(cpu_flag::signal); }); + thread_ctrl::wait([&] { return cpu.state.test_and_reset(cpu_flag::signal); }); } else { @@ -980,7 +980,7 @@ private: cpu.GPR[0] = SCE_KERNEL_ERROR_EVF_MULTI; cpu.GPR[1] = pattern; cpu.state += cpu_flag::signal; - cpu->lock_notify(); + cpu.notify(); return; } @@ -1002,7 +1002,7 @@ private: cpu.GPR[0] = SCE_OK; cpu.GPR[1] = old_pattern; cpu.state += cpu_flag::signal; - cpu->lock_notify(); + cpu.notify(); } else { @@ -1100,7 +1100,7 @@ private: cpu.state += cpu_flag::signal; cpu.owner = nullptr; waiters -= attr & SCE_KERNEL_ATTR_MULTI ? 1 : cpu.id; - cpu->lock_notify(); + cpu.notify(); } } } @@ -1131,7 +1131,7 @@ private: cpu.GPR[1] = _pattern; cpu.state += cpu_flag::signal; cpu.owner = nullptr; - cpu.lock_notify(); + cpu.notify(); } pattern = _pattern; @@ -1268,20 +1268,34 @@ error_code sceKernelWaitEventFlag(ARMv7Thread& cpu, s32 evfId, u32 bitPattern, u // Second chance if (!evf->exec(psp2_event_flag::task::wait, cpu.id) || !cpu.state.test_and_reset(cpu_flag::signal)) { - thread_lock lock(cpu); - - if (!thread_ctrl::wait_for(timeout, [&] { return cpu.state.test_and_reset(cpu_flag::signal); })) + while (!cpu.state.test_and_reset(cpu_flag::signal)) { - if (!evf->exec(psp2_event_flag::task::timeout, cpu.id)) + if (timeout) { - if (!evf->exec(psp2_event_flag::task::signal, cpu.id)) + const u64 passed = get_system_time() - start_time; + + if (passed >= timeout) { - thread_ctrl::wait([&] { return cpu.state.test_and_reset(cpu_flag::signal); }); - } - else - { - cpu.state -= cpu_flag::signal; + if (!evf->exec(psp2_event_flag::task::timeout, cpu.id)) + { + if (!evf->exec(psp2_event_flag::task::signal, cpu.id)) + { + thread_ctrl::wait([&] { return cpu.state.test_and_reset(cpu_flag::signal); }); + } + else + { + cpu.state -= cpu_flag::signal; + } + } + + break; } + + thread_ctrl::wait_for(timeout - passed); + } + else + { + thread_ctrl::wait(); } } } diff --git a/rpcs3/Emu/RSX/RSXThread.cpp b/rpcs3/Emu/RSX/RSXThread.cpp index f8411ca108..40b667291b 100644 --- a/rpcs3/Emu/RSX/RSXThread.cpp +++ b/rpcs3/Emu/RSX/RSXThread.cpp @@ -377,7 +377,7 @@ namespace rsx { ppu_cmd::lle_call, vblank_handler }, }); - intr_thread->lock_notify(); + intr_thread->notify(); } continue; @@ -697,7 +697,7 @@ namespace rsx //void thread::invoke(std::function callback) //{ - // if (operator->() == thread_ctrl::get_current()) + // if (get() == thread_ctrl::get_current()) // { // while (true) // { diff --git a/rpcs3/Emu/RSX/rsx_methods.cpp b/rpcs3/Emu/RSX/rsx_methods.cpp index e0313b51eb..0dfd4d4112 100644 --- a/rpcs3/Emu/RSX/rsx_methods.cpp +++ b/rpcs3/Emu/RSX/rsx_methods.cpp @@ -779,7 +779,7 @@ namespace rsx { ppu_cmd::lle_call, rsx->flip_handler }, }); - rsx->intr_thread->lock_notify(); + rsx->intr_thread->notify(); } } @@ -793,7 +793,7 @@ namespace rsx { ppu_cmd::lle_call, rsx->user_handler }, }); - rsx->intr_thread->lock_notify(); + rsx->intr_thread->notify(); } } diff --git a/rpcs3/Emu/System.cpp b/rpcs3/Emu/System.cpp index c187dfc552..9a3de8db84 100644 --- a/rpcs3/Emu/System.cpp +++ b/rpcs3/Emu/System.cpp @@ -408,11 +408,15 @@ void Emulator::Resume() SendDbgCommand(DID_RESUME_EMU); - idm::select([](u32, cpu_thread& cpu) { - cpu.state -= cpu_flag::dbg_global_pause; - cpu.lock_notify(); - }); + LV2_LOCK; + + idm::select([](u32, cpu_thread& cpu) + { + cpu.state -= cpu_flag::dbg_global_pause; + cpu.notify(); + }); + } rpcs3::on_resume()(); @@ -437,10 +441,8 @@ void Emulator::Stop() idm::select([](u32, cpu_thread& cpu) { cpu.state += cpu_flag::dbg_global_stop; - cpu->lock(); - cpu->set_exception(std::make_exception_ptr(EmulationStopped())); - cpu->unlock(); - cpu->notify(); + cpu.get()->set_exception(std::make_exception_ptr(EmulationStopped())); + cpu.notify(); }); } diff --git a/rpcs3/Gui/InterpreterDisAsm.cpp b/rpcs3/Gui/InterpreterDisAsm.cpp index 55d0b6bd41..d1ad3126de 100644 --- a/rpcs3/Gui/InterpreterDisAsm.cpp +++ b/rpcs3/Gui/InterpreterDisAsm.cpp @@ -440,7 +440,7 @@ void InterpreterDisAsmFrame::DoRun(wxCommandEvent& WXUNUSED(event)) if (cpu && test(cpu->state & cpu_state_pause)) { cpu->state -= cpu_flag::dbg_pause; - (*cpu)->lock_notify(); + cpu->notify(); } } @@ -462,7 +462,7 @@ void InterpreterDisAsmFrame::DoStep(wxCommandEvent& WXUNUSED(event)) state -= cpu_flag::dbg_pause; }))) { - (*cpu)->lock_notify(); + cpu->notify(); } } }