From 5248240e104bc2be76ca23e680d7aa24dda0da31 Mon Sep 17 00:00:00 2001 From: Nekotekina Date: Wed, 4 Nov 2020 17:19:35 +0300 Subject: [PATCH] atomic.cpp: improvements. Reduced static memory amount for waitable atomics. Allow notifier to skip notifications if wait/notify masks don't overlap. Improve raw_notify to wake up the thread by its id, add thread_id arg. Add optional mask argument to notify_one() and notify_all(). --- Utilities/Thread.cpp | 9 +- Utilities/sync.h | 1 + rpcs3/Emu/Cell/PPUThread.cpp | 2 +- rpcs3/Emu/Cell/SPUThread.cpp | 6 +- rpcs3/Emu/Memory/vm_reservation.h | 20 +- rpcs3/util/atomic.cpp | 557 ++++++++++++++++++------------ rpcs3/util/atomic.hpp | 72 +++- 7 files changed, 418 insertions(+), 249 deletions(-) diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index a088028bdf..a0ed6d88fd 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -1900,14 +1900,15 @@ void thread_base::initialize(void (*error_cb)(), bool(*wait_cb)(const void*)) void thread_base::notify_abort() noexcept { - m_signal.try_inc(); + u64 tid = m_thread.load(); +#ifdef _WIN32 + tid = GetThreadId(reinterpret_cast(tid)); +#endif while (auto ptr = m_state_notifier.load()) { // Since this function is not perfectly implemented, run it in a loop - atomic_storage_futex::raw_notify(ptr); - - if (m_state_notifier.load() == ptr) + if (atomic_storage_futex::raw_notify(ptr, tid)) { break; } diff --git a/Utilities/sync.h b/Utilities/sync.h index bd589827e0..f8d3028b59 100644 --- a/Utilities/sync.h +++ b/Utilities/sync.h @@ -7,6 +7,7 @@ #include "dynamic_library.h" #ifdef _WIN32 +#define NOMINMAX #include #include #elif __linux__ diff --git a/rpcs3/Emu/Cell/PPUThread.cpp b/rpcs3/Emu/Cell/PPUThread.cpp index c4c60cdd5e..8cb5998314 100644 --- a/rpcs3/Emu/Cell/PPUThread.cpp +++ b/rpcs3/Emu/Cell/PPUThread.cpp @@ -1842,7 +1842,7 @@ static bool ppu_store_reservation(ppu_thread& ppu, u32 addr, u64 reg_value) return false; }()) { - res.notify_all(); + res.notify_all(-128); if (addr == ppu.last_faddr) { diff --git a/rpcs3/Emu/Cell/SPUThread.cpp b/rpcs3/Emu/Cell/SPUThread.cpp index cd420ed90d..3ef0155276 100644 --- a/rpcs3/Emu/Cell/SPUThread.cpp +++ b/rpcs3/Emu/Cell/SPUThread.cpp @@ -2090,7 +2090,7 @@ void spu_thread::do_dma_transfer(spu_thread* _this, const spu_mfc_cmd& args, u8* v &= ~wmask; }); - bits->notify_all(); + bits->notify_all(wmask); if (size == size0) { @@ -2588,7 +2588,7 @@ bool spu_thread::do_putllc(const spu_mfc_cmd& args) return success; }()) { - vm::reservation_notifier(addr, 128).notify_all(); + vm::reservation_notifier(addr, 128).notify_all(-128); raddr = 0; perf0.reset(); return true; @@ -2683,7 +2683,7 @@ void spu_thread::do_putlluc(const spu_mfc_cmd& args) } do_cell_atomic_128_store(addr, _ptr(args.lsa & 0x3ff80)); - vm::reservation_notifier(addr, 128).notify_all(); + vm::reservation_notifier(addr, 128).notify_all(-128); } void spu_thread::do_mfc(bool wait) diff --git a/rpcs3/Emu/Memory/vm_reservation.h b/rpcs3/Emu/Memory/vm_reservation.h index be3903421c..73db620ff3 100644 --- a/rpcs3/Emu/Memory/vm_reservation.h +++ b/rpcs3/Emu/Memory/vm_reservation.h @@ -126,7 +126,7 @@ namespace vm _xend(); #endif if constexpr (Ack) - res.notify_all(); + res.notify_all(-128); return; } else @@ -140,7 +140,7 @@ namespace vm _xend(); #endif if constexpr (Ack) - res.notify_all(); + res.notify_all(-128); return result; } else @@ -201,7 +201,7 @@ namespace vm #endif res += 127; if (Ack) - res.notify_all(); + res.notify_all(-128); return; } else @@ -215,7 +215,7 @@ namespace vm #endif res += 127; if (Ack) - res.notify_all(); + res.notify_all(-128); return result; } else @@ -250,7 +250,7 @@ namespace vm }); if constexpr (Ack) - res.notify_all(); + res.notify_all(-128); return; } else @@ -270,7 +270,7 @@ namespace vm }); if (Ack && result) - res.notify_all(); + res.notify_all(-128); return result; } } @@ -287,7 +287,7 @@ namespace vm } if constexpr (Ack) - res.notify_all(); + res.notify_all(-128); return; } else @@ -307,7 +307,7 @@ namespace vm } if (Ack && result) - res.notify_all(); + res.notify_all(-128); return result; } } @@ -399,7 +399,7 @@ namespace vm if constexpr (Ack) { - res.notify_all(); + res.notify_all(-128); } } else @@ -409,7 +409,7 @@ namespace vm if constexpr (Ack) { - res.notify_all(); + res.notify_all(-128); } return result; diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp index b0367b1d17..b40d7fef11 100644 --- a/rpcs3/util/atomic.cpp +++ b/rpcs3/util/atomic.cpp @@ -45,12 +45,12 @@ static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*){ re static thread_local void(*s_tls_notify_cb)(const void* data, u64 progress) = [](const void*, u64){}; // Compare data in memory with old value, and return true if they are equal -template -static inline bool +template +static NEVER_INLINE bool #ifdef _WIN32 __vectorcall #endif -ptr_cmp(const void* data, std::size_t size, __m128i old128, __m128i mask128) +ptr_cmp(const void* data, u32 size, __m128i old128, __m128i mask128) { if constexpr (CheckCb) { @@ -60,14 +60,6 @@ ptr_cmp(const void* data, std::size_t size, __m128i old128, __m128i mask128) } } - if constexpr (CheckData) - { - if (!data) - { - return false; - } - } - const u64 old_value = _mm_cvtsi128_si64(old128); const u64 mask = _mm_cvtsi128_si64(mask128); @@ -79,7 +71,7 @@ ptr_cmp(const void* data, std::size_t size, __m128i old128, __m128i mask128) case 8: return (reinterpret_cast*>(data)->load() & mask) == (old_value & mask); case 16: { - const auto v0 = _mm_load_si128(reinterpret_cast(data)); + const auto v0 = std::bit_cast<__m128i>(atomic_storage::load(*reinterpret_cast(data))); const auto v1 = _mm_xor_si128(v0, old128); const auto v2 = _mm_and_si128(v1, mask128); const auto v3 = _mm_packs_epi16(v2, v2); @@ -89,29 +81,85 @@ ptr_cmp(const void* data, std::size_t size, __m128i old128, __m128i mask128) return true; } } + default: + { + fprintf(stderr, "ptr_cmp(): bad size (size=%u)" HERE "\n", size); + std::abort(); + } } return false; } -#ifdef USE_STD +// Returns true if mask overlaps, or the argument is invalid +static bool +#ifdef _WIN32 +__vectorcall +#endif +cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m128i val2) +{ + // In force wake up, one of the size arguments is zero + const u32 size = std::min(size1, size2); + + if (!size) [[unlikely]] + { + return true; + } + + // Generate masked value inequality bits + const auto v0 = _mm_and_si128(_mm_and_si128(mask1, mask2), _mm_xor_si128(val1, val2)); + + if (size <= 8) + { + // Generate sized mask + const u64 mask = UINT64_MAX >> ((64 - size * 8) & 63); + + if (!(_mm_cvtsi128_si64(v0) & mask)) + { + return false; + } + } + else if (size == 16) + { + if (!_mm_cvtsi128_si64(_mm_packs_epi16(v0, v0))) + { + return false; + } + } + else + { + fprintf(stderr, "cmp_mask(): bad size (size1=%u, size2=%u)" HERE "\n", size1, size2); + std::abort(); + } + + return true; +} + namespace { - // Standard CV/mutex pair + // Essentially a fat semaphore struct cond_handle { +#ifdef _WIN32 + u64 tid = GetCurrentThreadId(); +#else + u64 tid = reinterpret_cast(pthread_self()); +#endif + atomic_t sync{}; + u32 size{}; + __m128i mask{}; + __m128i oldv{}; + +#ifdef USE_STD + // Standard CV/mutex pair (often contains pthread_cond_t/pthread_mutex_t) std::condition_variable cond; std::mutex mtx; - - cond_handle() noexcept - { - mtx.lock(); - } +#endif }; } -// Arbitrary max allowed thread number -static constexpr u32 s_max_conds = 512 * 64; +// Arbitrary max allowed thread number (to fit in 15 bits) +static constexpr u32 s_max_conds = 512 * 64 - 1; static std::aligned_storage_t s_cond_list[s_max_conds]{}; @@ -183,8 +231,8 @@ static void cond_free(u32 cond_id) { if (cond_id - 1 >= s_max_conds) { - // Ignore bad id because it may contain notifier lock - return; + fprintf(stderr, "cond_free(): bad id %u" HERE "\n", cond_id); + std::abort(); } // Call the destructor @@ -196,38 +244,27 @@ static void cond_free(u32 cond_id) // Release the semaphore s_cond_sema--; } -#endif namespace { - struct sync_var + struct alignas(128) sync_var { constexpr sync_var() noexcept = default; // Reference counter, owning pointer, collision bit and optionally selected slot atomic_t addr_ref{}; - // Allocated semaphore bits (max 60) + // Allocated semaphore bits (max 56, to make total size 128) atomic_t sema_bits{}; // Semaphores (one per thread), data is platform-specific but 0 means empty - atomic_t sema_data[60]{}; + atomic_t sema_data[56]{}; - atomic_t* sema_alloc() + atomic_t* sema_alloc() { -#ifdef USE_STD - const u32 cond_id = cond_alloc(); - - if (cond_id == 0) - { - // Too many threads - return nullptr; - } -#endif - const auto [bits, ok] = sema_bits.fetch_op([](u64& bits) { - if (bits + 1 < (1ull << 60)) + if (bits + 1 < (1ull << 56)) { // Set lowest clear bit bits |= bits + 1; @@ -240,50 +277,40 @@ namespace if (ok) [[likely]] { // Find lowest clear bit - const auto sema = &sema_data[std::countr_one(bits)]; - -#if defined(USE_STD) - sema->release(cond_id); -#elif defined(USE_FUTEX) - sema->release(1); -#elif defined(_WIN32) - if (NtWaitForAlertByThreadId) - { - sema->release(GetCurrentThreadId()); - } - else - { - sema->release(1); - } -#endif - - return sema; + return &sema_data[std::countr_one(bits)]; } return nullptr; } - void sema_free(atomic_t* sema) + void sema_free(atomic_t* sema) { if (sema < sema_data || sema >= std::end(sema_data)) { std::abort(); } - // Clear sema -#ifdef USE_STD - cond_free(sema->exchange(0)); -#else - sema->release(0); -#endif + const u32 cond_id = sema->fetch_and(0x8000); + + if (!cond_id || cond_id >> 15) + { + // Delegated cleanup + return; + } + + // Free + cond_free(cond_id); + // Clear sema bit sema_bits &= ~(1ull << (sema - sema_data)); } }; + + static_assert(sizeof(sync_var) == 128); } // Main hashtable for atomic wait. -alignas(64) static sync_var s_hashtable[s_hashtable_size]{}; +alignas(128) static sync_var s_hashtable[s_hashtable_size]{}; namespace { @@ -300,7 +327,7 @@ namespace static constexpr u32 s_slot_gcount = (s_hashtable_power > 7 ? 4096 : 256) / 64; // Array of slot branch objects -alignas(64) static slot_info s_slot_list[s_slot_gcount * 64]{}; +alignas(128) static slot_info s_slot_list[s_slot_gcount * 64]{}; // Allocation bits static atomic_t s_slot_bits[s_slot_gcount]{}; @@ -434,7 +461,7 @@ SAFE_BUFFERS void #ifdef _WIN32 __vectorcall #endif -atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value, u64 timeout, __m128i mask) +atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 timeout, __m128i mask) { const std::uintptr_t iptr = reinterpret_cast(data); @@ -531,10 +558,12 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value lv = eq_bits + 1; } -#ifdef _WIN32 - // May be used by NtWaitForAlertByThreadId - u32 thread_id[16]{GetCurrentThreadId()}; -#endif + const u32 cond_id = cond_alloc(); + + if (cond_id == 0) + { + fmt::raw_error("Thread limit (32767) reached in atomic wait."); + } auto sema = slot->sema_alloc(); @@ -542,6 +571,7 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value { if (timeout + 1 || ptr_cmp(data, size, old_value, mask)) { + cond_free(cond_id); slot_free(iptr, &s_hashtable[iptr % s_hashtable_size]); return; } @@ -551,9 +581,20 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value sema = slot->sema_alloc(); } + // Save for notifiers + const auto cond = cond_get(cond_id); + + // Store some info for notifiers + cond->size = size; + cond->mask = mask; + cond->oldv = old_value; + + cond->sync = 1; + sema->release(cond_id); + #ifdef USE_STD - // Create mutex for condition variable (already locked) - std::unique_lock lock(cond_get(sema->load() & 0x7fffffff)->mtx, std::adopt_lock); + // Lock mutex + std::unique_lock lock(cond->mtx); #endif // Can skip unqueue process if true @@ -563,40 +604,41 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value bool fallback = false; #endif - while (ptr_cmp(data, size, old_value, mask)) + while (ptr_cmp(data, size, old_value, mask) && cond->sync != 3) { #ifdef USE_FUTEX struct timespec ts; ts.tv_sec = timeout / 1'000'000'000; ts.tv_nsec = timeout % 1'000'000'000; - if (sema->load() > 1) [[unlikely]] + if (cond->sync.load() > 1) [[unlikely]] { // Signaled prematurely - sema->release(1); - } - else - { - futex(sema, FUTEX_WAIT_PRIVATE, 1, timeout + 1 ? &ts : nullptr); - } -#elif defined(USE_STD) - const u32 val = sema->load(); - - if (val >> 31) - { - // Locked by notifier - if (!ptr_cmp(data, size, old_value, mask)) + if (cond->sync.load() == 3 || !cond->sync.compare_and_swap_test(2, 1)) { break; } } - else if (timeout + 1) + else { - cond_get(val)->cond.wait_for(lock, std::chrono::nanoseconds(timeout)); + futex(&cond->sync, FUTEX_WAIT_PRIVATE, 1, timeout + 1 ? &ts : nullptr); + } +#elif defined(USE_STD) + if (cond->sync.load() > 1) [[unlikely]] + { + if (cond->sync.load() == 3 || !cond->sync.compare_and_swap_test(2, 1)) + { + break; + } + } + + if (timeout + 1) + { + cond->cond.wait_for(lock, std::chrono::nanoseconds(timeout)); } else { - cond_get(val)->cond.wait(lock); + cond->cond.wait(lock); } #elif defined(_WIN32) LARGE_INTEGER qw; @@ -608,21 +650,20 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value qw.QuadPart -= 1; } - if (NtWaitForAlertByThreadId) + if (fallback) [[unlikely]] { - if (fallback) [[unlikely]] + if (!cond->sync.compare_and_swap_test(2, 1)) { - // Restart waiting - if (sema->load() == umax) - { - sema->release(thread_id[0]); - } - fallback = false; + break; } - // Let's assume it can return spuriously - switch (DWORD status = NtWaitForAlertByThreadId(thread_id, timeout + 1 ? &qw : nullptr)) + fallback = false; + } + + if (NtWaitForAlertByThreadId) + { + switch (DWORD status = NtWaitForAlertByThreadId(cond, timeout + 1 ? &qw : nullptr)) { case NTSTATUS_ALERTED: fallback = true; break; case NTSTATUS_TIMEOUT: break; @@ -635,15 +676,7 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value } else { - if (fallback) - { - // Restart waiting - verify(HERE), sema->load() == 2; - sema->release(1); - fallback = false; - } - - if (!NtWaitForKeyedEvent(nullptr, sema, false, timeout + 1 ? &qw : nullptr)) + if (NtWaitForKeyedEvent(nullptr, sema, false, timeout + 1 ? &qw : nullptr) == NTSTATUS_SUCCESS) { // Error code assumed to be timeout fallback = true; @@ -663,14 +696,15 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value #if defined(_WIN32) static LARGE_INTEGER instant{}; + if (cond->sync.compare_and_swap_test(1, 2)) + { + // Succeeded in self-notifying + break; + } + if (NtWaitForAlertByThreadId) { - if (sema->compare_and_swap_test(thread_id[0], -1)) - { - break; - } - - if (NtWaitForAlertByThreadId(thread_id, &instant) == NTSTATUS_ALERTED) + if (NtWaitForAlertByThreadId(cond, &instant) == NTSTATUS_ALERTED) { break; } @@ -678,26 +712,21 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value continue; } - if (sema->compare_and_swap_test(1, 2)) - { - // Succeeded in self-notifying - break; - } - if (!NtWaitForKeyedEvent(nullptr, sema, false, &instant)) { // Succeeded in obtaining an event without waiting break; } + + continue; #endif } -#ifdef _WIN32 - verify(HERE), thread_id[0] == GetCurrentThreadId(); -#endif - #ifdef USE_STD - lock.unlock(); + if (lock) + { + lock.unlock(); + } #endif slot->sema_free(sema); @@ -708,105 +737,98 @@ atomic_storage_futex::wait(const void* data, std::size_t size, __m128i old_value } // Platform specific wake-up function -static inline bool alert_sema(atomic_t* sema, const void* data, u64 progress) +static NEVER_INLINE bool +#ifdef _WIN32 +__vectorcall +#endif +alert_sema(atomic_t* sema, const void* data, u64 info, u32 size, __m128i mask, __m128i new_value) { -#ifdef USE_FUTEX - if (sema->load() == 1 && sema->compare_and_swap_test(1, 2)) + auto [cond_id, ok] = sema->fetch_op([](u16& id) { - if (!progress) + // Check if not zero and not locked + if (!id || id & 0x8000) { - // Imminent notification - s_tls_notify_cb(data, 0); + return false; } - // Use "wake all" arg for robustness, only 1 thread is expected - futex(sema, FUTEX_WAKE_PRIVATE, 0x7fff'ffff); + // Dirty optimization: prevent attempting to lock dead or uninitialized sync vars + u32 sync_var = 0; + std::memcpy(&sync_var, reinterpret_cast(s_cond_list) + (sizeof(cond_handle) * (id - 1) + offsetof(cond_handle, sync)), sizeof(sync_var)); + + if (!sync_var) + { + return false; + } + + // Set notify lock + id |= 0x8000; return true; - } -#elif defined(USE_STD) - // Check if not zero and not locked - u32 old_val = sema->load(); + }); - if (((old_val - 1) >> 31) == 0) + if (!ok) [[unlikely]] { - const auto [cond_id, ok] = sema->fetch_op([](u32& id) - { - if ((id - 1) >> 31) - { - return false; - } - - // Set notify lock - id |= 1u << 31; - return true; - }); - - if (ok) - { - if (auto cond = cond_get(cond_id)) - { - if (!progress) - { - // Imminent notification - s_tls_notify_cb(data, 0); - } - - // Not super efficient: locking is required to avoid lost notifications - cond->mtx.lock(); - cond->mtx.unlock(); - cond->cond.notify_all(); - - // Try to remove notifier lock gracefully - if (!sema->compare_and_swap_test(cond_id | (1u << 31), cond_id)) [[unlikely]] - { - // Cleanup helping - cond_free(cond_id); - return false; - } - - return true; - } - } - } -#elif defined(_WIN32) - if (NtWaitForAlertByThreadId) - { - u32 tid = sema->load(); - - // Check if tid is neither 0 nor -1 - if (tid + 1 > 1 && sema->compare_and_swap_test(tid, -1)) - { - if (!progress) - { - // Imminent notification - s_tls_notify_cb(data, 0); - } - - if (NtAlertThreadByThreadId(tid) == NTSTATUS_SUCCESS) - { - // Could be some dead thread otherwise - return true; - } - } - return false; } - if (sema->load() == 1 && sema->compare_and_swap_test(1, 2)) + const auto cond = cond_get(cond_id); + + ok = false; + + if (cond && cond->sync && (!size ? (!info || cond->tid == info) : cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))) { - if (!progress) + if ((!size && cond->sync.exchange(3) == 1) || (size && cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2))) { // Imminent notification - s_tls_notify_cb(data, 0); - } + if (!size || !info) + { + s_tls_notify_cb(data, 0); + } - // Can wait in rare cases, which is its annoying weakness - NtReleaseKeyedEvent(nullptr, sema, 1, nullptr); - return true; - } +#ifdef USE_FUTEX + // Use "wake all" arg for robustness, only 1 thread is expected + futex(&cond->sync, FUTEX_WAKE_PRIVATE, 0x7fff'ffff); + ok = true; +#elif defined(USE_STD) + // Not super efficient: locking is required to avoid lost notifications + cond->mtx.lock(); + cond->mtx.unlock(); + cond->cond.notify_all(); + ok = true; +#elif defined(_WIN32) + if (NtWaitForAlertByThreadId) + { + if (NtAlertThreadByThreadId(cond->tid) == NTSTATUS_SUCCESS) + { + // Could be some dead thread otherwise + ok = true; + } + } + else + { + // Can wait in rare cases, which is its annoying weakness + if (NtReleaseKeyedEvent(nullptr, sema, 1, nullptr) == NTSTATUS_SUCCESS) + { + // Can't fail + ok = true; + } + } #endif + } + } - return false; + // Remove lock, check if cond_id is already removed (leaving only 0x8000) + if (sema->fetch_and(0x7fff) == 0x8000) + { + cond_free(cond_id); + + // Cleanup, a little hacky obtainment of the host variable + const auto slot = std::launder(reinterpret_cast(reinterpret_cast(sema) & -128)); + + // Remove slot bit + slot->sema_bits &= ~(1ull << (sema - slot->sema_data)); + } + + return ok; } void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data)) @@ -833,15 +855,47 @@ void atomic_storage_futex::set_notify_callback(void(*cb)(const void*, u64)) } } -void atomic_storage_futex::raw_notify(const void* data) +bool atomic_storage_futex::raw_notify(const void* data, u64 thread_id) { - if (data) + const std::uintptr_t iptr = reinterpret_cast(data); + + const auto slot = slot_get(iptr, &s_hashtable[(iptr) % s_hashtable_size]); + + if (!slot) { - notify_all(data); + return false; } + + u64 progress = 0; + + for (u64 bits = slot->sema_bits.load(); bits; bits &= bits - 1) + { + const auto sema = &slot->sema_data[std::countr_zero(bits)]; + + // Forced notification + if (alert_sema(sema, data, thread_id, 0, _mm_setzero_si128(), _mm_setzero_si128())) + { + s_tls_notify_cb(data, ++progress); + + if (thread_id == 0) + { + // Works like notify_all in this case + continue; + } + + break; + } + } + + s_tls_notify_cb(data, -1); + return progress != 0; } -void atomic_storage_futex::notify_one(const void* data) +void +#ifdef _WIN32 +__vectorcall +#endif +atomic_storage_futex::notify_one(const void* data, u32 size, __m128i mask, __m128i new_value) { const std::uintptr_t iptr = reinterpret_cast(data); @@ -858,7 +912,7 @@ void atomic_storage_futex::notify_one(const void* data) { const auto sema = &slot->sema_data[std::countr_zero(bits)]; - if (alert_sema(sema, data, progress)) + if (alert_sema(sema, data, progress, size, mask, new_value)) { s_tls_notify_cb(data, ++progress); break; @@ -868,7 +922,11 @@ void atomic_storage_futex::notify_one(const void* data) s_tls_notify_cb(data, -1); } -void atomic_storage_futex::notify_all(const void* data) +void +#ifdef _WIN32 +__vectorcall +#endif +atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m128i new_value) { const std::uintptr_t iptr = reinterpret_cast(data); @@ -881,13 +939,15 @@ void atomic_storage_futex::notify_all(const void* data) u64 progress = 0; -#if defined(_WIN32) && !defined(USE_FUTEX) +#if defined(_WIN32) && !defined(USE_FUTEX) && !defined(USE_STD) + // Special path for Windows 7 if (!NtAlertThreadByThreadId) { // Make a copy to filter out waiters that fail some checks u64 copy = slot->sema_bits.load(); + u64 lock = 0; + u32 lock_ids[56]{}; - // Used for making non-blocking syscall static LARGE_INTEGER instant{}; for (u64 bits = copy; bits; bits &= bits - 1) @@ -896,16 +956,46 @@ void atomic_storage_futex::notify_all(const void* data) const auto sema = &slot->sema_data[id]; - if (sema->load() == 1 && sema->compare_and_swap_test(1, 2)) + auto [cond_id, ok] = sema->fetch_op([](u16& id) { - // Waiters locked for notification - if (bits == copy) + if (!id || id & 0x8000) { - // Notify imminent notification - s_tls_notify_cb(data, 0); + return false; } - continue; + u32 sync_var = 0; + std::memcpy(&sync_var, reinterpret_cast(s_cond_list) + (sizeof(cond_handle) * (id - 1) + offsetof(cond_handle, sync)), sizeof(sync_var)); + + if (!sync_var) + { + return false; + } + + // Set notify lock + id |= 0x8000; + return true; + }); + + if (ok) + { + // Add lock bit for cleanup + lock |= 1ull << id; + lock_ids[id] = cond_id; + + const auto cond = cond_get(cond_id); + + if (cond && cond->sync && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv)) + { + if (cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2)) + { + if (bits == copy) + { + s_tls_notify_cb(data, 0); + } + + continue; + } + } } // Remove the bit from next stage @@ -921,7 +1011,7 @@ void atomic_storage_futex::notify_all(const void* data) const auto sema = &slot->sema_data[id]; - if (NtReleaseKeyedEvent(nullptr, sema, 1, &instant)) + if (NtReleaseKeyedEvent(nullptr, sema, 1, &instant) != NTSTATUS_SUCCESS) { // Failed to notify immediately continue; @@ -941,6 +1031,23 @@ void atomic_storage_futex::notify_all(const void* data) s_tls_notify_cb(data, ++progress); } + // Cleanup locked notifiers + for (u64 bits = lock; bits; bits &= bits - 1) + { + const u32 id = std::countr_zero(bits); + + const auto sema = &slot->sema_data[id]; + + if (sema->fetch_and(0x7fff) == 0x8000) + { + const u32 id = std::countr_zero(bits); + + cond_free(lock_ids[id]); + + slot->sema_bits &= ~(1ull << id); + } + } + s_tls_notify_cb(data, -1); return; } @@ -950,7 +1057,7 @@ void atomic_storage_futex::notify_all(const void* data) { const auto sema = &slot->sema_data[std::countr_zero(bits)]; - if (alert_sema(sema, data, progress)) + if (alert_sema(sema, data, progress, size, mask, new_value)) { s_tls_notify_cb(data, ++progress); continue; diff --git a/rpcs3/util/atomic.hpp b/rpcs3/util/atomic.hpp index a01095a4e7..bce593d930 100644 --- a/rpcs3/util/atomic.hpp +++ b/rpcs3/util/atomic.hpp @@ -25,14 +25,24 @@ private: #ifdef _WIN32 __vectorcall #endif - wait(const void* data, std::size_t size, __m128i old128, u64 timeout, __m128i mask128); - static void notify_one(const void* data); - static void notify_all(const void* data); + wait(const void* data, u32 size, __m128i old128, u64 timeout, __m128i mask128); + + static void +#ifdef _WIN32 + __vectorcall +#endif + notify_one(const void* data, u32 size, __m128i mask128, __m128i val128); + + static void +#ifdef _WIN32 + __vectorcall +#endif + notify_all(const void* data, u32 size, __m128i mask128, __m128i val128); public: static void set_wait_callback(bool(*cb)(const void* data)); static void set_notify_callback(void(*cb)(const void* data, u64 progress)); - static void raw_notify(const void* data); + static bool raw_notify(const void* data, u64 thread_id = 0); }; // Helper class, provides access to compiler-specific atomic intrinsics @@ -1260,12 +1270,62 @@ public: void notify_one() noexcept { - atomic_storage_futex::notify_one(&m_data); + if constexpr (sizeof(T) <= 8) + { + const __m128i _new = _mm_cvtsi64_si128(std::bit_cast>(load())); + atomic_storage_futex::notify_one(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new); + } + else if constexpr (sizeof(T) == 16) + { + const __m128i _new = std::bit_cast<__m128i>(load()); + atomic_storage_futex::notify_one(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new); + } + } + + void notify_one(type mask_value) noexcept + { + if constexpr (sizeof(T) <= 8) + { + const __m128i mask = _mm_cvtsi64_si128(std::bit_cast>(mask_value)); + const __m128i _new = _mm_cvtsi64_si128(std::bit_cast>(load())); + atomic_storage_futex::notify_one(&m_data, sizeof(T), mask, _new); + } + else if constexpr (sizeof(T) == 16) + { + const __m128i mask = std::bit_cast<__m128i>(mask_value); + const __m128i _new = std::bit_cast<__m128i>(load()); + atomic_storage_futex::notify_one(&m_data, sizeof(T), mask, _new); + } } void notify_all() noexcept { - atomic_storage_futex::notify_all(&m_data); + if constexpr (sizeof(T) <= 8) + { + const __m128i _new = _mm_cvtsi64_si128(std::bit_cast>(load())); + atomic_storage_futex::notify_all(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new); + } + else if constexpr (sizeof(T) == 16) + { + const __m128i _new = std::bit_cast<__m128i>(load()); + atomic_storage_futex::notify_all(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new); + } + } + + void notify_all(type mask_value) noexcept + { + if constexpr (sizeof(T) <= 8) + { + const __m128i mask = _mm_cvtsi64_si128(std::bit_cast>(mask_value)); + const __m128i _new = _mm_cvtsi64_si128(std::bit_cast>(load())); + atomic_storage_futex::notify_all(&m_data, sizeof(T), mask, _new); + } + else if constexpr (sizeof(T) == 16) + { + const __m128i mask = std::bit_cast<__m128i>(mask_value); + const __m128i _new = std::bit_cast<__m128i>(load()); + atomic_storage_futex::notify_all(&m_data, sizeof(T), mask, _new); + } } };