From 9710473a2e3e006e659cef4601bd8b5db1036fc4 Mon Sep 17 00:00:00 2001
From: Nekotekina
Date: Sun, 22 Sep 2019 01:31:23 +0300
Subject: [PATCH] atomic.hpp: use native semaphores on Windows

Windows: drop keyed events
Linux: keep using native futex
Implement unused POSIX semaphore path
Implement fallback semaphore with pure std (OSX, BSD, etc)
---
 Utilities/sync.h      |   7 +-
 rpcs3/util/atomic.cpp | 601 ++++++++++++++++++++++++++++++++++--------
 2 files changed, 495 insertions(+), 113 deletions(-)

diff --git a/Utilities/sync.h b/Utilities/sync.h
index caed15337a..79abc8bbf4 100644
--- a/Utilities/sync.h
+++ b/Utilities/sync.h
@@ -15,7 +15,6 @@
 #include
 #include
 #include
-#else
 #endif
 #include
 #include
@@ -25,12 +24,8 @@
 #include
 
 #ifdef _WIN32
-DYNAMIC_IMPORT("ntdll.dll", NtWaitForKeyedEvent, NTSTATUS(HANDLE Handle, PVOID Key, BOOLEAN Alertable, PLARGE_INTEGER Timeout));
-DYNAMIC_IMPORT("ntdll.dll", NtReleaseKeyedEvent, NTSTATUS(HANDLE Handle, PVOID Key, BOOLEAN Alertable, PLARGE_INTEGER Timeout));
+DYNAMIC_IMPORT("ntdll.dll", NtWaitForSingleObject, NTSTATUS(HANDLE Handle, BOOLEAN Alertable, PLARGE_INTEGER Timeout));
 DYNAMIC_IMPORT("ntdll.dll", NtDelayExecution, NTSTATUS(BOOLEAN Alertable, PLARGE_INTEGER DelayInterval));
-inline utils::dynamic_import OptWaitOnAddress("kernel32.dll", "WaitOnAddress");
-inline utils::dynamic_import OptWakeByAddressSingle("kernel32.dll", "WakeByAddressSingle");
-inline utils::dynamic_import OptWakeByAddressAll("kernel32.dll", "WakeByAddressAll");
 #endif
 
 #ifndef __linux__
diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp
index a12127614b..46fcf7ad1d 100644
--- a/rpcs3/util/atomic.cpp
+++ b/rpcs3/util/atomic.cpp
@@ -1,11 +1,24 @@
 #include "atomic.hpp"
 
+// USE_FUTEX takes precedence over USE_POSIX
+
+#ifdef __linux__
+#define USE_FUTEX
+#define USE_POSIX
+#endif
+
 #include "Utilities/sync.h"
 #include "Utilities/asm.h"
 
+#ifdef USE_POSIX
+#include <semaphore.h>
+#endif
+
 #include <map>
 #include <mutex>
 #include <condition_variable>
+#include <chrono>
+#include <memory>
 
 // Total number of entries, should be a power of 2.
 static constexpr std::uintptr_t s_hashtable_size = 1u << 22;
@@ -22,8 +35,18 @@ static constexpr u64 s_waiter_mask = 0x7fff'0000'0000'0000;
 //
 static constexpr u64 s_collision_bit = 0x8000'0000'0000'0000;
 
+#ifdef USE_FUTEX
+static constexpr u64 s_sema_mask = 0;
+#else
+// Number of search groups (defines max semaphore count as gcount * 64)
+static constexpr u32 s_sema_gcount = 64;
+
+// Bits encoding allocated semaphore index (zero = not allocated yet)
+static constexpr u64 s_sema_mask = (64 * s_sema_gcount - 1) << 2;
+#endif
+
 // Implementation detail (remaining bits out of 32 available for futex)
-static constexpr u64 s_signal_mask = 0xffffffff & ~(s_waiter_mask | s_pointer_mask | s_collision_bit);
+static constexpr u64 s_signal_mask = 0xffffffff & ~(s_waiter_mask | s_pointer_mask | s_collision_bit | s_sema_mask);
 
 // Callback for wait() function, returns false if wait should return
 static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*)
@@ -31,6 +54,140 @@ static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*)
 	return true;
 };
 
+#ifndef USE_FUTEX
+
+#ifdef USE_POSIX
+using sema_handle = sem_t;
+#elif defined(_WIN32)
+using sema_handle = HANDLE;
+#else
+namespace
+{
+	struct dumb_sema
+	{
+		u64 count = 0;
+		std::mutex mutex;
+		std::condition_variable cond;
+	};
+}
+
+using sema_handle = std::unique_ptr<dumb_sema>;
+#endif
+
+// Array of native semaphores
+static sema_handle s_sema_list[64 * s_sema_gcount]{};
+
+// Array of associated reference counters
+static atomic_t<u64> s_sema_refs[64 * s_sema_gcount]{};
+
+// Allocation bits (reserve first bit)
+static atomic_t<u64> s_sema_bits[s_sema_gcount]{1};
+
+static u32 sema_alloc()
+{
+	// Diversify search start points to reduce contention and increase immediate success chance
+#ifdef _WIN32
+	const u32 start = GetCurrentProcessorNumber();
+#elif __linux__
+	const u32 start = sched_getcpu();
+#else
+	const u32 start = __rdtsc();
+#endif
+
+	for (u32 i = 0; i < s_sema_gcount * 3; i++)
+	{
+		const u32 group = (i + start) % s_sema_gcount;
+
+		const auto [bits, ok] = s_sema_bits[group].fetch_op([](u64& bits)
+		{
+			if (~bits)
+			{
+				// Set lowest clear bit
+				bits |= bits + 1;
+				return true;
+			}
+
+			return false;
+		});
+
+		if (ok)
+		{
+			// Find lowest clear bit
+			const u32 id = group * 64 + utils::cnttz64(~bits, false);
+
+#ifdef USE_POSIX
+			// Initialize semaphore (should be very fast)
+			sem_init(&s_sema_list[id], 0, 0);
+#elif defined(_WIN32)
+			if (!s_sema_list[id])
+			{
+				s_sema_list[id] = CreateSemaphoreW(nullptr, 0, 0x7fff'ffff, nullptr);
+			}
+#else
+			if (!s_sema_list[id])
+			{
+				s_sema_list[id] = std::make_unique<dumb_sema>();
+			}
+#endif
+
+			// Initialize ref counter
+			if (s_sema_refs[id]++)
+			{
+				std::abort();
+			}
+
+			return id;
+		}
+	}
+
+	return 0;
+}
+
+static void sema_free(u32 id)
+{
+	if (id && id < 64 * s_sema_gcount)
+	{
+		// Dereference first
+		if (--s_sema_refs[id])
+		{
+			return;
+		}
+
+#ifdef USE_POSIX
+		// Destroy semaphore (should be very fast)
+		sem_destroy(&s_sema_list[id]);
+#else
+		// No action required
+#endif
+
+		// Reset allocation bit
+		s_sema_bits[id / 64] &= ~(1ull << (id % 64));
+	}
+}
+
+static bool sema_get(u32 id)
+{
+	if (id && id < 64 * s_sema_gcount)
+	{
+		// Increment only if the semaphore is allocated
+		if (s_sema_refs[id].fetch_op([](u64& refs)
+		{
+			if (refs)
+			{
+				// Increase reference from non-zero value
+				refs++;
+			}
+		}))
+		{
+			return true;
+		}
+	}
+
+	return false;
+}
+
+#endif
+
 static inline bool ptr_cmp(const void* data, std::size_t size, u64 old_value, u64 mask)
 {
 	switch (size)
@@ -151,25 +308,6 @@ namespace
 	}
 }
 
-#if !defined(_WIN32) && !defined(__linux__)
-
-void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value, u64 timeout, u64 mask)
-{
-	fallback_wait(data, size, old_value, timeout, mask);
-}
-
-void atomic_storage_futex::notify_one(const void* data)
-{
-	fallback_notify_one(data);
-}
-
-void atomic_storage_futex::notify_all(const void* data)
-{
-	fallback_notify_all(data);
-}
-
-#else
-
 void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value, u64 timeout, u64 mask)
 {
 	if (!timeout)
@@ -185,6 +323,8 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 
 	bool fallback = false;
 
+	u32 sema_id = -1;
+
 	const auto [_, ok] = entry.fetch_op([&](u64& value)
 	{
 		if ((value & s_waiter_mask) == s_waiter_mask || (value & s_signal_mask) == s_signal_mask)
@@ -193,15 +333,15 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 			return false;
 		}
 
+#ifndef USE_FUTEX
+		sema_id = (value & s_sema_mask) >> 2;
+#endif
+
 		if (!value || (value & s_pointer_mask) == (iptr & s_pointer_mask))
 		{
 			// Store pointer bits
 			value |= (iptr & s_pointer_mask);
 			fallback = false;
-
-#ifdef _WIN32
-			value += s_signal_mask & -s_signal_mask;
-#endif
 		}
 		else
 		{
@@ -221,13 +361,93 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 		return;
 	}
 
+#ifndef USE_FUTEX
+	for (u32 loop_count = 0; !fallback && loop_count < 7; loop_count++)
+	{
+		// Try to allocate a semaphore
+		if (!sema_id)
+		{
+			const u32 sema = sema_alloc();
+
+			if (!sema)
+			{
+				break;
+			}
+
+			sema_id = entry.atomic_op([&](u64& value) -> u32
+			{
+				if (value & s_sema_mask)
+				{
+					return (value & s_sema_mask) >> 2;
+				}
+
+				// Insert allocated semaphore
+				value += s_signal_mask & -s_signal_mask;
+				value |= (u64{sema} << 2);
+				return 0;
+			});
+
+			if (sema_id)
+			{
+				// Drop unnecessary allocation
+				sema_free(sema);
+			}
+			else
+			{
+				sema_id = sema;
+				break;
+			}
+		}
+
+		if (!sema_get(sema_id))
+		{
+			sema_id = 0;
+			continue;
+		}
+
+		// Try to increment sig (check semaphore validity)
+		const auto [_old, ok] = entry.fetch_op([&](u64& value)
+		{
+			if ((value & s_signal_mask) == s_signal_mask)
+			{
+				return false;
+			}
+
+			if ((value & s_sema_mask) >> 2 != sema_id)
+			{
+				return false;
+			}
+
+			value += s_signal_mask & -s_signal_mask;
+			return true;
+		});
+
+		if (!ok)
+		{
+			sema_free(sema_id);
+			sema_id = 0;
+
+			if ((_old & s_signal_mask) == s_signal_mask)
+			{
+				// Break on signal overflow
+				break;
+			}
+
+			continue;
+		}
+
+		break;
+	}
+#endif
+
 	if (fallback)
 	{
 		fallback_wait(data, size, old_value, timeout, mask);
 	}
-	else if (ptr_cmp(data, size, old_value, mask) && s_tls_wait_cb(data))
+	else if (sema_id && ptr_cmp(data, size, old_value, mask) && s_tls_wait_cb(data))
 	{
-#ifdef _WIN32
+#ifndef USE_FUTEX
+#if defined(_WIN32) && !defined(USE_POSIX)
 		LARGE_INTEGER qw;
 		qw.QuadPart = -static_cast<s64>(timeout / 100);
 
@@ -237,12 +457,59 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 			qw.QuadPart -= 1;
 		}
 
-		if (!NtWaitForKeyedEvent(nullptr, &entry, false, timeout + 1 ? &qw : nullptr))
+		if (!NtWaitForSingleObject(s_sema_list[sema_id], false, timeout + 1 ? &qw : nullptr))
 		{
-			// Return if no errors, continue if timed out
-			s_tls_wait_cb(nullptr);
-			return;
+			fallback = true;
 		}
+#elif defined(USE_POSIX)
+		struct timespec ts;
+		clock_gettime(CLOCK_REALTIME, &ts);
+		ts.tv_sec  += timeout / 1'000'000'000;
+		ts.tv_nsec += timeout % 1'000'000'000;
+		ts.tv_sec  += ts.tv_nsec / 1'000'000'000;
+		ts.tv_nsec %= 1'000'000'000;
+
+		// It's pretty unreliable because it uses absolute time, which may jump backwards. Sigh.
+		if (timeout + 1)
+		{
+			if (sem_timedwait(&s_sema_list[sema_id], &ts) == 0)
+			{
+				fallback = true;
+			}
+		}
+		else
+		{
+			if (sem_wait(&s_sema_list[sema_id]) == 0)
+			{
+				fallback = true;
+			}
+		}
+#else
+		dumb_sema& sema = *s_sema_list[sema_id];
+
+		std::unique_lock lock(sema.mutex);
+
+		if (timeout + 1)
+		{
+			sema.cond.wait_for(lock, std::chrono::nanoseconds(timeout), [&]
+			{
+				return sema.count > 0;
+			});
+		}
+		else
+		{
+			sema.cond.wait(lock, [&]
+			{
+				return sema.count > 0;
+			});
+		}
+
+		if (sema.count > 0)
+		{
+			sema.count--;
+			fallback = true;
+		}
+#endif
 #else
 		struct timespec ts;
 		ts.tv_sec  = timeout / 1'000'000'000;
@@ -252,6 +519,11 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 #endif
 	}
 
+	if (!sema_id)
+	{
+		fallback = true;
+	}
+
 	while (true)
 	{
 		// Try to decrement
@@ -259,15 +531,21 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 		{
 			if (value & s_waiter_mask)
 			{
-#ifdef _WIN32
+#ifndef USE_FUTEX
+				// If timeout
 				if (!fallback)
 				{
-					if ((value & s_signal_mask) == 0)
+					if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id)
 					{
 						return false;
 					}
 
 					value -= s_signal_mask & -s_signal_mask;
+
+					if ((value & s_signal_mask) == 0)
+					{
+						value &= ~s_sema_mask;
+					}
 				}
 #endif
 
@@ -290,22 +568,45 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
 			break;
 		}
 
-#ifdef _WIN32
+#ifndef USE_FUTEX
+#if defined(_WIN32) && !defined(USE_POSIX)
 		static LARGE_INTEGER instant{};
 
-		if (!NtWaitForKeyedEvent(nullptr, &entry, false, &instant))
+		if (!NtWaitForSingleObject(s_sema_list[sema_id], false, &instant))
 		{
-			break;
+			fallback = true;
+		}
+#elif defined(USE_POSIX)
+		if (sem_trywait(&s_sema_list[sema_id]) == 0)
+		{
+			fallback = true;
 		}
 #else
-		// Unreachable
-		std::terminate();
+		dumb_sema& sema = *s_sema_list[sema_id];
+
+		std::unique_lock lock(sema.mutex);
+
+		if (sema.count > 0)
+		{
+			sema.count--;
+			fallback = true;
+		}
+#endif
 #endif
 	}
 
+#ifndef USE_FUTEX
+	if (sema_id)
+	{
+		sema_free(sema_id);
+	}
+#endif
+
 	s_tls_wait_cb(nullptr);
 }
 
+#ifdef USE_FUTEX
+
 void atomic_storage_futex::notify_one(const void* data)
 {
 	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
@@ -316,23 +617,6 @@ void atomic_storage_futex::notify_one(const void* data)
 	{
 		if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask))
 		{
-#ifdef _WIN32
-			if ((value & s_signal_mask) == 0)
-			{
-				// No relevant waiters, do nothing
-				return false;
-			}
-
-			// Try to decrement if possible
-			value -= s_waiter_mask & -s_waiter_mask;
-			value -= s_signal_mask & -s_signal_mask;
-
-			if ((value & s_waiter_mask) == 0)
-			{
-				// Reset on last waiter
-				value = 0;
-			}
-#else
 			if ((value & s_signal_mask) == s_signal_mask)
 			{
 				// Signal overflow, do nothing
@@ -347,7 +631,7 @@ void atomic_storage_futex::notify_one(const void* data)
 				notify_all(data);
 				return false;
 			}
-#endif
+
 			return true;
 		}
 		else if (value & s_waiter_mask && value & s_collision_bit)
 		{
@@ -361,11 +645,7 @@
 
 	if (ok)
 	{
-#ifdef _WIN32
-		NtReleaseKeyedEvent(nullptr, &entry, false, nullptr);
-#else
 		futex(reinterpret_cast<char*>(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1);
-#endif
 	}
 }
 
@@ -375,53 +655,6 @@ void atomic_storage_futex::notify_all(const void* data)
 
 	atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
 
-	// Try to consume everything
-#ifdef _WIN32
-	const auto [old, ok] = entry.fetch_op([&](u64& value)
-	{
-		if (value & s_waiter_mask)
-		{
-			if ((value & s_pointer_mask) == (iptr & s_pointer_mask))
-			{
-				if ((value & s_signal_mask) == 0)
-				{
-					// No relevant waiters, do nothing
-					return false;
-				}
-
-				const u64 count = (value & s_signal_mask) / (s_signal_mask & -s_signal_mask);
-				value -= (s_waiter_mask & -s_waiter_mask) * count;
-				value -= (s_signal_mask & -s_signal_mask) * count;
-
-				if ((value & s_waiter_mask) == 0)
-				{
-					// Reset on last waiter
-					value = 0;
-				}
-
-				return true;
-			}
-
-			if (value & s_collision_bit)
-			{
-				fallback_notify_all(data);
-				return false;
-			}
-		}
-
-		return false;
-	});
-
-	if (!ok)
-	{
-		return;
-	}
-
-	for (u64 count = old & s_signal_mask; count; count -= s_signal_mask & -s_signal_mask)
-	{
-		NtReleaseKeyedEvent(nullptr, &entry, false, nullptr);
-	}
-#else
 	const auto [_, ok] = entry.fetch_op([&](u64& value)
 	{
 		if (value & s_waiter_mask)
 		{
@@ -452,7 +685,6 @@
 	{
 		futex(reinterpret_cast<char*>(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 0x7fffffff);
 	}
-#endif
 }
 
 #endif
@@ -472,3 +704,158 @@ void atomic_storage_futex::raw_notify(const void* data)
 		notify_all(data);
 	}
 }
+
+#ifndef USE_FUTEX
+
+void atomic_storage_futex::notify_one(const void* data)
+{
+	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
+
+	atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
+
+	const u64 value = entry;
+
+	if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask))
+	{
+		if ((value & s_signal_mask) == 0 || (value & s_sema_mask) == 0)
+		{
+			// No relevant waiters, do nothing
+			return;
+		}
+	}
+	else if (value & s_waiter_mask && value & s_collision_bit)
+	{
+		fallback_notify_one(data);
+		return;
+	}
+	else
+	{
+		return;
+	}
+
+	const u32 sema_id = (value & s_sema_mask) >> 2;
+
+	if (!sema_get(sema_id))
+	{
+		return;
+	}
+
+	const auto [_, ok] = entry.fetch_op([&](u64& value)
+	{
+		if ((value & s_waiter_mask) == 0 || (value & s_pointer_mask) != (iptr & s_pointer_mask))
+		{
+			return false;
+		}
+
+		if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id)
+		{
+			return false;
+		}
+
+		value -= s_signal_mask & -s_signal_mask;
+
+		// Reset allocated semaphore on last waiter
+		if ((value & s_signal_mask) == 0)
+		{
+			value &= ~s_sema_mask;
+		}
+
+		return true;
+	});
+
+	if (ok)
+	{
+#ifdef USE_POSIX
+		sem_post(&s_sema_list[sema_id]);
+#elif defined(_WIN32)
+		ReleaseSemaphore(s_sema_list[sema_id], 1, nullptr);
+#else
+		dumb_sema& sema = *s_sema_list[sema_id];
+
+		sema.mutex.lock();
+		sema.count += 1;
+		sema.mutex.unlock();
+		sema.cond.notify_one();
+#endif
+	}
+
+	sema_free(sema_id);
+}
+
+void atomic_storage_futex::notify_all(const void* data)
+{
+	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
+
+	atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
+
+	const u64 value = entry;
+
+	if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask))
+	{
+		if ((value & s_signal_mask) == 0 || (value & s_sema_mask) == 0)
+		{
+			// No relevant waiters, do nothing
+			return;
+		}
+	}
+	else if (value & s_waiter_mask && value & s_collision_bit)
+	{
+		fallback_notify_all(data);
+		return;
+	}
+	else
+	{
+		return;
+	}
+
+	const u32 sema_id = (value & s_sema_mask) >> 2;
+
+	if (!sema_get(sema_id))
+	{
+		return;
+	}
+
+	const auto [_, count] = entry.fetch_op([&](u64& value) -> u32
+	{
+		if ((value & s_waiter_mask) == 0 || (value & s_pointer_mask) != (iptr & s_pointer_mask))
+		{
+			return 0;
+		}
+
+		if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id)
+		{
+			return 0;
+		}
+
+		const u32 r = (value & s_signal_mask) / (s_signal_mask & -s_signal_mask);
+		value &= ~s_sema_mask;
+		value &= ~s_signal_mask;
+		return r;
+	});
+
+#ifdef USE_POSIX
+	for (u32 i = 0; i < count; i++)
+	{
+		sem_post(&s_sema_list[sema_id]);
+	}
+#elif defined(_WIN32)
+	if (count)
+	{
+		ReleaseSemaphore(s_sema_list[sema_id], count, nullptr);
+	}
+#else
+	if (count)
+	{
+		dumb_sema& sema = *s_sema_list[sema_id];
+
+		sema.mutex.lock();
+		sema.count += count;
+		sema.mutex.unlock();
+		sema.cond.notify_all();
+	}
+#endif
+
+	sema_free(sema_id);
+}
+
+#endif
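
Note on the pure-std fallback path: on platforms without futex, NT semaphores, or POSIX semaphores, the patch builds a counting semaphore out of std::mutex and std::condition_variable (dumb_sema). The code below is a minimal standalone sketch of that same idea, not the patch's exact type; the names std_semaphore, release and try_acquire_for are illustrative only.

// Sketch: counting semaphore from std primitives, mirroring how dumb_sema is used above.
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

class std_semaphore
{
	std::uint64_t m_count = 0;
	std::mutex m_mutex;
	std::condition_variable m_cond;

public:
	// Roughly what ReleaseSemaphore(sema, n, nullptr) or n calls to sem_post() do
	void release(std::uint64_t n = 1)
	{
		{
			std::lock_guard lock(m_mutex);
			m_count += n;
		}

		// Single release wakes one waiter; bulk release wakes everyone
		n == 1 ? m_cond.notify_one() : m_cond.notify_all();
	}

	// Timed acquire: wait_for with a predicate handles spurious wakeups,
	// then one unit is consumed if anything was posted before the deadline
	bool try_acquire_for(std::chrono::nanoseconds timeout)
	{
		std::unique_lock lock(m_mutex);

		m_cond.wait_for(lock, timeout, [&] { return m_count > 0; });

		if (m_count > 0)
		{
			m_count--;
			return true;
		}

		return false;
	}
};

The notify_one/notify_all split in release() matches the two notifier paths in the patch: notify_one() posts a single count, while notify_all() posts the whole signal count at once and wakes every sleeper on that semaphore.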
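The slot allocator in sema_alloc() leans on two bit tricks: inside the fetch_op, "bits |= bits + 1" sets the lowest clear bit of a group's allocation mask, and the index of that bit equals the number of trailing one bits of the old mask (the patch computes it as utils::cnttz64(~bits, false)). A quick standalone check of that arithmetic, using C++20 std::countr_one in place of the project's helper:

// Verifies the lowest-clear-bit allocation trick used by sema_alloc().
#include <bit>
#include <cassert>
#include <cstdint>

int main()
{
	std::uint64_t bits = 0b0000'0111; // slots 0..2 already taken

	const int id = std::countr_one(bits); // index of lowest clear bit = 3
	bits |= bits + 1;                     // mark that slot as taken

	assert(id == 3);
	assert(bits == 0b0000'1111);
	return 0;
}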
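For context, the wait/notify contract implemented above is the usual futex-style one: a waiter must re-check the value in a loop, because wait() may return without a state change (timeout, signal overflow, semaphore reuse, or a hash collision falling back to the slow path), and a notifier stores the new value before waking. The sketch below is a hypothetical caller, not code from the patch; the flag variable and the producer/consumer functions are made up for illustration, only the wait(data, size, old_value, timeout, mask) signature comes from the diff.

// Hypothetical caller-side sketch of the wait/notify contract.
#include "util/atomic.hpp"

static atomic_t<u32> flag{0};

void consumer()
{
	// Loop: wait() can return spuriously, so the condition is always re-checked
	while (!flag.load())
	{
		// old_value = 0, timeout = -1 (infinite, since the code tests "timeout + 1"),
		// mask = all 32 value bits
		atomic_storage_futex::wait(&flag, sizeof(u32), 0, -1, 0xffff'ffff);
	}
}

void producer()
{
	// Publish the new value first, then wake one waiter registered on this address
	flag.store(1);
	atomic_storage_futex::notify_one(&flag);
}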