diff --git a/Utilities/sync.h b/Utilities/sync.h
index f8d3028b59..71b52c5e93 100644
--- a/Utilities/sync.h
+++ b/Utilities/sync.h
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include
 #endif
 #include
 #include
diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp
index 2a304e31cb..b9e18d0da6 100644
--- a/rpcs3/util/atomic.cpp
+++ b/rpcs3/util/atomic.cpp
@@ -15,6 +15,7 @@
 #include
 #include
 #include
+#include
 
 // Hashtable size factor (can be set to 0 to stress-test collisions)
 static constexpr uint s_hashtable_power = 16;
@@ -50,7 +51,7 @@ static NEVER_INLINE bool
 #ifdef _WIN32
 __vectorcall
 #endif
-ptr_cmp(const void* data, u32 size, __m128i old128, __m128i mask128)
+ptr_cmp(const void* data, u32 size, __m128i old128, __m128i mask128, atomic_wait::info* ext = nullptr)
 {
 	if constexpr (CheckCb)
 	{
@@ -63,12 +64,14 @@ ptr_cmp(const void* data, u32 size, __m128i old128, __m128i mask128)
 	const u64 old_value = _mm_cvtsi128_si64(old128);
 	const u64 mask = _mm_cvtsi128_si64(mask128);
 
+	bool result = false;
+
 	switch (size)
 	{
-	case 1: return (reinterpret_cast<const atomic_t<u8>*>(data)->load() & mask) == (old_value & mask);
-	case 2: return (reinterpret_cast<const atomic_t<u16>*>(data)->load() & mask) == (old_value & mask);
-	case 4: return (reinterpret_cast<const atomic_t<u32>*>(data)->load() & mask) == (old_value & mask);
-	case 8: return (reinterpret_cast<const atomic_t<u64>*>(data)->load() & mask) == (old_value & mask);
+	case 1: result = (reinterpret_cast<const atomic_t<u8>*>(data)->load() & mask) == (old_value & mask); break;
+	case 2: result = (reinterpret_cast<const atomic_t<u16>*>(data)->load() & mask) == (old_value & mask); break;
+	case 4: result = (reinterpret_cast<const atomic_t<u32>*>(data)->load() & mask) == (old_value & mask); break;
+	case 8: result = (reinterpret_cast<const atomic_t<u64>*>(data)->load() & mask) == (old_value & mask); break;
 	case 16:
 	{
 		const auto v0 = std::bit_cast<__m128i>(atomic_storage<u128>::load(*reinterpret_cast<const u128*>(data)));
@@ -76,11 +79,7 @@ ptr_cmp(const void* data, u32 size, __m128i old128, __m128i mask128)
 		const auto v2 = _mm_and_si128(v1, mask128);
 		const auto v3 = _mm_packs_epi16(v2, v2);
 
-		if (_mm_cvtsi128_si64(v3) == 0)
-		{
-			return true;
-		}
-
+		result = _mm_cvtsi128_si64(v3) == 0;
 		break;
 	}
 	default:
@@ -90,6 +89,23 @@ ptr_cmp(const void* data, u32 size, __m128i old128, __m128i mask128)
 	}
 	}
 
+	// Check other wait variables if provided
+	if (result)
+	{
+		if (ext) [[unlikely]]
+		{
+			for (auto e = ext; e->data; e++)
+			{
+				if (!ptr_cmp(e->data, e->size, e->old, e->mask))
+				{
+					return false;
+				}
+			}
+		}
+
+		return true;
+	}
+
 	return false;
 }
 
@@ -146,6 +162,8 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
 	return true;
 }
 
+static atomic_t<u64> s_min_tsc{0};
+
 namespace atomic_wait
 {
 	// Essentially a fat semaphore
@@ -157,7 +175,8 @@ namespace atomic_wait
 		u64 tid = reinterpret_cast<u64>(pthread_self());
 #endif
 		atomic_t<u32> sync{};
-		u32 size{};
+		u16 link{0};
+		u16 size{};
 		u64 tsc0{};
 		const void* ptr{};
 		__m128i mask{};
@@ -165,7 +184,7 @@ namespace atomic_wait
 
 #ifdef USE_STD
 		// Standard CV/mutex pair (often contains pthread_cond_t/pthread_mutex_t)
-		std::condition_variable cond;
+		std::condition_variable cv;
 		std::mutex mtx;
 #endif
 
@@ -195,7 +214,7 @@ namespace atomic_wait
 			// Not super efficient: locking is required to avoid lost notifications
 			mtx.lock();
 			mtx.unlock();
-			cond.notify_all();
+			cv.notify_all();
 #elif defined(_WIN32)
 			if (NtWaitForAlertByThreadId)
 			{
@@ -219,7 +238,7 @@ namespace atomic_wait
 			if (mtx.try_lock())
 			{
 				mtx.unlock();
-				cond.notify_all();
+				cv.notify_all();
 				return true;
 			}
 
@@ -266,6 +285,8 @@ static u32 cond_alloc()
 	// Determine whether there is a free slot or not
 	if (!s_cond_sema.try_inc(UINT16_MAX + 1))
 	{
+		// Temporarily placed here
+		fmt::raw_error("Thread semaphore limit " STRINGIZE(UINT16_MAX) " reached in atomic wait.");
 		return 0;
 	}
 
@@ -354,9 +375,9 @@ static void cond_free(u32 cond_id)
 	s_cond_sema--;
 }
 
-static u32 cond_lock(atomic_t* sema)
+static atomic_wait::cond_handle* cond_id_lock(u32 cond_id)
 {
-	while (const u32 cond_id = sema->load())
+	if (cond_id - 1 < u32{UINT16_MAX})
 	{
 		const auto [old, ok] = s_cond_refs[cond_id].fetch_op([](u32& ref)
 		{
@@ -372,13 +393,26 @@ static u32 cond_lock(atomic_t* sema)
 
 		if (ok)
 		{
-			return cond_id;
+			return cond_get(cond_id);
 		}
 
 		if (old == UINT32_MAX)
 		{
 			fmt::raw_error("Thread limit " STRINGIZE(UINT32_MAX) " for a single address reached in atomic notifier.");
 		}
+	}
+
+	return nullptr;
+}
+
+static u32 cond_lock(atomic_t* sema)
+{
+	while (const u32 cond_id = sema->load())
+	{
+		if (cond_id_lock(cond_id))
+		{
+			return cond_id;
+		}
 
 		if (sema->load() != cond_id)
 		{
@@ -636,7 +670,7 @@ static void slot_free(std::uintptr_t iptr, atomic_wait::sync_var* loc, u64 lv =
 	}
 }
 
-static void slot_free(atomic_wait::sync_var* slot, const void* data)
+static void slot_free(const void* data)
 {
 	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
 
@@ -707,6 +741,7 @@ static atomic_wait::sync_var* slot_alloc(const void* data)
 	if (!ok)
 	{
 		// Expected only on top level
+		fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " reached in atomic wait hashtable.");
 		return nullptr;
 	}
 
@@ -736,50 +771,122 @@ static atomic_wait::sync_var* slot_alloc(const void* data)
 	return slot;
 }
 
+u64 atomic_wait::get_unique_tsc()
+{
+	const u64 stamp0 = __rdtsc();
+
+	return s_min_tsc.atomic_op([&](u64& tsc)
+	{
+		if (stamp0 <= s_min_tsc)
+		{
+			// Add 1 if new stamp is too old
+			return ++tsc;
+		}
+		else
+		{
+			// Update last tsc with new stamp otherwise
+			return ((tsc = stamp0));
+		}
+	});
+}
+
 SAFE_BUFFERS void
 #ifdef _WIN32
 __vectorcall
 #endif
-atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 timeout, __m128i mask)
+atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 timeout, __m128i mask, atomic_wait::info* ext)
 {
+	const auto stamp0 = atomic_wait::get_unique_tsc();
+
 	const auto slot = slot_alloc(data);
 
 	verify(HERE), slot;
 
+	std::array<atomic_wait::sync_var*, atomic_wait::max_list - 1> slot_ext{};
+
+	uint ext_size = 0;
+
+	if (ext) [[unlikely]]
+	{
+		for (auto e = ext; e->data; e++)
+		{
+			if (data == e->data)
+			{
+				fmt::raw_error("Address duplication in atomic_wait::list" HERE);
+			}
+
+			for (u32 j = 0; j < ext_size; j++)
+			{
+				if (e->data == ext[j].data)
+				{
+					fmt::raw_error("Address duplication in atomic_wait::list" HERE);
+				}
+			}
+
+			// Allocate additional slots
+			slot_ext[ext_size++] = slot_alloc(e->data);
+		}
+	}
+
 	const u32 cond_id = cond_alloc();
 
-	if (cond_id == 0)
+	verify(HERE), cond_id;
+
+	u32 cond_id_ext[atomic_wait::max_list - 1]{};
+
+	for (u32 i = 0; i < ext_size; i++)
 	{
-		fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " reached in atomic wait.");
+		cond_id_ext[i] = cond_alloc();
 	}
 
-	auto sema = slot->sema_alloc();
+	const auto sema = slot->sema_alloc();
 
-	while (!sema)
+	verify(HERE), sema;
+
+	std::array<atomic_t*, atomic_wait::max_list - 1> sema_ext{};
+
+	std::array<atomic_wait::cond_handle*, atomic_wait::max_list - 1> cond_ext{};
+
+	for (u32 i = 0; i < ext_size; i++)
 	{
-		if (timeout + 1 || ptr_cmp(data, size, old_value, mask))
-		{
-			cond_free(cond_id);
-			slot_free(slot, data);
-			return;
-		}
+		// Allocate cond id location ("sema") in *corresponding* slot
+		sema_ext[i] = slot_ext[i]->sema_alloc();
 
-		// TODO
-		busy_wait(30000);
-		sema = slot->sema_alloc();
+		// Get actual semaphores
+		cond_ext[i] = cond_get(cond_id_ext[i]);
 	}
 
 	// Save for notifiers
 	const auto cond = cond_get(cond_id);
 
 	// Store some info for notifiers (some may be unused)
-	cond->size = size;
+	cond->link = 0;
+	cond->size = static_cast<u16>(size);
 	cond->mask = mask;
 	cond->oldv = old_value;
 	cond->ptr = data;
-	cond->tsc0 = __rdtsc();
+	cond->tsc0 = stamp0;
+
+	for (u32 i = 0; i < ext_size; i++)
+	{
+		// Extensions point to original cond_id, copy remaining info
+		cond_ext[i]->link = cond_id;
+		cond_ext[i]->size = static_cast<u16>(ext[i].size);
+		cond_ext[i]->mask = ext[i].mask;
+		cond_ext[i]->oldv = ext[i].old;
+		cond_ext[i]->ptr = ext[i].data;
+		cond_ext[i]->tsc0 = cond->tsc0;
+
+		// Cannot be notified, should be redirected to main semaphore
+		cond_ext[i]->sync.release(4);
+	}
+
+	cond->sync.release(1);
+
+	for (u32 i = 0; i < ext_size; i++)
+	{
+		// Final deployment
+		sema_ext[i]->release(static_cast(cond_id_ext[i]));
+	}
 
-	cond->sync = 1;
 	sema->store(static_cast(cond_id));
 
 #ifdef USE_STD
@@ -794,7 +901,7 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 	bool fallback = false;
 #endif
 
-	while (ptr_cmp(data, size, old_value, mask))
+	while (ptr_cmp(data, size, old_value, mask, ext))
 	{
 #ifdef USE_FUTEX
 		struct timespec ts;
@@ -824,11 +931,11 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 
 			if (timeout + 1)
 			{
-				cond->cond.wait_for(lock, std::chrono::nanoseconds(timeout));
+				cond->cv.wait_for(lock, std::chrono::nanoseconds(timeout));
 			}
 			else
 			{
-				cond->cond.wait(lock);
+				cond->cv.wait(lock);
 			}
 #elif defined(_WIN32)
 			LARGE_INTEGER qw;
@@ -919,9 +1026,20 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 	}
 #endif
 
+	// Release resources in reverse order
+	for (u32 i = ext_size - 1; i != umax; i--)
+	{
+		slot_ext[i]->sema_free(sema_ext[i]);
+	}
+
 	slot->sema_free(sema);
 
-	slot_free(slot, data);
+	for (u32 i = ext_size - 1; i != umax; i--)
+	{
+		slot_free(ext[i].data);
+	}
+
+	slot_free(data);
 
 	s_tls_wait_cb(nullptr);
 }
@@ -946,12 +1064,24 @@ alert_sema(atomic_t* sema, const void* data, u64 info, u32 size, __m128i ma
 
 	bool ok = false;
 
-	if (cond->sync && (!size ? (!info || cond->tid == info) : cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv)))
+	if (cond->sync && (!size ? (!info || cond->tid == info) : (cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))))
 	{
-		if ((!size && cond->forced_wakeup()) || (size && cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2)))
+		// Redirect if necessary
+		const auto _old = cond;
+		const auto _new = _old->link ? cond_id_lock(_old->link) : _old;
+
+		if (_new && _new->tsc0 == _old->tsc0)
 		{
-			ok = true;
-			cond->alert_native();
+			if ((!size && _new->forced_wakeup()) || (size && _new->sync.load() == 1 && _new->sync.compare_and_swap_test(1, 2)))
+			{
+				ok = true;
+				_new->alert_native();
+			}
+		}
+
+		if (_new && _new != _old)
+		{
+			cond_free(_old->link);
+		}
 	}
 
@@ -1000,10 +1130,19 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 		return false;
 	}
 
+	u32 val = 0;
+	std::memcpy(&val, reinterpret_cast(s_cond_list + i) + offsetof(atomic_wait::cond_handle, sync), sizeof(val));
+
+	if (val == 0 || val >= 3)
+	{
+		// Dirty optimization, read possibly uninitialized memory and skip forced signaled or secondary semaphores
+		return false;
+	}
+
 	if (thread_id)
 	{
 		u64 tid = 0;
-		std::memcpy(&tid, &cond_get(i)->tid, sizeof(tid));
+		std::memcpy(&tid, reinterpret_cast(s_cond_list + i) + offsetof(atomic_wait::cond_handle, tid), sizeof(tid));
 
 		if (tid != thread_id)
 		{
@@ -1027,7 +1166,7 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 
 	if (!thread_id || cond->tid == thread_id)
 	{
-		if (cond->forced_wakeup())
+		if (!cond->link && cond->forced_wakeup())
 		{
 			cond->alert_native();
 
@@ -1139,6 +1278,7 @@ atomic_wait_engine::notify_all(const void* data, u32 size, __m128i mask, __m128i
 	u64 copy = slot->get_sema_bits();
 	u64 lock = 0;
 	u32 lock_ids[64]{};
+	u32 lock_id2[64]{};
 
 	for (u64 bits = copy; bits; bits &= bits - 1)
 	{
@@ -1158,7 +1298,15 @@
 		if (cond->sync && cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))
 		{
-			if (cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2))
+			const auto _old = cond;
+			const auto _new = _old->link ? cond_id_lock(_old->link) : _old;
+
+			if (_new && _old != _new)
+			{
+				lock_id2[id] = _old->link;
+			}
+
+			if (_new && _new->tsc0 == _old->tsc0 && _new->sync.load() == 1 && _new->sync.compare_and_swap_test(1, 2))
 			{
 				// Ok.
 				continue;
@@ -1177,7 +1325,9 @@
 		{
 			const u32 id = std::countr_zero(bits);
 
-			if (cond_get(lock_ids[id])->try_alert_native())
+			const auto cond_id = lock_id2[id] ? lock_id2[id] : lock_ids[id];
+
+			if (cond_get(cond_id)->try_alert_native())
 			{
 				s_tls_notify_cb(data, ++progress);
 
@@ -1190,7 +1340,11 @@
 	// Proceed with remaining bits using "normal" blocking waiting
 	for (u64 bits = copy; bits; bits &= bits - 1)
 	{
-		cond_get(lock_ids[std::countr_zero(bits)])->alert_native();
+		const u32 id = std::countr_zero(bits);
+
+		const auto cond_id = lock_id2[id] ? lock_id2[id] : lock_ids[id];
+
+		cond_get(cond_id)->alert_native();
 
 		s_tls_notify_cb(data, ++progress);
 	}
@@ -1200,6 +1354,11 @@
 	{
 		const u32 id = std::countr_zero(bits);
 
+		if (u32 cond_id = lock_id2[id])
+		{
+			cond_free(cond_id);
+		}
+
 		if (u32 cond_id = lock_ids[id])
 		{
 			cond_free(cond_id);
diff --git a/rpcs3/util/atomic.hpp b/rpcs3/util/atomic.hpp
index 66d99bdfbe..a5dbb656ae 100644
--- a/rpcs3/util/atomic.hpp
+++ b/rpcs3/util/atomic.hpp
@@ -17,9 +17,141 @@ enum class atomic_wait_timeout : u64
 // Unused externally
 namespace atomic_wait
 {
+	constexpr uint max_list = 8;
+
 	struct sync_var;
 	struct slot_info;
 	struct sema_handle;
+
+	struct info
+	{
+		const void* data;
+		u32 size;
+		__m128i old;
+		__m128i mask;
+
+		template <typename T>
+		constexpr void set_value(T value)
+		{
+			static_assert((sizeof(T) & (sizeof(T) - 1)) == 0);
+			static_assert(sizeof(T) <= 16);
+
+			if constexpr (sizeof(T) <= 8)
+			{
+				old = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>, T>(value));
+			}
+			else if constexpr (sizeof(T) == 16)
+			{
+				old = std::bit_cast<__m128i>(value);
+			}
+		}
+
+		void set_value()
+		{
+			old = _mm_setzero_si128();
+		}
+
+		template <typename T>
+		constexpr void set_mask(T value)
+		{
+			static_assert((sizeof(T) & (sizeof(T) - 1)) == 0);
+			static_assert(sizeof(T) <= 16);
+
+			if constexpr (sizeof(T) <= 8)
+			{
+				mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>, T>(value));
+			}
+			else if constexpr (sizeof(T) == 16)
+			{
+				mask = std::bit_cast<__m128i>(value);
+			}
+		}
+
+		void set_mask()
+		{
+			mask = _mm_set1_epi64x(-1);
+		}
+	};
+
+	template <uint Max, typename... T>
+	class list
+	{
+		static_assert(Max <= max_list, "Too many elements in the atomic wait list.");
+
+		// Null-terminated list of wait info
+		info m_info[Max + 1]{};
+
+	public:
+		constexpr list() noexcept = default;
+
+		constexpr list(const list&) noexcept = default;
+
+		constexpr list& operator=(const list&) noexcept = default;
+
+		template <typename... U>
+		constexpr list(atomic_t<U>&... vars)
+			: m_info{{&vars.raw(), sizeof(U), _mm_setzero_si128(), _mm_set1_epi64x(-1)}...}
+		{
+			static_assert(sizeof...(U) <= Max);
+		}
+
+		template <typename... U>
+		constexpr list& values(U... values)
+		{
+			static_assert(sizeof...(U) <= Max);
+
+			auto* ptr = m_info;
+			((ptr++)->template set_value<U>(values), ...);
+			return *this;
+		}
+
+		template <typename... U>
+		constexpr list& masks(U... masks)
+		{
+			static_assert(sizeof...(U) <= Max);
+
+			auto* ptr = m_info;
+			((ptr++)->template set_mask<U>(masks), ...);
+			return *this;
+		}
+
+		template <uint Index, typename T2, typename U>
+		constexpr void set(atomic_t<T2>& var, U value)
+		{
+			static_assert(Index < Max);
+
+			m_info[Index].data = &var.raw();
+			m_info[Index].size = sizeof(T2);
+			m_info[Index].template set_value<T2>(value);
+			m_info[Index].mask = _mm_set1_epi64x(-1);
+		}
+
+		template <uint Index, typename T2, typename U, typename V>
+		constexpr void set(atomic_t<T2>& var, U value, V mask)
+		{
+			static_assert(Index < Max);
+
+			m_info[Index].data = &var.raw();
+			m_info[Index].size = sizeof(T2);
+			m_info[Index].template set_value<T2>(value);
+			m_info[Index].template set_mask<T2>(mask);
+		}
+
+		// Timeout is discouraged
+		void wait(atomic_wait_timeout timeout = atomic_wait_timeout::inf);
+
+		// Same as wait
+		void start()
+		{
+			wait();
+		}
+	};
+
+	template <typename... T>
+	list(atomic_t<T>&... vars) -> list<sizeof...(T), T...>;
+
+	// RDTSC with adjustment for being unique
+	u64 get_unique_tsc();
 }
 
 // Helper for waitable atomics (as in C++20 std::atomic)
@@ -29,11 +161,14 @@ private:
 	template <typename T, std::size_t Align>
 	friend class atomic_t;
 
+	template <uint Max, typename... T>
+	friend class atomic_wait::list;
+
 	static void
 #ifdef _WIN32
 	__vectorcall
 #endif
-	wait(const void* data, u32 size, __m128i old128, u64 timeout, __m128i mask128);
+	wait(const void* data, u32 size, __m128i old128, u64 timeout, __m128i mask128, atomic_wait::info* extension = nullptr);
 
 	static void
 #ifdef _WIN32
@@ -53,6 +188,14 @@ public:
 	static bool raw_notify(const void* data, u64 thread_id = 0);
 };
 
+template <uint Max, typename... T>
+void atomic_wait::list<Max, T...>::wait(atomic_wait_timeout timeout)
+{
+	static_assert(Max, "Cannot initiate atomic wait with empty list.");
+
+	atomic_wait_engine::wait(m_info[0].data, m_info[0].size, m_info[0].old, static_cast<u64>(timeout), m_info[0].mask, m_info + 1);
+}
+
 // Helper class, provides access to compiler-specific atomic intrinsics
 template <typename T, std::size_t Size = sizeof(T)>
 struct atomic_storage
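
Usage sketch for the multi-variable wait API added above. This is illustrative only and not part of the diff: it assumes rpcs3's util/atomic.hpp with its atomic_t<> wrapper and the semantics implemented in ptr_cmp()/wait() above, where the waiter keeps sleeping only while every listed variable still matches its captured value under the given mask. The variable names, the deduced list<2, u32, u64> type, and the notify call on the writer side are assumptions for the example.

#include "util/atomic.hpp"

// Hypothetical shared state: two unrelated atomic variables.
atomic_t<u32> ready_flag{0};
atomic_t<u64> queue_size{0};

void wait_for_work()
{
	// The deduction guide turns this into atomic_wait::list<2, u32, u64>;
	// the constructor captures each variable with value 0 and a full mask.
	atomic_wait::list wait_list(ready_flag, queue_size);

	// Optionally restate the "old" values the comparison is made against.
	wait_list.values(u32{0}, u64{0});

	// Blocks while ready_flag == 0 and queue_size == 0; returns once either
	// variable has been changed and a notification was issued on it
	// (e.g. a writer storing a new value and calling notify_one/notify_all).
	wait_list.wait();
}

The first entry in the list owns the primary semaphore; the remaining entries are allocated as linked ("secondary") semaphores that redirect notifications to it, which is what the link/tsc0 handling in alert_sema() and notify_all() above implements.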