1
0
mirror of https://github.com/RPCS3/rpcs3.git synced 2024-11-25 04:02:42 +01:00

atomic.cpp: implement notify callback

Notification can be very heavy, especially if we need to wake many threads.
Callback is set for cpu_thread in order to set wait flag accordingly.
This commit is contained in:
Nekotekina 2020-10-26 04:02:39 +03:00
parent 0a121e9d26
commit 6806e3d5c7
4 changed files with 94 additions and 4 deletions

View File

@ -1970,7 +1970,7 @@ bool thread_base::finalize(thread_state result_state) noexcept
void thread_base::finalize() noexcept
{
atomic_storage_futex::set_wait_callback([](const void*){ return true; });
atomic_storage_futex::set_wait_callback(nullptr);
g_tls_log_prefix = []() -> std::string { return {}; };
thread_ctrl::g_tls_this_thread = nullptr;
}

View File

@ -446,6 +446,29 @@ void cpu_thread::operator()()
return;
}
atomic_storage_futex::set_notify_callback([](const void*, u64 progress)
{
static thread_local bool wait_set = false;
cpu_thread* _cpu = get_current_cpu_thread();
// Wait flag isn't set asynchronously so this should be thread-safe
if (progress == 0 && !(_cpu->state & cpu_flag::wait))
{
// Operation just started and syscall is imminent
_cpu->state += cpu_flag::wait + cpu_flag::temp;
wait_set = true;
return;
}
if (progress == umax && std::exchange(wait_set, false))
{
// Operation finished: need to clean wait flag
verify(HERE), !_cpu->check_state();
return;
}
});
static thread_local struct thread_cleanup_t
{
cpu_thread* _this;
@ -469,6 +492,8 @@ void cpu_thread::operator()()
ptr->compare_and_swap(_this, nullptr);
}
atomic_storage_futex::set_notify_callback(nullptr);
g_fxo->get<cpu_counter>()->remove(_this, s_tls_thread_slot);
_this = nullptr;

View File

@ -41,6 +41,9 @@ static constexpr u64 one_v = Mask & (0 - Mask);
// Callback for wait() function, returns false if wait should return
static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*){ return true; };
// Callback for notification functions for optimizations
static thread_local void(*s_tls_notify_cb)(const void* data, u64 progress) = [](const void*, u64){};
// Compare data in memory with old value, and return true if they are equal
template <bool CheckCb = true, bool CheckData = true>
static inline bool ptr_cmp(const void* data, std::size_t size, u64 old_value, u64 mask)
@ -682,11 +685,17 @@ SAFE_BUFFERS void atomic_storage_futex::wait(const void* data, std::size_t size,
}
// Platform specific wake-up function
static inline bool alert_sema(atomic_t<u32>* sema)
static inline bool alert_sema(atomic_t<u32>* sema, const void* data, u64 progress)
{
#ifdef USE_FUTEX
if (sema->load() == 1 && sema->compare_and_swap_test(1, 2))
{
if (!progress)
{
// Imminent notification
s_tls_notify_cb(data, 0);
}
// Use "wake all" arg for robustness, only 1 thread is expected
futex(sema, FUTEX_WAKE_PRIVATE, 0x7fff'ffff);
return true;
@ -713,6 +722,12 @@ static inline bool alert_sema(atomic_t<u32>* sema)
{
if (auto cond = cond_get(cond_id))
{
if (!progress)
{
// Imminent notification
s_tls_notify_cb(data, 0);
}
// Not super efficient: locking is required to avoid lost notifications
cond->mtx.lock();
cond->mtx.unlock();
@ -738,8 +753,15 @@ static inline bool alert_sema(atomic_t<u32>* sema)
// Check if tid is neither 0 nor -1
if (tid + 1 > 1 && sema->compare_and_swap_test(tid, -1))
{
if (!progress)
{
// Imminent notification
s_tls_notify_cb(data, 0);
}
if (NtAlertThreadByThreadId(tid) == NTSTATUS_SUCCESS)
{
// Could be some dead thread otherwise
return true;
}
}
@ -749,6 +771,12 @@ static inline bool alert_sema(atomic_t<u32>* sema)
if (sema->load() == 1 && sema->compare_and_swap_test(1, 2))
{
if (!progress)
{
// Imminent notification
s_tls_notify_cb(data, 0);
}
// Can wait in rare cases, which is its annoying weakness
NtReleaseKeyedEvent(nullptr, sema, 1, nullptr);
return true;
@ -764,6 +792,22 @@ void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data))
{
s_tls_wait_cb = cb;
}
else
{
s_tls_wait_cb = [](const void*){ return true; };
}
}
void atomic_storage_futex::set_notify_callback(void(*cb)(const void*, u64))
{
if (cb)
{
s_tls_notify_cb = cb;
}
else
{
s_tls_notify_cb = [](const void*, u64){};
}
}
void atomic_storage_futex::raw_notify(const void* data)
@ -785,15 +829,20 @@ void atomic_storage_futex::notify_one(const void* data)
return;
}
u64 progress = 0;
for (u64 bits = slot->sema_bits; bits; bits &= bits - 1)
{
const auto sema = &slot->sema_data[std::countr_zero(bits)];
if (alert_sema(sema))
if (alert_sema(sema, data, progress))
{
s_tls_notify_cb(data, ++progress);
break;
}
}
s_tls_notify_cb(data, -1);
}
void atomic_storage_futex::notify_all(const void* data)
@ -807,6 +856,8 @@ void atomic_storage_futex::notify_all(const void* data)
return;
}
u64 progress = 0;
#if defined(_WIN32) && !defined(USE_FUTEX)
if (!NtAlertThreadByThreadId)
{
@ -825,6 +876,12 @@ void atomic_storage_futex::notify_all(const void* data)
if (sema->load() == 1 && sema->compare_and_swap_test(1, 2))
{
// Waiters locked for notification
if (bits == copy)
{
// Notify imminent notification
s_tls_notify_cb(data, 0);
}
continue;
}
@ -847,6 +904,8 @@ void atomic_storage_futex::notify_all(const void* data)
continue;
}
s_tls_notify_cb(data, ++progress);
// Remove the bit from next stage
copy &= ~(1ull << id);
}
@ -856,8 +915,10 @@ void atomic_storage_futex::notify_all(const void* data)
for (u64 bits = copy; bits; bits &= bits - 1)
{
NtReleaseKeyedEvent(nullptr, &slot->sema_data[std::countr_zero(bits)], 1, nullptr);
s_tls_notify_cb(data, ++progress);
}
s_tls_notify_cb(data, -1);
return;
}
#endif
@ -866,9 +927,12 @@ void atomic_storage_futex::notify_all(const void* data)
{
const auto sema = &slot->sema_data[std::countr_zero(bits)];
if (alert_sema(sema))
if (alert_sema(sema, data, progress))
{
s_tls_notify_cb(data, ++progress);
continue;
}
}
s_tls_notify_cb(data, -1);
}

View File

@ -27,6 +27,7 @@ private:
public:
static void set_wait_callback(bool(*cb)(const void* data));
static void set_notify_callback(void(*cb)(const void* data, u64 progress));
static void raw_notify(const void* data);
};