1
0
mirror of https://github.com/RPCS3/rpcs3.git synced 2024-11-22 02:32:36 +01:00

named_thread: implement task queue

atomic_ptr: implement push_head()

thread_ctrl::state() triggers task queue execution.
This commit is contained in:
Nekotekina 2021-02-28 14:39:04 +03:00
parent 3aaa0172d5
commit de9d859f4a
3 changed files with 471 additions and 20 deletions

View File

@ -2207,6 +2207,16 @@ thread_base::native_entry thread_base::make_trampoline(u64(*entry)(thread_base*
});
}
thread_state thread_ctrl::state()
{
auto _this = g_tls_this_thread;
// Drain execution queue
_this->exec();
return static_cast<thread_state>(_this->m_sync & 3);
}
void thread_ctrl::_wait_for(u64 usec, bool alert /* true */)
{
auto _this = g_tls_this_thread;
@ -2256,13 +2266,16 @@ void thread_ctrl::_wait_for(u64 usec, bool alert /* true */)
}
#endif
if (_this->m_sync.bit_test_reset(2))
if (_this->m_sync.bit_test_reset(2) || _this->m_taskq)
{
return;
}
// Wait for signal and thread state abort
_this->m_sync.wait(0, 4 + 1, atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff});
atomic_wait::list<2> list{};
list.set<0>(_this->m_sync, 0, 4 + 1);
list.set<1>(_this->m_taskq, nullptr);
list.wait(atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff});
}
std::string thread_ctrl::get_name_cached()
@ -2298,6 +2311,9 @@ thread_base::thread_base(native_entry entry, std::string_view name)
thread_base::~thread_base()
{
// Cleanup abandoned tasks: initialize default results and signal
this->exec();
// Only cleanup on errored status
if ((m_sync & 3) == 2)
{
@ -2404,6 +2420,80 @@ u64 thread_base::get_cycles()
}
}
void thread_base::push(shared_ptr<thread_future> task)
{
const auto next = &task->next;
m_taskq.push_head(*next, std::move(task));
m_taskq.notify_one();
}
void thread_base::exec()
{
if (!m_taskq) [[likely]]
{
return;
}
while (shared_ptr<thread_future> head = m_taskq.exchange(null_ptr))
{
// TODO: check if adapting reverse algorithm is feasible here
shared_ptr<thread_future>* prev{};
for (auto ptr = head.get(); ptr; ptr = ptr->next.get())
{
utils::prefetch_exec(ptr->exec.load());
ptr->prev = prev;
if (ptr->next)
{
prev = &ptr->next;
}
}
if (!prev)
{
prev = &head;
}
for (auto ptr = prev->get(); ptr; ptr = ptr->prev->get())
{
if (auto task = ptr->exec.load()) [[likely]]
{
// Execute or discard (if aborting)
if ((m_sync & 3) == 0) [[likely]]
{
task(this, ptr);
}
else
{
task(nullptr, ptr);
}
// Notify waiters
ptr->exec.release(nullptr);
ptr->exec.notify_all();
}
if (ptr->next)
{
// Partial cleanup
ptr->next.reset();
}
if (!ptr->prev)
{
break;
}
}
if (!m_taskq) [[likely]]
{
return;
}
}
}
[[noreturn]] void thread_ctrl::emergency_exit(std::string_view reason)
{
if (std::string info = dump_useful_thread_info(); !info.empty())

View File

@ -65,19 +65,50 @@ struct result_storage<Ctx, std::enable_if_t<!std::is_void_v<std::invoke_result_t
using type = T;
T* get()
T* _get()
{
return reinterpret_cast<T*>(&data);
}
const T* get() const
const T* _get() const
{
return reinterpret_cast<const T*>(&data);
}
void init() noexcept
{
new (data) T();
}
void destroy() noexcept
{
get()->~T();
_get()->~T();
}
};
// Base class for task queue (linked list)
class thread_future
{
friend class thread_base;
shared_ptr<thread_future> next{};
shared_ptr<thread_future>* prev{};
protected:
atomic_t<void(*)(thread_base*, thread_future*)> exec{};
public:
// Get reference to the atomic variable for inspection and waiting for
const auto& get_wait() const
{
return exec;
}
// Wait (preset)
void wait() const
{
exec.wait<atomic_wait::op_ne>(nullptr);
}
};
@ -110,6 +141,9 @@ private:
// Thread name
atomic_ptr<std::string> m_tname;
// Thread task queue (reversed linked list)
atomic_ptr<thread_future> m_taskq{};
// Start thread
void start();
@ -150,6 +184,13 @@ public:
// Get thread id
u64 get_native_id() const;
// Add work to the queue
void push(shared_ptr<thread_future>);
private:
// Clear task queue (execute unless aborting)
void exec();
};
// Collection of global function for current thread
@ -217,11 +258,8 @@ public:
return static_cast<thread_base&>(thread).get_native_id();
}
// Read current state
static inline thread_state state()
{
return static_cast<thread_state>(g_tls_this_thread->m_sync & 3);
}
// Read current state, possibly executing some tasks
static thread_state state();
// Wait once with timeout. May spuriously return false.
static inline void wait_for(u64 usec, bool alert = true)
@ -241,14 +279,15 @@ public:
{
auto _this = g_tls_this_thread;
if (_this->m_sync.bit_test_reset(2))
if (_this->m_sync.bit_test_reset(2) || _this->m_taskq)
{
return;
}
atomic_wait::list<2> list{};
atomic_wait::list<3> list{};
list.set<0, Op>(wait, old);
list.set<1>(_this->m_sync, 0, 4 + 1);
list.set<2>(_this->m_taskq, nullptr);
list.wait(atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff});
}
@ -324,7 +363,7 @@ class named_thread final : public Context, result_storage<Context>, thread_base
if constexpr (!result::empty)
{
// Construct using default constructor in the case of failure
new (static_cast<result*>(static_cast<named_thread*>(thread_ctrl::get_current()))->get()) typename result::type();
static_cast<result*>(static_cast<named_thread*>(thread_ctrl::get_current()))->init();
}
});
@ -347,7 +386,7 @@ class named_thread final : public Context, result_storage<Context>, thread_base
else
{
// Construct the result using placement new (copy elision should happen)
new (result::get()) decltype(auto)(Context::operator()());
new (result::_get()) decltype(auto)(Context::operator()());
}
return thread::finalize(thread_state::finished);
@ -395,7 +434,7 @@ public:
if constexpr (!result::empty)
{
return *result::get();
return *result::_get();
}
}
@ -406,7 +445,279 @@ public:
if constexpr (!result::empty)
{
return *result::get();
return *result::_get();
}
}
// Send command to the thread to invoke directly (references should be passed via std::ref())
template <bool Discard = true, typename Arg, typename... Args>
auto operator()(Arg&& arg, Args&&... args)
{
// Overloaded operator() of the Context.
constexpr bool v1 = std::is_invocable_v<Context, Arg&&, Args&&...>;
// Anything invocable, not necessarily involving the Context.
constexpr bool v2 = std::is_invocable_v<Arg&&, Args&&...>;
// Could be pointer to a non-static member function (or data member) of the Context.
constexpr bool v3 = std::is_member_pointer_v<std::decay_t<Arg>> && std::is_invocable_v<Arg, Context&, Args&&...>;
// Only one invocation type shall be valid, otherwise we don't know.
static_assert((v1 + v2 + v3) == 1, "Ambiguous or invalid named_thread call.");
if constexpr (v1)
{
class future : public thread_future, result_storage<Context, void, Arg, Args...>
{
// A tuple to store arguments
decltype(std::make_tuple(std::forward<Arg, Args...>(arg, args...))) m_args;
public:
future(Arg&& arg, Args&&... args)
: m_args(std::forward<Arg, Args...>(arg, args...))
{
thread_future::exec.raw() = +[](thread_base* tb, thread_future* tf)
{
const auto _this = static_cast<future*>(tf);
if (!tb) [[unlikely]]
{
if constexpr (!future::empty && !Discard)
{
_this->init();
}
return;
}
if constexpr (future::empty || Discard)
{
std::apply(*static_cast<Context*>(static_cast<named_thread*>(tb)), _this->m_args);
}
else
{
new (_this->_get()) decltype(auto)(std::apply(*static_cast<Context*>(static_cast<named_thread*>(tb)), _this->m_args));
}
};
}
future(const future&) = delete;
future& operator=(const future&) = delete;
~future()
{
if constexpr (!future::empty && !Discard)
{
// Should be set to null if executed
if (!this->exec)
{
this->destroy();
}
}
}
decltype(auto) get()
{
if constexpr (!future::empty && !Discard)
{
return *this->_get();
}
}
decltype(auto) get() const
{
if constexpr (!future::empty && !Discard)
{
return *this->_get();
}
}
};
single_ptr<future> target = make_single<future>(std::forward<Arg, Args...>(arg, args...));
if constexpr (!Discard)
{
shared_ptr<future> result = std::move(target);
// Copy result
thread::push(result);
return result;
}
else
{
// Move target
thread::push(std::move(target));
return;
}
}
else if constexpr (v2)
{
class future : public thread_future, result_storage<std::decay_t<Arg>, void, Args...>
{
decltype(std::make_tuple(std::forward<Args...>(args...))) m_args;
std::decay_t<Arg> m_func;
public:
future(Arg func, Args&&... args)
: m_args(std::forward<Args...>(args...))
, m_func(func)
{
thread_future::exec.raw() = +[](thread_base* tb, thread_future* tf)
{
const auto _this = static_cast<future*>(tf);
if (!tb) [[unlikely]]
{
if constexpr (!future::empty && !Discard)
{
_this->init();
}
return;
}
if constexpr (future::empty || Discard)
{
std::apply(_this->m_func, _this->m_args);
}
else
{
new (_this->_get()) decltype(auto)(std::apply(_this->m_func, _this->m_args));
}
};
}
future(const future&) = delete;
future& operator=(const future&) = delete;
~future()
{
if constexpr (!future::empty && !Discard)
{
if (!this->exec)
{
this->destroy();
}
}
}
decltype(auto) get()
{
if constexpr (!future::empty && !Discard)
{
return *this->_get();
}
}
decltype(auto) get() const
{
if constexpr (!future::empty && !Discard)
{
return *this->_get();
}
}
};
single_ptr<future> target = make_single<future>(std::forward<Arg, Args...>(arg, args...));
if constexpr (!Discard)
{
shared_ptr<future> result = std::move(target);
thread::push(result);
return result;
}
else
{
thread::push(std::move(target));
return;
}
}
else if constexpr (v3)
{
class future : public thread_future, result_storage<std::decay_t<Arg>, void, Context&, Args...>
{
decltype(std::make_tuple(std::forward<Args...>(args...))) m_args;
std::decay_t<Arg> m_func;
public:
future(Arg func, Args&&... args)
: m_args(std::forward<Args...>(args...))
, m_func(func)
{
thread_future::exec.raw() = +[](thread_base* tb, thread_future* tf)
{
const auto _this = static_cast<future*>(tf);
if (!tb) [[unlikely]]
{
if constexpr (!future::empty && !Discard)
{
_this->init();
}
return;
}
if constexpr (future::empty || Discard)
{
std::apply(_this->m_func, *static_cast<Context*>(static_cast<named_thread*>(tb)), _this->m_args);
}
else
{
new (_this->_get()) decltype(auto)(std::apply(_this->m_func, *static_cast<Context*>(static_cast<named_thread*>(tb)), _this->m_args));
}
};
}
future(const future&) = delete;
future& operator=(const future&) = delete;
~future()
{
if constexpr (!future::empty && !Discard)
{
if (!this->exec)
{
this->destroy();
}
}
}
decltype(auto) get()
{
if constexpr (!future::empty && !Discard)
{
return *this->_get();
}
}
decltype(auto) get() const
{
if constexpr (!future::empty && !Discard)
{
return *this->_get();
}
}
};
single_ptr<future> target = make_single<future>(std::forward<Arg, Args...>(arg, args...));
if constexpr (!Discard)
{
shared_ptr<future> result = std::move(target);
thread::push(result);
return result;
}
else
{
thread::push(std::move(target));
return;
}
}
}

View File

@ -6,9 +6,11 @@
namespace stx
{
#ifndef _MSC_VER
#pragma GCC diagnostic push
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wundefined-var-template"
#pragma GCC diagnostic ignored "-Wundefined-var-template"
#endif
#endif
// Not defined anywhere (and produces a useless warning)
@ -19,6 +21,9 @@ namespace stx
template <typename T, typename U>
constexpr bool is_same_ptr() noexcept
{
#if !defined(_MSC_VER) && !defined(__clang__)
return true;
#else
if constexpr (std::is_void_v<T> || std::is_void_v<U> || std::is_same_v<T, U>)
{
return true;
@ -39,13 +44,14 @@ namespace stx
{
return false;
}
#endif
}
template <typename T, typename U>
constexpr bool is_same_ptr_cast_v = std::is_convertible_v<U*, T*> && is_same_ptr<T, U>();
#ifdef __clang__
#pragma clang diagnostic pop
#ifndef _MSC_VER
#pragma GCC diagnostic pop
#endif
template <typename T>
@ -253,6 +259,11 @@ namespace stx
}
};
#ifndef _MSC_VER
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
#endif
template <typename T, bool Init = true, typename... Args>
static std::enable_if_t<!(std::is_unbounded_array_v<T>) && (Init || !sizeof...(Args)), single_ptr<T>> make_single(Args&&... args) noexcept
{
@ -362,6 +373,10 @@ namespace stx
return r;
}
#ifndef _MSC_VER
#pragma GCC diagnostic pop
#endif
// Simplified shared pointer
template <typename T>
class shared_ptr
@ -1035,6 +1050,41 @@ namespace stx
return compare_and_swap_test(reinterpret_cast<const shared_ptr<U>&>(cmp), std::move(exch));
}
// Helper utility
void push_head(shared_type& next, shared_type exch) noexcept
{
if (exch.m_ptr) [[likely]]
{
// Add missing references first
exch.d()->refs += c_ref_mask;
}
if (next.m_ptr) [[unlikely]]
{
// Just in case
next.reset();
}
atomic_ptr old;
old.m_val.raw() = m_val.load();
do
{
// Update old head with current value
next.m_ptr = reinterpret_cast<T*>(old.m_val.raw() >> c_ref_size);
} while (!m_val.compare_exchange(old.m_val.raw(), reinterpret_cast<uptr>(exch.m_ptr) << c_ref_size));
// This argument is consumed (moved from)
exch.m_ptr = nullptr;
if (next.m_ptr)
{
// Compensation for `next` assignment
old.m_val.raw() += 1;
}
}
// Simple atomic load is much more effective than load(), but it's a non-owning reference
const volatile void* observe() const noexcept
{