diff --git a/Utilities/SMutex.cpp b/Utilities/SMutex.cpp index 684429b60e..e10296bd68 100644 --- a/Utilities/SMutex.cpp +++ b/Utilities/SMutex.cpp @@ -8,15 +8,3 @@ bool SM_IsAborted() { return Emu.IsStopped(); } - -void SM_Sleep() -{ - if (NamedThreadBase* t = GetCurrentNamedThread()) - { - t->WaitForAnySignal(); - } - else - { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } -} diff --git a/Utilities/SMutex.h b/Utilities/SMutex.h index 88d22d7a68..5d50fe18e1 100644 --- a/Utilities/SMutex.h +++ b/Utilities/SMutex.h @@ -2,7 +2,6 @@ #include "Emu/Memory/atomic_type.h" bool SM_IsAborted(); -void SM_Sleep(); enum SMutexResult { @@ -20,8 +19,7 @@ template < typename T, const u64 free_value = 0, - const u64 dead_value = 0xffffffffffffffffull, - void (*wait)() = SM_Sleep + const u64 dead_value = 0xffffffffffffffffull > class SMutexBase { @@ -118,7 +116,7 @@ public: default: return res; } - if (wait != nullptr) wait(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); if (timeout && counter++ > timeout) { diff --git a/Utilities/SQueue.h b/Utilities/SQueue.h index 83d7adc3dd..7d140e3cdd 100644 --- a/Utilities/SQueue.h +++ b/Utilities/SQueue.h @@ -33,7 +33,7 @@ public: return false; } - SM_Sleep(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); continue; } @@ -60,7 +60,7 @@ public: return false; } - SM_Sleep(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); continue; } @@ -112,7 +112,7 @@ public: break; } - SM_Sleep(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); continue; } diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index f3afef6a42..63f0e83078 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -209,63 +209,53 @@ bool thread::joinable() const return m_thr.joinable(); } -struct g_waiter_map_t -{ - // TODO: optimize (use custom lightweight readers-writer lock) - - std::mutex m_mutex; - - struct waiter - { - u64 signal_id; - NamedThreadBase* thread; - }; - - std::vector m_waiters; - -} g_waiter_map; - -bool waiter_is_stopped(const char* func_name, u64 signal_id) +bool waiter_map_t::is_stopped(u64 signal_id) { if (Emu.IsStopped()) { - LOG_WARNING(Log::HLE, "%s() aborted (signal_id=0x%llx)", func_name, signal_id); + LOG_WARNING(Log::HLE, "%s.waiter_op() aborted (signal_id=0x%llx)", m_name.c_str(), signal_id); return true; } return false; } -waiter_reg_t::waiter_reg_t(u64 signal_id) +waiter_map_t::waiter_reg_t::waiter_reg_t(waiter_map_t& map, u64 signal_id) : signal_id(signal_id) , thread(GetCurrentNamedThread()) + , map(map) { - std::lock_guard lock(g_waiter_map.m_mutex); + std::lock_guard lock(map.m_mutex); // add waiter - g_waiter_map.m_waiters.push_back({ signal_id, thread }); + map.m_waiters.push_back({ signal_id, thread }); } -waiter_reg_t::~waiter_reg_t() +waiter_map_t::waiter_reg_t::~waiter_reg_t() { - std::lock_guard lock(g_waiter_map.m_mutex); + std::lock_guard lock(map.m_mutex); // remove waiter - for (size_t i = g_waiter_map.m_waiters.size() - 1; i >= 0; i--) + for (size_t i = map.m_waiters.size() - 1; i >= 0; i--) { - if (g_waiter_map.m_waiters[i].signal_id == signal_id && g_waiter_map.m_waiters[i].thread == thread) + if (map.m_waiters[i].signal_id == signal_id && map.m_waiters[i].thread == thread) { - g_waiter_map.m_waiters.erase(g_waiter_map.m_waiters.begin() + i); + map.m_waiters.erase(map.m_waiters.begin() + i); return; } } + + LOG_ERROR(HLE, "%s(): waiter not found (signal_id=0x%llx, map='%s')", __FUNCTION__, signal_id, map.m_name.c_str()); + Emu.Pause(); } -void waiter_signal(u64 signal_id) +void waiter_map_t::notify(u64 signal_id) { - std::lock_guard lock(g_waiter_map.m_mutex); + if (!m_waiters.size()) return; + + std::lock_guard lock(m_mutex); // find waiter and signal - for (auto& v : g_waiter_map.m_waiters) + for (auto& v : m_waiters) { if (v.signal_id == signal_id) { diff --git a/Utilities/Thread.h b/Utilities/Thread.h index 02f0388e5d..04e478f498 100644 --- a/Utilities/Thread.h +++ b/Utilities/Thread.h @@ -71,35 +71,54 @@ public: bool joinable() const; }; -// for internal use (checks if Emu is stopped) -bool waiter_is_stopped(const char* func_name, u64 signal_id); - -struct waiter_reg_t +class waiter_map_t { - const u64 signal_id; - NamedThreadBase* const thread; + // TODO: optimize (use custom lightweight readers-writer lock) + std::mutex m_mutex; - waiter_reg_t(u64 signal_id); - ~waiter_reg_t(); -}; - -// wait until waiter_func() returns true, signal_id is an arbitrary number -template static __forceinline void waiter_op(const char* func_name, u64 signal_id, const WT waiter_func) -{ - // check condition - if (waiter_func()) return; - - // register waiter - waiter_reg_t waiter(signal_id); - - while (true) + struct waiter_t { - // wait for 1 ms or until signal arrived - waiter.thread->WaitForAnySignal(1); - if (waiter_is_stopped(func_name, signal_id)) break; - if (waiter_func()) break; - } -} + u64 signal_id; + NamedThreadBase* thread; + }; -// signal all threads waiting on waiter_op() with the same signal_id (signaling only hints those threads that corresponding conditions are *probably* met) -void waiter_signal(u64 signal_id); + std::vector m_waiters; + + std::string m_name; + + struct waiter_reg_t + { + const u64 signal_id; + NamedThreadBase* const thread; + waiter_map_t& map; + + waiter_reg_t(waiter_map_t& map, u64 signal_id); + ~waiter_reg_t(); + }; + + bool is_stopped(u64 signal_id); + +public: + waiter_map_t(const char* name) : m_name(name) {} + + // wait until waiter_func() returns true, signal_id is an arbitrary number + template __forceinline void waiter_op(u64 signal_id, const WT waiter_func) + { + // check condition + if (waiter_func()) return; + + // register waiter + waiter_reg_t waiter(*this, signal_id); + + while (true) + { + // wait for 1 ms or until signal arrived + waiter.thread->WaitForAnySignal(1); + if (is_stopped(signal_id)) break; + if (waiter_func()) break; + } + } + + // signal all threads waiting on waiter_op() with the same signal_id (signaling only hints those threads that corresponding conditions are *probably* met) + void notify(u64 signal_id); +}; diff --git a/rpcs3/Emu/CPU/CPUThreadManager.cpp b/rpcs3/Emu/CPU/CPUThreadManager.cpp index af9368f338..b745da0177 100644 --- a/rpcs3/Emu/CPU/CPUThreadManager.cpp +++ b/rpcs3/Emu/CPU/CPUThreadManager.cpp @@ -135,22 +135,6 @@ RawSPUThread* CPUThreadManager::GetRawSPUThread(u32 num) } } -void CPUThreadManager::NotifyThread(const u32 id) -{ - if (!id) return; - - std::lock_guard lock(m_mtx_thread); - - for (u32 i = 0; i < m_threads.size(); i++) - { - if (m_threads[i]->GetId() == id) - { - m_threads[i]->Notify(); - return; - } - } -} - void CPUThreadManager::Exec() { std::lock_guard lock(m_mtx_thread); diff --git a/rpcs3/Emu/CPU/CPUThreadManager.h b/rpcs3/Emu/CPU/CPUThreadManager.h index 480ef940f0..d43a7506c3 100644 --- a/rpcs3/Emu/CPU/CPUThreadManager.h +++ b/rpcs3/Emu/CPU/CPUThreadManager.h @@ -17,7 +17,6 @@ public: CPUThread& AddThread(CPUThreadType type); void RemoveThread(const u32 id); - void NotifyThread(const u32 id); std::vector& GetThreads() { return m_threads; } s32 GetThreadNumById(CPUThreadType type, u32 id); diff --git a/rpcs3/Emu/SysCalls/Modules/cellSync.cpp b/rpcs3/Emu/SysCalls/Modules/cellSync.cpp index 7232898418..e0dd173e8d 100644 --- a/rpcs3/Emu/SysCalls/Modules/cellSync.cpp +++ b/rpcs3/Emu/SysCalls/Modules/cellSync.cpp @@ -16,6 +16,10 @@ u32 libsre; u32 libsre_rtoc; #endif +waiter_map_t sync_mutex_wm("sync_mutex_wm"); +waiter_map_t sync_barrier_wait_wm("sync_barrier_wait_wm"); +waiter_map_t sync_barrier_notify_wm("sync_barrier_notify_wm"); + s32 syncMutexInitialize(vm::ptr mutex) { if (!mutex) @@ -60,7 +64,7 @@ s32 cellSyncMutexLock(vm::ptr mutex) }); // prx: wait until this old value is equal to m_rel - waiter_op(__FUNCTION__, mutex.addr(), [mutex, order]() + sync_mutex_wm.waiter_op(mutex.addr(), [mutex, order]() { return order == mutex->data.read_relaxed().m_rel; }); @@ -112,7 +116,7 @@ s32 cellSyncMutexUnlock(vm::ptr mutex) mutex.m_rel++; }); - waiter_signal(mutex.addr()); + sync_mutex_wm.notify(mutex.addr()); return CELL_OK; } @@ -174,12 +178,12 @@ s32 cellSyncBarrierNotify(vm::ptr barrier) return CELL_SYNC_ERROR_ALIGN; } - waiter_op(__FUNCTION__, barrier.addr(), [barrier]() + sync_barrier_notify_wm.waiter_op(barrier.addr(), [barrier]() { return barrier->data.atomic_op_sync(CELL_OK, syncBarrierTryNotifyOp) == CELL_OK; }); - waiter_signal(barrier.addr() ^ 1); + sync_barrier_wait_wm.notify(barrier.addr()); return CELL_OK; } @@ -201,7 +205,7 @@ s32 cellSyncBarrierTryNotify(vm::ptr barrier) return res; } - waiter_signal(barrier.addr() ^ 1); + sync_barrier_wait_wm.notify(barrier.addr()); return CELL_OK; } @@ -236,12 +240,12 @@ s32 cellSyncBarrierWait(vm::ptr barrier) return CELL_SYNC_ERROR_ALIGN; } - waiter_op(__FUNCTION__, barrier.addr() ^ 1, [barrier]() + sync_barrier_wait_wm.waiter_op(barrier.addr(), [barrier]() { return barrier->data.atomic_op_sync(CELL_OK, syncBarrierTryWaitOp) == CELL_OK; }); - waiter_signal(barrier.addr()); + sync_barrier_notify_wm.notify(barrier.addr()); return CELL_OK; } @@ -263,7 +267,7 @@ s32 cellSyncBarrierTryWait(vm::ptr barrier) return res; } - waiter_signal(barrier.addr()); + sync_barrier_notify_wm.notify(barrier.addr()); return CELL_OK; } diff --git a/rpcs3/Emu/SysCalls/lv2/sys_cond.cpp b/rpcs3/Emu/SysCalls/lv2/sys_cond.cpp index a51b9f977a..3d7f11570a 100644 --- a/rpcs3/Emu/SysCalls/lv2/sys_cond.cpp +++ b/rpcs3/Emu/SysCalls/lv2/sys_cond.cpp @@ -80,9 +80,7 @@ s32 sys_cond_signal(u32 cond_id) if (u32 target = (mutex->protocol == SYS_SYNC_PRIORITY ? cond->m_queue.pop_prio() : cond->m_queue.pop())) { - //cond->signal_stamp = get_system_time(); cond->signal.lock(target); - Emu.GetCPU().NotifyThread(target); if (Emu.IsStopped()) { @@ -108,9 +106,7 @@ s32 sys_cond_signal_all(u32 cond_id) while (u32 target = (mutex->protocol == SYS_SYNC_PRIORITY ? cond->m_queue.pop_prio() : cond->m_queue.pop())) { cond->signaler = GetCurrentPPUThread().GetId(); - //cond->signal_stamp = get_system_time(); cond->signal.lock(target); - Emu.GetCPU().NotifyThread(target); if (Emu.IsStopped()) { @@ -147,9 +143,7 @@ s32 sys_cond_signal_to(u32 cond_id, u32 thread_id) u32 target = thread_id; { - //cond->signal_stamp = get_system_time(); cond->signal.lock(target); - Emu.GetCPU().NotifyThread(target); } if (Emu.IsStopped()) @@ -195,7 +189,6 @@ s32 sys_cond_wait(u32 cond_id, u64 timeout) { if (cond->signal.unlock(tid, tid) == SMR_OK) { - //const u64 stamp2 = get_system_time(); if (SMutexResult res = mutex->m_mutex.trylock(tid)) { if (res != SMR_FAILED) @@ -215,14 +208,11 @@ s32 sys_cond_wait(u32 cond_id, u64 timeout) } } mutex->recursive = 1; - const volatile u64 stamp = cond->signal_stamp; cond->signal.unlock(tid); - Emu.GetCPU().NotifyThread(cond->signaler); - //ConLog.Write("sys_cond_wait(): signal latency %lld (minimum %lld)", get_system_time() - stamp, stamp2 - stamp); return CELL_OK; } - SM_Sleep(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); if (counter++ > max_counter) { diff --git a/rpcs3/Emu/SysCalls/lv2/sys_semaphore.cpp b/rpcs3/Emu/SysCalls/lv2/sys_semaphore.cpp index 7d26df4c05..722d19e4ad 100644 --- a/rpcs3/Emu/SysCalls/lv2/sys_semaphore.cpp +++ b/rpcs3/Emu/SysCalls/lv2/sys_semaphore.cpp @@ -119,11 +119,10 @@ s32 sys_semaphore_wait(u32 sem_id, u64 timeout) continue; } sem->signal = 0; - // TODO: notify signaler return CELL_OK; } - SM_Sleep(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } @@ -182,7 +181,7 @@ s32 sys_semaphore_post(u32 sem_id, s32 count) if (sem->signal && sem->m_queue.count()) { - SM_Sleep(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); continue; } @@ -190,7 +189,6 @@ s32 sys_semaphore_post(u32 sem_id, s32 count) { count--; sem->signal = target; - Emu.GetCPU().NotifyThread(target); } else {