From de9d859f4ac4e4272f21f803f7e6671db9347540 Mon Sep 17 00:00:00 2001 From: Nekotekina Date: Sun, 28 Feb 2021 14:39:04 +0300 Subject: [PATCH] named_thread: implement task queue atomic_ptr: implement push_head() thread_ctrl::state() triggers task queue execution. --- Utilities/Thread.cpp | 94 ++++++++++- Utilities/Thread.h | 339 ++++++++++++++++++++++++++++++++++++-- rpcs3/util/shared_ptr.hpp | 58 ++++++- 3 files changed, 471 insertions(+), 20 deletions(-) diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index 9f7c4c1f10..e9fa5218da 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -2207,6 +2207,16 @@ thread_base::native_entry thread_base::make_trampoline(u64(*entry)(thread_base* }); } +thread_state thread_ctrl::state() +{ + auto _this = g_tls_this_thread; + + // Drain execution queue + _this->exec(); + + return static_cast(_this->m_sync & 3); +} + void thread_ctrl::_wait_for(u64 usec, bool alert /* true */) { auto _this = g_tls_this_thread; @@ -2256,13 +2266,16 @@ void thread_ctrl::_wait_for(u64 usec, bool alert /* true */) } #endif - if (_this->m_sync.bit_test_reset(2)) + if (_this->m_sync.bit_test_reset(2) || _this->m_taskq) { return; } // Wait for signal and thread state abort - _this->m_sync.wait(0, 4 + 1, atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff}); + atomic_wait::list<2> list{}; + list.set<0>(_this->m_sync, 0, 4 + 1); + list.set<1>(_this->m_taskq, nullptr); + list.wait(atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff}); } std::string thread_ctrl::get_name_cached() @@ -2298,6 +2311,9 @@ thread_base::thread_base(native_entry entry, std::string_view name) thread_base::~thread_base() { + // Cleanup abandoned tasks: initialize default results and signal + this->exec(); + // Only cleanup on errored status if ((m_sync & 3) == 2) { @@ -2404,6 +2420,80 @@ u64 thread_base::get_cycles() } } +void thread_base::push(shared_ptr task) +{ + const auto next = &task->next; + m_taskq.push_head(*next, std::move(task)); + m_taskq.notify_one(); +} + +void thread_base::exec() +{ + if (!m_taskq) [[likely]] + { + return; + } + + while (shared_ptr head = m_taskq.exchange(null_ptr)) + { + // TODO: check if adapting reverse algorithm is feasible here + shared_ptr* prev{}; + + for (auto ptr = head.get(); ptr; ptr = ptr->next.get()) + { + utils::prefetch_exec(ptr->exec.load()); + + ptr->prev = prev; + + if (ptr->next) + { + prev = &ptr->next; + } + } + + if (!prev) + { + prev = &head; + } + + for (auto ptr = prev->get(); ptr; ptr = ptr->prev->get()) + { + if (auto task = ptr->exec.load()) [[likely]] + { + // Execute or discard (if aborting) + if ((m_sync & 3) == 0) [[likely]] + { + task(this, ptr); + } + else + { + task(nullptr, ptr); + } + + // Notify waiters + ptr->exec.release(nullptr); + ptr->exec.notify_all(); + } + + if (ptr->next) + { + // Partial cleanup + ptr->next.reset(); + } + + if (!ptr->prev) + { + break; + } + } + + if (!m_taskq) [[likely]] + { + return; + } + } +} + [[noreturn]] void thread_ctrl::emergency_exit(std::string_view reason) { if (std::string info = dump_useful_thread_info(); !info.empty()) diff --git a/Utilities/Thread.h b/Utilities/Thread.h index 9e09fdacbc..ac5425ad57 100644 --- a/Utilities/Thread.h +++ b/Utilities/Thread.h @@ -65,19 +65,50 @@ struct result_storage(&data); } - const T* get() const + const T* _get() const { return reinterpret_cast(&data); } + void init() noexcept + { + new (data) T(); + } + void destroy() noexcept { - get()->~T(); + _get()->~T(); + } +}; + +// Base class for task queue (linked list) +class thread_future +{ + friend class thread_base; + + shared_ptr next{}; + + shared_ptr* prev{}; + +protected: + atomic_t exec{}; + +public: + // Get reference to the atomic variable for inspection and waiting for + const auto& get_wait() const + { + return exec; + } + + // Wait (preset) + void wait() const + { + exec.wait(nullptr); } }; @@ -110,6 +141,9 @@ private: // Thread name atomic_ptr m_tname; + // Thread task queue (reversed linked list) + atomic_ptr m_taskq{}; + // Start thread void start(); @@ -150,6 +184,13 @@ public: // Get thread id u64 get_native_id() const; + + // Add work to the queue + void push(shared_ptr); + +private: + // Clear task queue (execute unless aborting) + void exec(); }; // Collection of global function for current thread @@ -217,11 +258,8 @@ public: return static_cast(thread).get_native_id(); } - // Read current state - static inline thread_state state() - { - return static_cast(g_tls_this_thread->m_sync & 3); - } + // Read current state, possibly executing some tasks + static thread_state state(); // Wait once with timeout. May spuriously return false. static inline void wait_for(u64 usec, bool alert = true) @@ -241,14 +279,15 @@ public: { auto _this = g_tls_this_thread; - if (_this->m_sync.bit_test_reset(2)) + if (_this->m_sync.bit_test_reset(2) || _this->m_taskq) { return; } - atomic_wait::list<2> list{}; + atomic_wait::list<3> list{}; list.set<0, Op>(wait, old); list.set<1>(_this->m_sync, 0, 4 + 1); + list.set<2>(_this->m_taskq, nullptr); list.wait(atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff}); } @@ -324,7 +363,7 @@ class named_thread final : public Context, result_storage, thread_base if constexpr (!result::empty) { // Construct using default constructor in the case of failure - new (static_cast(static_cast(thread_ctrl::get_current()))->get()) typename result::type(); + static_cast(static_cast(thread_ctrl::get_current()))->init(); } }); @@ -347,7 +386,7 @@ class named_thread final : public Context, result_storage, thread_base else { // Construct the result using placement new (copy elision should happen) - new (result::get()) decltype(auto)(Context::operator()()); + new (result::_get()) decltype(auto)(Context::operator()()); } return thread::finalize(thread_state::finished); @@ -395,7 +434,7 @@ public: if constexpr (!result::empty) { - return *result::get(); + return *result::_get(); } } @@ -406,7 +445,279 @@ public: if constexpr (!result::empty) { - return *result::get(); + return *result::_get(); + } + } + + // Send command to the thread to invoke directly (references should be passed via std::ref()) + template + auto operator()(Arg&& arg, Args&&... args) + { + // Overloaded operator() of the Context. + constexpr bool v1 = std::is_invocable_v; + + // Anything invocable, not necessarily involving the Context. + constexpr bool v2 = std::is_invocable_v; + + // Could be pointer to a non-static member function (or data member) of the Context. + constexpr bool v3 = std::is_member_pointer_v> && std::is_invocable_v; + + // Only one invocation type shall be valid, otherwise we don't know. + static_assert((v1 + v2 + v3) == 1, "Ambiguous or invalid named_thread call."); + + if constexpr (v1) + { + class future : public thread_future, result_storage + { + // A tuple to store arguments + decltype(std::make_tuple(std::forward(arg, args...))) m_args; + + public: + future(Arg&& arg, Args&&... args) + : m_args(std::forward(arg, args...)) + { + thread_future::exec.raw() = +[](thread_base* tb, thread_future* tf) + { + const auto _this = static_cast(tf); + + if (!tb) [[unlikely]] + { + if constexpr (!future::empty && !Discard) + { + _this->init(); + } + + return; + } + + if constexpr (future::empty || Discard) + { + std::apply(*static_cast(static_cast(tb)), _this->m_args); + } + else + { + new (_this->_get()) decltype(auto)(std::apply(*static_cast(static_cast(tb)), _this->m_args)); + } + }; + } + + future(const future&) = delete; + + future& operator=(const future&) = delete; + + ~future() + { + if constexpr (!future::empty && !Discard) + { + // Should be set to null if executed + if (!this->exec) + { + this->destroy(); + } + } + } + + decltype(auto) get() + { + if constexpr (!future::empty && !Discard) + { + return *this->_get(); + } + } + + decltype(auto) get() const + { + if constexpr (!future::empty && !Discard) + { + return *this->_get(); + } + } + }; + + single_ptr target = make_single(std::forward(arg, args...)); + + if constexpr (!Discard) + { + shared_ptr result = std::move(target); + + // Copy result + thread::push(result); + return result; + } + else + { + // Move target + thread::push(std::move(target)); + return; + } + } + else if constexpr (v2) + { + class future : public thread_future, result_storage, void, Args...> + { + decltype(std::make_tuple(std::forward(args...))) m_args; + + std::decay_t m_func; + + public: + future(Arg func, Args&&... args) + : m_args(std::forward(args...)) + , m_func(func) + { + thread_future::exec.raw() = +[](thread_base* tb, thread_future* tf) + { + const auto _this = static_cast(tf); + + if (!tb) [[unlikely]] + { + if constexpr (!future::empty && !Discard) + { + _this->init(); + } + + return; + } + + if constexpr (future::empty || Discard) + { + std::apply(_this->m_func, _this->m_args); + } + else + { + new (_this->_get()) decltype(auto)(std::apply(_this->m_func, _this->m_args)); + } + }; + } + + future(const future&) = delete; + + future& operator=(const future&) = delete; + + ~future() + { + if constexpr (!future::empty && !Discard) + { + if (!this->exec) + { + this->destroy(); + } + } + } + + decltype(auto) get() + { + if constexpr (!future::empty && !Discard) + { + return *this->_get(); + } + } + + decltype(auto) get() const + { + if constexpr (!future::empty && !Discard) + { + return *this->_get(); + } + } + }; + + single_ptr target = make_single(std::forward(arg, args...)); + + if constexpr (!Discard) + { + shared_ptr result = std::move(target); + thread::push(result); + return result; + } + else + { + thread::push(std::move(target)); + return; + } + } + else if constexpr (v3) + { + class future : public thread_future, result_storage, void, Context&, Args...> + { + decltype(std::make_tuple(std::forward(args...))) m_args; + + std::decay_t m_func; + + public: + future(Arg func, Args&&... args) + : m_args(std::forward(args...)) + , m_func(func) + { + thread_future::exec.raw() = +[](thread_base* tb, thread_future* tf) + { + const auto _this = static_cast(tf); + + if (!tb) [[unlikely]] + { + if constexpr (!future::empty && !Discard) + { + _this->init(); + } + + return; + } + + if constexpr (future::empty || Discard) + { + std::apply(_this->m_func, *static_cast(static_cast(tb)), _this->m_args); + } + else + { + new (_this->_get()) decltype(auto)(std::apply(_this->m_func, *static_cast(static_cast(tb)), _this->m_args)); + } + }; + } + + future(const future&) = delete; + + future& operator=(const future&) = delete; + + ~future() + { + if constexpr (!future::empty && !Discard) + { + if (!this->exec) + { + this->destroy(); + } + } + } + + decltype(auto) get() + { + if constexpr (!future::empty && !Discard) + { + return *this->_get(); + } + } + + decltype(auto) get() const + { + if constexpr (!future::empty && !Discard) + { + return *this->_get(); + } + } + }; + + single_ptr target = make_single(std::forward(arg, args...)); + + if constexpr (!Discard) + { + shared_ptr result = std::move(target); + thread::push(result); + return result; + } + else + { + thread::push(std::move(target)); + return; + } } } diff --git a/rpcs3/util/shared_ptr.hpp b/rpcs3/util/shared_ptr.hpp index fab08fc9aa..5a2bb1d456 100644 --- a/rpcs3/util/shared_ptr.hpp +++ b/rpcs3/util/shared_ptr.hpp @@ -6,9 +6,11 @@ namespace stx { +#ifndef _MSC_VER +#pragma GCC diagnostic push #ifdef __clang__ -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wundefined-var-template" +#pragma GCC diagnostic ignored "-Wundefined-var-template" +#endif #endif // Not defined anywhere (and produces a useless warning) @@ -19,6 +21,9 @@ namespace stx template constexpr bool is_same_ptr() noexcept { +#if !defined(_MSC_VER) && !defined(__clang__) + return true; +#else if constexpr (std::is_void_v || std::is_void_v || std::is_same_v) { return true; @@ -39,13 +44,14 @@ namespace stx { return false; } +#endif } template constexpr bool is_same_ptr_cast_v = std::is_convertible_v && is_same_ptr(); -#ifdef __clang__ -#pragma clang diagnostic pop +#ifndef _MSC_VER +#pragma GCC diagnostic pop #endif template @@ -253,6 +259,11 @@ namespace stx } }; +#ifndef _MSC_VER +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Winvalid-offsetof" +#endif + template static std::enable_if_t) && (Init || !sizeof...(Args)), single_ptr> make_single(Args&&... args) noexcept { @@ -362,6 +373,10 @@ namespace stx return r; } +#ifndef _MSC_VER +#pragma GCC diagnostic pop +#endif + // Simplified shared pointer template class shared_ptr @@ -1035,6 +1050,41 @@ namespace stx return compare_and_swap_test(reinterpret_cast&>(cmp), std::move(exch)); } + // Helper utility + void push_head(shared_type& next, shared_type exch) noexcept + { + if (exch.m_ptr) [[likely]] + { + // Add missing references first + exch.d()->refs += c_ref_mask; + } + + if (next.m_ptr) [[unlikely]] + { + // Just in case + next.reset(); + } + + atomic_ptr old; + old.m_val.raw() = m_val.load(); + + do + { + // Update old head with current value + next.m_ptr = reinterpret_cast(old.m_val.raw() >> c_ref_size); + + } while (!m_val.compare_exchange(old.m_val.raw(), reinterpret_cast(exch.m_ptr) << c_ref_size)); + + // This argument is consumed (moved from) + exch.m_ptr = nullptr; + + if (next.m_ptr) + { + // Compensation for `next` assignment + old.m_val.raw() += 1; + } + } + // Simple atomic load is much more effective than load(), but it's a non-owning reference const volatile void* observe() const noexcept {