From a8a8cd88a040b26bd4f49ff7b46542b53601e034 Mon Sep 17 00:00:00 2001 From: Nekotekina Date: Fri, 21 Sep 2018 20:38:52 +0300 Subject: [PATCH] Implement lf_queue<>, lf_value<> lf_queue<>: unbound FIFO queue with dynamic linked-list lf_value<>: concurrently-assignable value readable without locking at the cost of memory (using dynamic linked list) Add atomic_t<>::compare_exchange --- Utilities/Atomic.h | 6 + Utilities/lockless.h | 219 ++++++++++++++++++++++++++++++++++- rpcs3/Emu/Cell/SPUThread.cpp | 1 - 3 files changed, 221 insertions(+), 5 deletions(-) diff --git a/Utilities/Atomic.h b/Utilities/Atomic.h index 90af693df2..5edb45f2e0 100644 --- a/Utilities/Atomic.h +++ b/Utilities/Atomic.h @@ -610,6 +610,12 @@ public: return atomic_storage::compare_exchange(m_data, old, exch); } + // As in std::atomic + bool compare_exchange(type& cmp_and_old, const type& exch) + { + return atomic_storage::compare_exchange(m_data, cmp_and_old, exch); + } + // Atomic operation; returns old value, or pair of old value and return value (cancel op if evaluates to false) template > std::conditional_t, type, std::pair> fetch_op(F&& func) diff --git a/Utilities/lockless.h b/Utilities/lockless.h index 430ce4cd30..ced11f0d1c 100644 --- a/Utilities/lockless.h +++ b/Utilities/lockless.h @@ -5,10 +5,10 @@ //! Simple sizeless array base for concurrent access. Cannot shrink, only growths automatically. //! There is no way to know the current size. The smaller index is, the faster it's accessed. -//! +//! //! T is the type of elements. Currently, default constructor of T shall be constexpr. //! N is initial element count, available without any memory allocation and only stored contiguously. -template +template class lf_array { // Data (default-initialized) @@ -70,7 +70,7 @@ public: { return reinterpret_cast&>(m_ctrl).load(); // Hack } - + // Acquire the place for one or more elements. u32 push_begin(u32 count = 1) { @@ -130,7 +130,7 @@ public: { return m_default_key_data; } - + // Calculate hash and array position for (std::size_t pos = Hash{}(key) % Size;; pos += Size) { @@ -328,3 +328,214 @@ public: using lf_spsc::size; using lf_spsc::operator []; }; + +// Helper type, linked list element +template +class lf_item final +{ + lf_item* m_link = nullptr; + + T m_data; + + template + friend class lf_queue; + + constexpr lf_item() = default; + + template + constexpr lf_item(lf_item* link, Args&&... args) + : m_link(link) + , m_data(std::forward(args)...) + { + } + +public: + lf_item(const lf_item&) = delete; + + lf_item& operator=(const lf_item&) = delete; + + ~lf_item() + { + for (lf_item* ptr = m_link; ptr;) + { + delete std::exchange(ptr, std::exchange(ptr->m_link, nullptr)); + } + } + + // Withdraw all other elements + std::unique_ptr> pop_all() + { + return std::unique_ptr>(std::exchange(m_link, nullptr)); + } + + [[nodiscard]] T& get() + { + return m_data; + } + + [[nodiscard]] const T& get() const + { + return m_data; + } +}; + +// Full-dynamic multi-producer queue (consumer consumes everything or nothing, thread-safe) +template +class lf_queue +{ + // Elements are added by replacing m_head + atomic_t*> m_head = nullptr; + + // Extract all elements and reverse element order (FILO to FIFO) + lf_item* reverse() noexcept + { + if (lf_item* head = m_head.load() ? m_head.exchange(nullptr) : nullptr) + { + if (lf_item* prev = head->m_link) + { + head->m_link = nullptr; + + do + { + lf_item* pprev = prev->m_link; + prev->m_link = head; + head = std::exchange(prev, pprev); + } while (prev); + } + + return head; + } + + return nullptr; + } + +public: + constexpr lf_queue() = default; + + ~lf_queue() + { + delete m_head.load(); + } + + template + void push(Args&&... args) + { + lf_item* old = m_head.load(); + lf_item* item = new lf_item(old, std::forward(args)...); + + while (!m_head.compare_exchange(old, item)) + { + item->m_link = old; + } + } + + // Withdraw the list + std::unique_ptr> pop_all() + { + return std::unique_ptr>(reverse()); + } + + // Withdraw the list and apply func(data) to each element, return the total length + template + std::size_t apply(F&& func) + { + std::size_t count = 0; + + for (std::unique_ptr> ptr(reverse()); ptr; ptr = ptr->pop_all(), count++) + { + std::invoke(std::forward(func), ptr->m_data); + } + + return count; + } + + // apply_all() overload for callable template argument + template + std::size_t apply() + { + std::size_t count = 0; + + for (std::unique_ptr> ptr(reverse()); ptr; ptr = ptr->pop_all(), count++) + { + std::invoke(F, ptr->m_data); + } + + return count; + } +}; + +// Assignable lock-free thread-safe value of any type (memory-inefficient) +template +class lf_value final +{ + atomic_t m_head; + + T m_data; + +public: + template + explicit constexpr lf_value(Args&&... args) + : m_head(this) + , m_data(std::forward(args)...) + { + } + + ~lf_value() + { + // All values are kept in the queue until the end + for (lf_value* ptr = m_head.load(); ptr != this;) + { + delete std::exchange(ptr, std::exchange(ptr->m_head.raw(), ptr)); + } + } + + // Get current head, allows to inspect old values + [[nodiscard]] const lf_value* head() const + { + return m_head.load(); + } + + // Inspect the initial (oldest) value + [[nodiscard]] const T& first() const + { + return m_data; + } + + [[nodiscard]] const T& get() const + { + return m_head.load()->m_data; + } + + [[nodiscard]] operator const T&() const + { + return m_head.load()->m_data; + } + + // Construct new value in-place + template + const T& assign(Args&&... args) + { + lf_value* val = new lf_value(std::forward(args)...); + lf_value* old = m_head.load(); + + do + { + val->m_head = old; + } + while (!m_head.compare_exchange(old, val)); + + return val->m_data; + } + + // Copy-assign new value + const T& operator =(const T& value) + { + return assign(value); + } + + // Move-assign new value + const T& operator =(T&& value) + { + return assign(std::move(value)); + } +}; diff --git a/rpcs3/Emu/Cell/SPUThread.cpp b/rpcs3/Emu/Cell/SPUThread.cpp index 14fc79d4da..6c5eb16483 100644 --- a/rpcs3/Emu/Cell/SPUThread.cpp +++ b/rpcs3/Emu/Cell/SPUThread.cpp @@ -1,6 +1,5 @@ #include "stdafx.h" #include "Utilities/JIT.h" -#include "Utilities/lockless.h" #include "Utilities/sysinfo.h" #include "Emu/Memory/vm.h" #include "Emu/System.h"