diff --git a/Utilities/GDBDebugServer.cpp b/Utilities/GDBDebugServer.cpp index eb3bf30f84..708241fc62 100644 --- a/Utilities/GDBDebugServer.cpp +++ b/Utilities/GDBDebugServer.cpp @@ -829,7 +829,7 @@ void GDBDebugServer::on_stop() this->stop = true; //just in case we are waiting for breakpoint this->notify(); - named_thread::on_stop(); + old_thread::on_stop(); } void GDBDebugServer::pause_from(cpu_thread* t) { diff --git a/Utilities/GDBDebugServer.h b/Utilities/GDBDebugServer.h index c36935eb26..d18861610b 100644 --- a/Utilities/GDBDebugServer.h +++ b/Utilities/GDBDebugServer.h @@ -40,8 +40,8 @@ public: const u64 ALL_THREADS = 0xffffffffffffffff; const u64 ANY_THREAD = 0; -class GDBDebugServer : public named_thread { - +class GDBDebugServer : public old_thread +{ socket_t server_socket; socket_t client_socket; std::weak_ptr selected_thread; diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index c18d938702..f820494a7f 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -35,10 +35,12 @@ #endif #include "sync.h" +#include "Log.h" thread_local u64 g_tls_fault_all = 0; thread_local u64 g_tls_fault_rsx = 0; thread_local u64 g_tls_fault_spu = 0; +extern thread_local std::string(*g_tls_log_prefix)(); [[noreturn]] void catch_all_exceptions() { @@ -48,11 +50,11 @@ thread_local u64 g_tls_fault_spu = 0; } catch (const std::exception& e) { - report_fatal_error("Unhandled exception of type '"s + typeid(e).name() + "': "s + e.what()); + report_fatal_error("{" + g_tls_log_prefix() + "} Unhandled exception of type '"s + typeid(e).name() + "': "s + e.what()); } catch (...) { - report_fatal_error("Unhandled exception (unknown)"); + report_fatal_error("{" + g_tls_log_prefix() + "} Unhandled exception (unknown)"); } } @@ -1567,25 +1569,21 @@ extern atomic_t g_thread_count(0); thread_local DECLARE(thread_ctrl::g_tls_this_thread) = nullptr; -extern thread_local std::string(*g_tls_log_prefix)(); - DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined }; -void thread_ctrl::start(const std::shared_ptr& ctrl, task_stack task) +void thread_base::start(const std::shared_ptr& ctrl, task_stack task) { #ifdef _WIN32 using thread_result = uint; - using thread_type = thread_result(__stdcall*)(void* arg); #else using thread_result = void*; - using thread_type = thread_result(*)(void* arg); #endif // Thread entry point - const thread_type entry = [](void* arg) -> thread_result + const native_entry entry = [](void* arg) -> thread_result { - // Recover shared_ptr from short-circuited thread_ctrl object pointer - const std::shared_ptr ctrl = static_cast(arg)->m_self; + // Recover shared_ptr from short-circuited thread_base object pointer + std::shared_ptr ctrl = static_cast(arg)->m_self; try { @@ -1596,17 +1594,18 @@ void thread_ctrl::start(const std::shared_ptr& ctrl, task_stack tas { // Capture exception ctrl->finalize(std::current_exception()); + finalize(); return 0; } ctrl->finalize(nullptr); + finalize(); return 0; }; ctrl->m_self = ctrl; ctrl->m_task = std::move(task); - // TODO: implement simple thread pool #ifdef _WIN32 std::uintptr_t thread = _beginthreadex(nullptr, 0, entry, ctrl.get(), 0, nullptr); verify("thread_ctrl::start" HERE), thread != 0; @@ -1619,14 +1618,24 @@ void thread_ctrl::start(const std::shared_ptr& ctrl, task_stack tas ctrl->m_thread = (uintptr_t)thread; } -void thread_ctrl::initialize() +void thread_base::start(native_entry entry) +{ +#ifdef _WIN32 + m_thread = ::_beginthreadex(nullptr, 0, entry, this, CREATE_SUSPENDED, nullptr); + verify("thread_ctrl::start" HERE), m_thread, ::ResumeThread(reinterpret_cast(+m_thread)) != -1; +#else + verify("thread_ctrl::start" HERE), pthread_create(reinterpret_cast(&m_thread.raw()), nullptr, entry, this) == 0; +#endif +} + +void thread_base::initialize() { // Initialize TLS variable - g_tls_this_thread = this; + thread_ctrl::g_tls_this_thread = this; g_tls_log_prefix = [] { - return g_tls_this_thread->m_name; + return thread_ctrl::g_tls_this_thread->m_name.get(); }; ++g_thread_count; @@ -1645,7 +1654,7 @@ void thread_ctrl::initialize() { THREADNAME_INFO info; info.dwType = 0x1000; - info.szName = m_name.c_str(); + info.szName = m_name.get().c_str(); info.dwThreadID = -1; info.dwFlags = 0; @@ -1660,17 +1669,17 @@ void thread_ctrl::initialize() #endif #if defined(__APPLE__) - pthread_setname_np(m_name.substr(0, 15).c_str()); + pthread_setname_np(m_name.get().substr(0, 15).c_str()); #elif defined(__DragonFly__) || defined(__FreeBSD__) || defined(__OpenBSD__) - pthread_set_name_np(pthread_self(), m_name.c_str()); + pthread_set_name_np(pthread_self(), m_name.get().c_str()); #elif defined(__NetBSD__) - pthread_setname_np(pthread_self(), "%s", (void*)m_name.c_str()); + pthread_setname_np(pthread_self(), "%s", (void*)m_name.get().c_str()); #elif !defined(_WIN32) - pthread_setname_np(pthread_self(), m_name.substr(0, 15).c_str()); + pthread_setname_np(pthread_self(), m_name.get().substr(0, 15).c_str()); #endif } -void thread_ctrl::finalize(std::exception_ptr eptr) noexcept +std::shared_ptr thread_base::finalize(std::exception_ptr eptr) noexcept { // Report pending errors error_code::error_report(0, 0, 0, 0); @@ -1693,7 +1702,7 @@ void thread_ctrl::finalize(std::exception_ptr eptr) noexcept g_tls_log_prefix = [] { - return g_tls_this_thread->m_name; + return thread_ctrl::g_tls_this_thread->m_name.get(); }; LOG_NOTICE(GENERAL, "Thread time: %fs (%fGc); Faults: %u [rsx:%u, spu:%u];", @@ -1703,13 +1712,24 @@ void thread_ctrl::finalize(std::exception_ptr eptr) noexcept g_tls_fault_rsx, g_tls_fault_spu); - --g_thread_count; - // Untangle circular reference, set exception - std::lock_guard{m_mutex}, m_self.reset(), m_exception = eptr; + std::unique_lock lock(m_mutex); - // Signal joining waiters - m_jcv.notify_all(); + // Possibly last reference to the thread object + std::shared_ptr self = std::move(m_self); + m_state = thread_state::finished; + m_exception = eptr; + + // Signal waiting threads + lock.unlock(), m_jcv.notify_all(); + return self; +} + +void thread_base::finalize() noexcept +{ + g_tls_log_prefix = []() -> std::string { return {}; }; + thread_ctrl::g_tls_this_thread = nullptr; + --g_thread_count; } bool thread_ctrl::_wait_for(u64 usec) @@ -1718,7 +1738,7 @@ bool thread_ctrl::_wait_for(u64 usec) struct half_lock { - semaphore<>& ref; + shared_mutex& ref; void lock() { @@ -1777,7 +1797,7 @@ bool thread_ctrl::_wait_for(u64 usec) return false; } -[[noreturn]] void thread_ctrl::_throw() +[[noreturn]] void thread_base::_throw() { std::exception_ptr ex = std::exchange(m_exception, std::exception_ptr{}); m_signal &= ~3; @@ -1785,10 +1805,10 @@ bool thread_ctrl::_wait_for(u64 usec) std::rethrow_exception(std::move(ex)); } -void thread_ctrl::_notify(cond_variable thread_ctrl::* ptr) +void thread_base::_notify(cond_variable thread_base::* ptr) { // Optimized lock + unlock - if (!m_mutex.get()) + if (!m_mutex.is_free()) { m_mutex.lock(); m_mutex.unlock(); @@ -1797,12 +1817,12 @@ void thread_ctrl::_notify(cond_variable thread_ctrl::* ptr) (this->*ptr).notify_one(); } -thread_ctrl::thread_ctrl(std::string&& name) - : m_name(std::move(name)) +thread_base::thread_base(std::string_view name) + : m_name(name) { } -thread_ctrl::~thread_ctrl() +thread_base::~thread_base() { if (m_thread) { @@ -1814,13 +1834,13 @@ thread_ctrl::~thread_ctrl() } } -std::exception_ptr thread_ctrl::get_exception() const +std::exception_ptr thread_base::get_exception() const { std::lock_guard lock(m_mutex); return m_exception; } -void thread_ctrl::set_exception(std::exception_ptr ptr) +void thread_base::set_exception(std::exception_ptr ptr) { std::lock_guard lock(m_mutex); m_exception = ptr; @@ -1836,35 +1856,52 @@ void thread_ctrl::set_exception(std::exception_ptr ptr) } } -void thread_ctrl::join() +void thread_base::join() const { -#ifdef _WIN32 - //verify("thread_ctrl::join" HERE), WaitForSingleObjectEx((HANDLE)m_thread.load(), -1, false) == WAIT_OBJECT_0; -#endif + if (m_state == thread_state::finished) + { + return; + } std::unique_lock lock(m_mutex); - while (m_self) + while (m_state != thread_state::finished) { m_jcv.wait(lock); } +} - if (UNLIKELY(m_exception && !std::uncaught_exceptions())) +void thread_base::detach() +{ + auto self = weak_from_this().lock(); + + if (!self) { - std::rethrow_exception(m_exception); + LOG_FATAL(GENERAL, "Cannot detach thread '%s'", get_name()); + return; + } + + if (self->m_state.compare_and_swap_test(thread_state::created, thread_state::detached)) + { + std::lock_guard lock(m_mutex); + + if (m_state == thread_state::detached) + { + m_self = std::move(self); + } } } -void thread_ctrl::notify() +void thread_base::notify() { if (!(m_signal & 1)) { m_signal |= 1; - _notify(&thread_ctrl::m_cond); + _notify(&thread_base::m_cond); } } -u64 thread_ctrl::get_cycles() +u64 thread_base::get_cycles() { u64 cycles; @@ -2059,23 +2096,23 @@ void thread_ctrl::set_thread_affinity_mask(u16 mask) #endif } -named_thread::named_thread() +old_thread::old_thread() { } -named_thread::~named_thread() +old_thread::~old_thread() { } -std::string named_thread::get_name() const +std::string old_thread::get_name() const { return fmt::format("('%s') Unnamed Thread", typeid(*this).name()); } -void named_thread::start_thread(const std::shared_ptr& _this) +void old_thread::start_thread(const std::shared_ptr& _this) { // Ensure it's not called from the constructor and the correct object is passed - verify("named_thread::start_thread" HERE), _this.get() == this; + verify("old_thread::start_thread" HERE), _this.get() == this; // Run thread thread_ctrl::spawn(m_thread, get_name(), [this, _this]() diff --git a/Utilities/Thread.h b/Utilities/Thread.h index 5c3c53feb1..bdcea7b01e 100644 --- a/Utilities/Thread.h +++ b/Utilities/Thread.h @@ -6,9 +6,11 @@ #include #include #include +#include -#include "sema.h" +#include "mutex.h" #include "cond.h" +#include "lockless.h" // Report error and call std::abort(), defined in main.cpp [[noreturn]] void report_fatal_error(const std::string&); @@ -33,6 +35,55 @@ enum class thread_class : u32 ppu }; +enum class thread_state +{ + created, // Initial state + detached, // Set if the thread has been detached successfully (only possible via shared_ptr) + aborting, // Set if the thread has been joined in destructor (mutually exclusive with detached) + finished // Final state, always set at the end of thread execution +}; + +template +class named_thread; + +template +struct result_storage +{ + alignas(T) std::byte data[sizeof(T)]; + + static constexpr bool empty = false; + + using type = T; + + T* get() + { + return reinterpret_cast(&data); + } + + const T* get() const + { + return reinterpret_cast(&data); + } +}; + +template <> +struct result_storage +{ + static constexpr bool empty = true; + + using type = void; +}; + +template +using result_storage_t = result_storage>; + +// Detect on_stop() method (should return void) +template +struct thread_on_stop : std::bool_constant {}; + +template +struct thread_on_stop&>().on_stop())> : std::bool_constant {}; + // Simple list of void() functors class task_stack { @@ -102,23 +153,24 @@ public: } }; -// Thread control class -class thread_ctrl final +// Thread base class +class thread_base : public std::enable_shared_from_this { - // Current thread - static thread_local thread_ctrl* g_tls_this_thread; + // Native thread entry point function type +#ifdef _WIN32 + using native_entry = uint(__stdcall*)(void* arg); +#else + using native_entry = void*(*)(void* arg); +#endif - // Target cpu core layout - static atomic_t g_native_core_layout; - - // Self pointer - std::shared_ptr m_self; + // Self pointer for detached thread + std::shared_ptr m_self; // Thread handle (platform-specific) atomic_t m_thread{0}; // Thread mutex - mutable semaphore<> m_mutex; + mutable shared_mutex m_mutex; // Thread condition variable cond_variable m_cond; @@ -127,7 +179,10 @@ class thread_ctrl final atomic_t m_signal{0}; // Thread joining condition variable - cond_variable m_jcv; + mutable cond_variable m_jcv; + + // Thread state + atomic_t m_state = thread_state::created; // Remotely set or caught exception std::exception_ptr m_exception; @@ -135,38 +190,40 @@ class thread_ctrl final // Thread initial task task_stack m_task; - // Fixed name - std::string m_name; + // Thread name + lf_value m_name; // CPU cycles thread has run for u64 m_cycles{0}; // Start thread - static void start(const std::shared_ptr&, task_stack); + static void start(const std::shared_ptr&, task_stack); + + void start(native_entry); // Called at the thread start void initialize(); - // Called at the thread end - void finalize(std::exception_ptr) noexcept; + // Called at the thread end, returns moved m_self (may be null) + std::shared_ptr finalize(std::exception_ptr) noexcept; - // Internal waiting function, may throw. Infinite value is -1. - static bool _wait_for(u64 usec); + static void finalize() noexcept; // Internal throwing function. Mutex must be locked and will be unlocked. [[noreturn]] void _throw(); // Internal notification function - void _notify(cond_variable thread_ctrl::*); + void _notify(cond_variable thread_base::*); + + friend class thread_ctrl; + + template + friend class named_thread; public: - thread_ctrl(std::string&& name); + thread_base(std::string_view name); - thread_ctrl(const thread_ctrl&) = delete; - - thread_ctrl& operator=(const thread_ctrl&) = delete; - - ~thread_ctrl(); + ~thread_base(); // Get thread name const std::string& get_name() const @@ -174,6 +231,12 @@ public: return m_name; } + // Set thread name (not recommended) + void set_name(std::string_view name) + { + m_name.assign(name); + } + // Get CPU cycles since last time this function was called. First call returns 0. u64 get_cycles(); @@ -189,11 +252,36 @@ public: // Set exception void set_exception(std::exception_ptr ptr); - // Get thread result (may throw, simultaneous joining allowed) - void join(); + // Wait for the thread (it does NOT change thread state, and can be called from multiple threads) + void join() const; + + // Make thread to manage a shared_ptr of itself + void detach(); // Notify the thread void notify(); +}; + +// Collection of global function for current thread +class thread_ctrl final +{ + // Current thread + static thread_local thread_base* g_tls_this_thread; + + // Target cpu core layout + static atomic_t g_native_core_layout; + + // Internal waiting function, may throw. Infinite value is -1. + static bool _wait_for(u64 usec); + + friend class thread_base; + +public: + // Read current state + static inline thread_state state() + { + return g_tls_this_thread->m_state; + } // Wait once with timeout. Abortable, may throw. May spuriously return false. static inline bool wait_for(u64 usec) @@ -208,7 +296,7 @@ public: } // Wait until pred(). Abortable, may throw. - template> + template > static inline RT wait(F&& pred) { while (true) @@ -235,27 +323,27 @@ public: static void test(); // Get current thread (may be nullptr) - static thread_ctrl* get_current() + static thread_base* get_current() { return g_tls_this_thread; } // Create detached named thread - template + template static inline void spawn(N&& name, F&& func) { - auto out = std::make_shared(std::forward(name)); + auto out = std::make_shared(std::forward(name)); - thread_ctrl::start(out, std::forward(func)); + thread_base::start(out, std::forward(func)); } // Named thread factory - template - static inline void spawn(std::shared_ptr& out, N&& name, F&& func) + template + static inline void spawn(std::shared_ptr& out, N&& name, F&& func) { - out = std::make_shared(std::forward(name)); + out = std::make_shared(std::forward(name)); - thread_ctrl::start(out, std::forward(func)); + thread_base::start(out, std::forward(func)); } // Detect layout @@ -269,21 +357,140 @@ public: // Sets the preferred affinity mask for this thread static void set_thread_affinity_mask(u16 mask); + + template + static inline std::shared_ptr> make_shared(std::string_view name, F&& lambda) + { + return std::make_shared>(name, std::forward(lambda)); + } + + template + static inline std::shared_ptr> make_shared(std::string_view name, Args&&... args) + { + return std::make_shared>(name, std::forward(args)...); + } }; -class named_thread +// Derived from the callable object Context, possibly a lambda +template +class named_thread final : public Context, result_storage_t, public thread_base { - // Pointer to managed resource (shared with actual thread) - std::shared_ptr m_thread; + using result = result_storage_t; + using thread = thread_base; + + // Type-erased thread entry point +#ifdef _WIN32 + static inline uint __stdcall entry_point(void* arg) try +#else + static inline void* entry_point(void* arg) try +#endif + { + const auto maybe_last_ptr = static_cast(static_cast(arg))->entry_point(); + thread::finalize(); + return 0; + } + catch (...) + { + catch_all_exceptions(); + } + + std::shared_ptr entry_point() + { + thread::initialize(); + + if constexpr (result::empty) + { + // No result + Context::operator()(); + } + else + { + // Construct the result using placement new (copy elision should happen) + new (result::get()) typename result::type(Context::operator()()); + } + + return thread::finalize(nullptr); + } public: - named_thread(); - virtual ~named_thread(); + // Normal forwarding constructor + template >> + named_thread(std::string_view name, Args&&... args) + : Context(std::forward(args)...) + , thread(name) + { + thread::start(&named_thread::entry_point); + } - named_thread(const named_thread&) = delete; + // Lambda constructor, also the implicit deduction guide candidate + named_thread(std::string_view name, Context&& f) + : Context(std::forward(f)) + , thread(name) + { + thread::start(&named_thread::entry_point); + } - named_thread& operator=(const named_thread&) = delete; + // Wait for the completion and access result (if not void) + [[nodiscard]] decltype(auto) operator()() + { + thread::join(); + + if constexpr (!result::empty) + { + return *result::get(); + } + } + + // Wait for the completion and access result (if not void) + [[nodiscard]] decltype(auto) operator()() const + { + thread::join(); + + if constexpr (!result::empty) + { + return *result::get(); + } + } + + // Access thread state + operator thread_state() const + { + return thread::m_state.load(); + } + + // Context type doesn't need virtual destructor + ~named_thread() + { + // Notify thread if not detached or terminated + if (thread::m_state.compare_and_swap_test(thread_state::created, thread_state::aborting)) + { + // Additional notification if on_stop() method exists + if constexpr (thread_on_stop()) + { + Context::on_stop(); + } + + thread::notify(); + thread::join(); + } + } +}; + +// Old named_thread +class old_thread +{ + // Pointer to managed resource (shared with actual thread) + std::shared_ptr m_thread; + +public: + old_thread(); + + virtual ~old_thread(); + + old_thread(const old_thread&) = delete; + + old_thread& operator=(const old_thread&) = delete; // Get thread name virtual std::string get_name() const; @@ -314,8 +521,7 @@ public: m_thread->join(); } - // Access thread_ctrl - thread_ctrl* get() const + thread_base* get() const { return m_thread.get(); } @@ -330,32 +536,3 @@ public: return m_thread->notify(); } }; - -// Wrapper for named thread, joins automatically in the destructor, can only be used in function scope -class scope_thread final -{ - std::shared_ptr m_thread; - -public: - template - scope_thread(N&& name, F&& func) - { - thread_ctrl::spawn(m_thread, std::forward(name), std::forward(func)); - } - - scope_thread(const scope_thread&) = delete; - - scope_thread& operator=(const scope_thread&) = delete; - - // Destructor with exceptions allowed - ~scope_thread() noexcept(false) - { - m_thread->join(); - } - - // Access thread_ctrl - thread_ctrl* get() const - { - return m_thread.get(); - } -}; diff --git a/rpcs3/Emu/CPU/CPUThread.h b/rpcs3/Emu/CPU/CPUThread.h index 5110627acc..8272be5d45 100644 --- a/rpcs3/Emu/CPU/CPUThread.h +++ b/rpcs3/Emu/CPU/CPUThread.h @@ -24,7 +24,7 @@ enum class cpu_flag : u32 // Flag set for pause state constexpr bs_t cpu_state_pause = cpu_flag::suspend + cpu_flag::dbg_global_pause + cpu_flag::dbg_pause; -class cpu_thread : public named_thread +class cpu_thread : public old_thread { void on_task() override final; diff --git a/rpcs3/Emu/Cell/Modules/cellAudio.cpp b/rpcs3/Emu/Cell/Modules/cellAudio.cpp index 0a1204cc29..3e5484d749 100644 --- a/rpcs3/Emu/Cell/Modules/cellAudio.cpp +++ b/rpcs3/Emu/Cell/Modules/cellAudio.cpp @@ -52,7 +52,7 @@ void audio_config::on_init(const std::shared_ptr& _this) ports[i].index = m_indexes + i; } - named_thread::on_init(_this); + old_thread::on_init(_this); } void audio_config::on_task() diff --git a/rpcs3/Emu/Cell/Modules/cellAudio.h b/rpcs3/Emu/Cell/Modules/cellAudio.h index 30f24d62ea..db30ea9228 100644 --- a/rpcs3/Emu/Cell/Modules/cellAudio.h +++ b/rpcs3/Emu/Cell/Modules/cellAudio.h @@ -119,7 +119,7 @@ struct audio_port atomic_t level_set; }; -class audio_config final : public named_thread +class audio_config final : public old_thread { void on_task() override; diff --git a/rpcs3/Emu/Cell/Modules/cellCamera.cpp b/rpcs3/Emu/Cell/Modules/cellCamera.cpp index e222e6098c..a2fd4bbdd3 100644 --- a/rpcs3/Emu/Cell/Modules/cellCamera.cpp +++ b/rpcs3/Emu/Cell/Modules/cellCamera.cpp @@ -1224,7 +1224,7 @@ void camera_thread::on_task() void camera_thread::on_init(const std::shared_ptr& _this) { - named_thread::on_init(_this); + old_thread::on_init(_this); } void camera_thread::send_attach_state(bool attached) diff --git a/rpcs3/Emu/Cell/Modules/cellCamera.h b/rpcs3/Emu/Cell/Modules/cellCamera.h index 9b02a54510..7aa5dc6dc8 100644 --- a/rpcs3/Emu/Cell/Modules/cellCamera.h +++ b/rpcs3/Emu/Cell/Modules/cellCamera.h @@ -351,7 +351,7 @@ struct CellCameraReadEx vm::bptr pbuf; }; -class camera_thread final : public named_thread +class camera_thread final : public old_thread { private: struct notify_event_data diff --git a/rpcs3/Emu/Cell/Modules/cellMic.cpp b/rpcs3/Emu/Cell/Modules/cellMic.cpp index ee07c9d343..de904932f7 100644 --- a/rpcs3/Emu/Cell/Modules/cellMic.cpp +++ b/rpcs3/Emu/Cell/Modules/cellMic.cpp @@ -11,7 +11,7 @@ LOG_CHANNEL(cellMic); void mic_thread::on_init(const std::shared_ptr& _this) { - named_thread::on_init(_this); + old_thread::on_init(_this); } void mic_thread::on_task() diff --git a/rpcs3/Emu/Cell/Modules/cellMic.h b/rpcs3/Emu/Cell/Modules/cellMic.h index 1390b0c356..cb26fe7436 100644 --- a/rpcs3/Emu/Cell/Modules/cellMic.h +++ b/rpcs3/Emu/Cell/Modules/cellMic.h @@ -54,7 +54,7 @@ const u32 bufferSize = 1; bool micInited = false; -class mic_thread final : public named_thread +class mic_thread final : public old_thread { private: void on_task() override; diff --git a/rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp b/rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp index 183a508247..302789c85d 100644 --- a/rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp +++ b/rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp @@ -242,7 +242,7 @@ s32 cellMsgDialogClose(f32 delay) { if (auto dlg = manager->get()) { - thread_ctrl::spawn("cellMsgDialogClose() Thread", [=] + thread_ctrl::make_shared("cellMsgDialogClose() Thread", [=] { while (get_system_time() < wait_until) { @@ -256,7 +256,7 @@ s32 cellMsgDialogClose(f32 delay) } dlg->close(); - }); + })->detach(); return CELL_OK; } @@ -269,7 +269,7 @@ s32 cellMsgDialogClose(f32 delay) return CELL_MSGDIALOG_ERROR_DIALOG_NOT_OPENED; } - thread_ctrl::spawn("cellMsgDialogClose() Thread", [=]() + thread_ctrl::make_shared("cellMsgDialogClose() Thread", [=]() { while (dlg->state == MsgDialogState::Open && get_system_time() < wait_until) { @@ -279,7 +279,7 @@ s32 cellMsgDialogClose(f32 delay) } dlg->on_close(CELL_MSGDIALOG_BUTTON_NONE); - }); + })->detach(); return CELL_OK; } diff --git a/rpcs3/Emu/Cell/lv2/lv2.cpp b/rpcs3/Emu/Cell/lv2/lv2.cpp index 67fa2a1122..4d01b2816b 100644 --- a/rpcs3/Emu/Cell/lv2/lv2.cpp +++ b/rpcs3/Emu/Cell/lv2/lv2.cpp @@ -1002,7 +1002,7 @@ DECLARE(lv2_obj::g_ppu); DECLARE(lv2_obj::g_pending); DECLARE(lv2_obj::g_waiting); -void lv2_obj::sleep_timeout(named_thread& thread, u64 timeout) +void lv2_obj::sleep_timeout(old_thread& thread, u64 timeout) { std::lock_guard lock(g_mutex); diff --git a/rpcs3/Emu/Cell/lv2/sys_net.cpp b/rpcs3/Emu/Cell/lv2/sys_net.cpp index 7999d9906c..fbe6662048 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net.cpp @@ -112,7 +112,7 @@ static void network_clear_queue(ppu_thread& ppu) extern void network_thread_init() { - thread_ctrl::spawn("Network Thread", []() + thread_ctrl::make_shared("Network Thread", []() { std::vector> socklist; socklist.reserve(lv2_socket::id_count); @@ -241,7 +241,7 @@ extern void network_thread_init() CloseHandle(_eventh); WSACleanup(); #endif - }); + })->detach(); } lv2_socket::lv2_socket(lv2_socket::socket_type s) diff --git a/rpcs3/Emu/Cell/lv2/sys_sync.h b/rpcs3/Emu/Cell/lv2/sys_sync.h index 7585583937..ec4e5aeea8 100644 --- a/rpcs3/Emu/Cell/lv2/sys_sync.h +++ b/rpcs3/Emu/Cell/lv2/sys_sync.h @@ -114,7 +114,7 @@ struct lv2_obj } // Remove the current thread from the scheduling queue, register timeout - static void sleep_timeout(named_thread&, u64 timeout); + static void sleep_timeout(old_thread&, u64 timeout); static void sleep(cpu_thread& thread, u64 timeout = 0) { @@ -224,7 +224,7 @@ private: static std::deque g_pending; // Scheduler queue for timeouts (wait until -> thread) - static std::deque> g_waiting; + static std::deque> g_waiting; static void schedule_all(); }; diff --git a/rpcs3/Emu/Cell/lv2/sys_timer.h b/rpcs3/Emu/Cell/lv2/sys_timer.h index 5e561dedaf..15f058917f 100644 --- a/rpcs3/Emu/Cell/lv2/sys_timer.h +++ b/rpcs3/Emu/Cell/lv2/sys_timer.h @@ -17,7 +17,7 @@ struct sys_timer_information_t be_t pad; }; -struct lv2_timer final : public lv2_obj, public named_thread +struct lv2_timer final : public lv2_obj, public old_thread { static const u32 id_base = 0x11000000; diff --git a/rpcs3/Emu/Memory/vm.h b/rpcs3/Emu/Memory/vm.h index f9df163378..e66cf9540f 100644 --- a/rpcs3/Emu/Memory/vm.h +++ b/rpcs3/Emu/Memory/vm.h @@ -6,7 +6,6 @@ #include "Utilities/VirtualMemory.h" class shared_mutex; -class named_thread; class cpu_thread; class notifier; diff --git a/rpcs3/Emu/RSX/Overlays/overlays.h b/rpcs3/Emu/RSX/Overlays/overlays.h index cb6eea5a5e..08f5ee7a50 100644 --- a/rpcs3/Emu/RSX/Overlays/overlays.h +++ b/rpcs3/Emu/RSX/Overlays/overlays.h @@ -938,13 +938,13 @@ namespace rsx this->on_close = on_close; if (interactive) { - thread_ctrl::spawn("dialog input thread", [&] + thread_ctrl::make_shared("dialog input thread", [&] { if (auto error = run_input_loop()) { LOG_ERROR(RSX, "Dialog input loop exited with error code=%d", error); } - }); + })->detach(); } return CELL_OK; diff --git a/rpcs3/Emu/RSX/RSXThread.cpp b/rpcs3/Emu/RSX/RSXThread.cpp index f2cf151e06..55e08e81ca 100644 --- a/rpcs3/Emu/RSX/RSXThread.cpp +++ b/rpcs3/Emu/RSX/RSXThread.cpp @@ -2675,7 +2675,7 @@ namespace rsx reader_lock lock(m_mtx_task); const auto map_range = address_range::start_length(address, size); - + if (!m_invalidated_memory_range.valid()) return; @@ -2714,7 +2714,7 @@ namespace rsx std::lock_guard lock(m_mtx_task); const bool existing_range_valid = m_invalidated_memory_range.valid(); const auto unmap_range = address_range::start_length(address, size); - + if (existing_range_valid && m_invalidated_memory_range.touches(unmap_range)) { // Merge range-to-invalidate in case of consecutive unmaps diff --git a/rpcs3/Emu/RSX/RSXThread.h b/rpcs3/Emu/RSX/RSXThread.h index 308dab8a4e..09f1eaeca2 100644 --- a/rpcs3/Emu/RSX/RSXThread.h +++ b/rpcs3/Emu/RSX/RSXThread.h @@ -361,10 +361,10 @@ namespace rsx struct sampled_image_descriptor_base; - class thread : public named_thread + class thread : public old_thread { - std::shared_ptr m_vblank_thread; - std::shared_ptr m_decompiler_thread; + std::shared_ptr m_vblank_thread; + std::shared_ptr m_decompiler_thread; u64 timestamp_ctrl = 0; u64 timestamp_subvalue = 0; @@ -565,7 +565,7 @@ namespace rsx void sync(); void read_barrier(u32 memory_address, u32 memory_range); virtual void sync_hint(FIFO_hint hint) {} - + gsl::span get_raw_index_array(const std::vector >& draw_indexed_clause) const; gsl::span get_raw_vertex_buffer(const rsx::data_array_format_info&, u32 base_offset, const std::vector>& vertex_ranges) const; diff --git a/rpcs3/Emu/System.cpp b/rpcs3/Emu/System.cpp index 9328386677..46a06a429a 100644 --- a/rpcs3/Emu/System.cpp +++ b/rpcs3/Emu/System.cpp @@ -515,21 +515,16 @@ bool Emulator::InstallPkg(const std::string& path) atomic_t progress(0.); int int_progress = 0; + + // Run PKG unpacking asynchronously + named_thread worker("PKG Installer", [&] { - // Run PKG unpacking asynchronously - scope_thread worker("PKG Installer", [&] - { - if (pkg_install(path, progress)) - { - progress = 1.; - return; - } - - progress = -1.; - }); + return pkg_install(path, progress); + }); + { // Wait for the completion - while (std::this_thread::sleep_for(5ms), std::abs(progress) < 1.) + while (std::this_thread::sleep_for(5ms), worker != thread_state::finished) { // TODO: update unified progress dialog double pval = progress; @@ -544,12 +539,7 @@ bool Emulator::InstallPkg(const std::string& path) } } - if (progress >= 1.) - { - return true; - } - - return false; + return worker(); } std::string Emulator::GetEmuDir() @@ -730,7 +720,7 @@ void Emulator::Load(bool add_only) // Workaround for analyser glitches vm::falloc(0x10000, 0xf0000, vm::main); - return thread_ctrl::spawn("SPRX Loader", [this] + return thread_ctrl::make_shared("SPRX Loader", [this] { std::vector dir_queue; dir_queue.emplace_back(m_path + '/'); @@ -738,7 +728,7 @@ void Emulator::Load(bool add_only) std::vector> file_queue; file_queue.reserve(2000); - std::queue> thread_queue; + std::queue> thread_queue; const uint max_threads = std::thread::hardware_concurrency(); // Initialize progress dialog @@ -812,14 +802,12 @@ void Emulator::Load(bool add_only) std::this_thread::sleep_for(10ms); } - thread_queue.emplace(); - - thread_ctrl::spawn(thread_queue.back(), "Worker " + std::to_string(thread_queue.size()), [_prx = std::move(prx)] + thread_queue.emplace(thread_ctrl::make_shared("Worker " + std::to_string(thread_queue.size()), [_prx = std::move(prx)] { ppu_initialize(*_prx); ppu_unload_prx(*_prx); g_progr_fdone++; - }); + })); continue; } @@ -829,18 +817,9 @@ void Emulator::Load(bool add_only) g_progr_fdone++; } - // Join every thread and print exceptions + // Join every thread while (!thread_queue.empty()) { - try - { - thread_queue.front()->join(); - } - catch (const std::exception& e) - { - LOG_FATAL(LOADER, "[%s] %s thrown: %s", thread_queue.front()->get_name(), typeid(e).name(), e.what()); - } - thread_queue.pop(); } @@ -849,7 +828,7 @@ void Emulator::Load(bool add_only) { Emu.Stop(); }); - }); + })->detach(); } // Detect boot location diff --git a/rpcs3/rpcs3qt/main_window.cpp b/rpcs3/rpcs3qt/main_window.cpp index 1129738362..c29da4e302 100644 --- a/rpcs3/rpcs3qt/main_window.cpp +++ b/rpcs3/rpcs3qt/main_window.cpp @@ -462,25 +462,21 @@ void main_window::InstallPkg(const QString& dropPath, bool is_bulk) // Synchronization variable atomic_t progress(0.); + + // Run PKG unpacking asynchronously + named_thread worker("PKG Installer", [&] { - // Run PKG unpacking asynchronously - scope_thread worker("PKG Installer", [&] - { - if (pkg_install(path, progress)) - { - progress = 1.; - return; - } - - progress = -1.; - }); + return pkg_install(path, progress); + }); + { // Wait for the completion - while (std::this_thread::sleep_for(5ms), std::abs(progress) < 1.) + while (std::this_thread::sleep_for(5ms), worker != thread_state::finished) { if (pdlg.wasCanceled()) { progress -= 1.; + break; } // Update progress window @@ -490,14 +486,14 @@ void main_window::InstallPkg(const QString& dropPath, bool is_bulk) QCoreApplication::processEvents(); } - if (progress > 0.) + if (worker()) { pdlg.SetValue(pdlg.maximum()); std::this_thread::sleep_for(100ms); } } - if (progress >= 1.) + if (worker()) { m_gameListFrame->Refresh(true); LOG_SUCCESS(GENERAL, "Successfully installed %s.", fileName); @@ -574,7 +570,7 @@ void main_window::InstallPup(const QString& dropPath) atomic_t progress(0); { // Run asynchronously - scope_thread worker("Firmware Installer", [&] + named_thread worker("Firmware Installer", [&] { for (const auto& updatefilename : updatefilenames) {