1
0
mirror of https://github.com/RPCS3/rpcs3.git synced 2024-11-25 12:12:50 +01:00

LV2: Make sys_mutex and sys_lwmutex lock-free, add some busy waiting in sys_mutex_lock

This commit is contained in:
Eladash 2022-07-28 14:10:16 +03:00 committed by Ivan
parent a17a6527f6
commit 011aabe9ed
20 changed files with 402 additions and 224 deletions

View File

@ -323,8 +323,8 @@ public:
std::shared_ptr<utils::serial> optional_savestate_state;
bool interrupt_thread_executing = false;
atomic_t<ppu_thread*> next_cpu{}; // LV2 sleep queues' node link
atomic_t<ppu_thread*> next_ppu{}; // LV2 PPU running queue's node link
ppu_thread* next_cpu{}; // LV2 sleep queues' node link
ppu_thread* next_ppu{}; // LV2 PPU running queue's node link
bool ack_suspend = false;
be_t<u64>* get_stack_arg(s32 i, u64 align = alignof(u64));

View File

@ -815,7 +815,7 @@ public:
const u32 option; // sys_spu_thread_initialize option
const u32 lv2_id; // The actual id that is used by syscalls
atomic_t<spu_thread*> next_cpu{}; // LV2 thread queues' node link
spu_thread* next_cpu{}; // LV2 thread queues' node link
// Thread name
atomic_ptr<std::string> spu_tname;

View File

@ -1276,7 +1276,7 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)
{
ppu_log.trace("sleep() - waiting (%zu)", g_pending);
const auto [_, ok] = ppu->state.fetch_op([&](bs_t<cpu_flag>& val)
const auto [_ ,ok] = ppu->state.fetch_op([&](bs_t<cpu_flag>& val)
{
if (!(val & cpu_flag::signal))
{
@ -1289,7 +1289,7 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)
if (!ok)
{
ppu_log.fatal("sleep() failed (signaled) (%s)", ppu->current_function);
ppu_log.trace("sleep() failed (signaled) (%s)", ppu->current_function);
return;
}
@ -1414,21 +1414,21 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
auto ppu2 = +*ppu2_next;
// Rotate current thread to the last position of the 'same prio' threads list
ppu_next->release(ppu2);
*ppu_next = ppu2;
// Exchange forward pointers
if (ppu->next_ppu != ppu2)
{
auto ppu2_val = +ppu2->next_ppu;
ppu2->next_ppu.release(+ppu->next_ppu);
ppu->next_ppu.release(ppu2_val);
ppu2_next->release(ppu);
ppu2->next_ppu = +ppu->next_ppu;
ppu->next_ppu = ppu2_val;
*ppu2_next = ppu;
}
else
{
auto ppu2_val = +ppu2->next_ppu;
ppu2->next_ppu.release(ppu);
ppu->next_ppu.release(ppu2_val);
ppu2->next_ppu = ppu;
ppu->next_ppu = ppu2_val;
}
if (i <= g_cfg.core.ppu_threads + 0u)
@ -1468,8 +1468,8 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
// Use priority, also preserve FIFO order
if (!next || next->prio > static_cast<ppu_thread*>(cpu)->prio)
{
it->release(static_cast<ppu_thread*>(cpu));
static_cast<ppu_thread*>(cpu)->next_ppu.release(next);
atomic_storage<ppu_thread*>::release(static_cast<ppu_thread*>(cpu)->next_ppu, next);
atomic_storage<ppu_thread*>::release(*it, static_cast<ppu_thread*>(cpu));
break;
}
@ -1496,12 +1496,34 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
if (cpu)
{
// Emplace current thread
changed_queue = emplace_thread(cpu);
if (!emplace_thread(cpu))
{
if (notify_later)
{
// notify_later includes common optimizations regarding syscalls
// one of which is to allow a lock-free version of syscalls with awake behave as semaphore post: always notifies the thread, even if it hasn't slept yet
cpu->state += cpu_flag::signal;
}
}
else
{
changed_queue = true;
}
}
else for (const auto _cpu : g_to_awake)
{
// Emplace threads from list
changed_queue |= emplace_thread(_cpu);
if (!emplace_thread(_cpu))
{
if (notify_later)
{
_cpu->state += cpu_flag::signal;
}
}
else
{
changed_queue = true;
}
}
// Remove pending if necessary
@ -1662,3 +1684,18 @@ bool lv2_obj::is_scheduler_ready()
reader_lock lock(g_mutex);
return g_to_sleep.empty();
}
bool lv2_obj::has_ppus_in_running_state()
{
auto target = atomic_storage<ppu_thread*>::load(g_ppu);
for (usz i = 0, thread_count = g_cfg.core.ppu_threads; target; target = atomic_storage<ppu_thread*>::load(target->next_ppu), i++)
{
if (i >= thread_count)
{
return true;
}
}
return false;
}

View File

@ -1,5 +1,4 @@
#include "stdafx.h"
#include "sys_cond.h"
#include "util/serialization.hpp"
#include "Emu/IdManager.h"
@ -9,6 +8,8 @@
#include "Emu/Cell/ErrorCodes.h"
#include "Emu/Cell/PPUThread.h"
#include "sys_cond.h"
#include "util/asm.hpp"
LOG_CHANNEL(sys_cond);
@ -112,7 +113,7 @@ error_code sys_cond_destroy(ppu_thread& ppu, u32 cond_id)
{
std::lock_guard lock(cond.mutex->mutex);
if (cond.sq)
if (atomic_storage<ppu_thread*>::load(cond.sq))
{
return CELL_EBUSY;
}
@ -143,7 +144,7 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id)
const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&](lv2_cond& cond)
{
if (cond.sq)
if (atomic_storage<ppu_thread*>::load(cond.sq))
{
lv2_obj::notify_all_t notify;
@ -159,7 +160,7 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id)
// TODO: Is EBUSY returned after reqeueing, on sys_cond_destroy?
if (cond.mutex->try_own(*cpu, cpu->id))
if (cond.mutex->try_own(*cpu))
{
cond.awake(cpu, true);
}
@ -183,7 +184,7 @@ error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id)
const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&](lv2_cond& cond)
{
if (cond.sq)
if (atomic_storage<ppu_thread*>::load(cond.sq))
{
lv2_obj::notify_all_t notify;
@ -199,12 +200,12 @@ error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id)
}
cpu_thread* result = nullptr;
decltype(cond.sq) sq{+cond.sq};
cond.sq.release(nullptr);
auto sq = cond.sq;
atomic_storage<ppu_thread*>::release(cond.sq, nullptr);
while (const auto cpu = cond.schedule<ppu_thread>(sq, SYS_SYNC_PRIORITY))
{
if (cond.mutex->try_own(*cpu, cpu->id))
if (cond.mutex->try_own(*cpu))
{
ensure(!std::exchange(result, cpu));
}
@ -238,7 +239,7 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
return -1;
}
if (cond.sq)
if (atomic_storage<ppu_thread*>::load(cond.sq))
{
lv2_obj::notify_all_t notify;
@ -256,7 +257,7 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
ensure(cond.unqueue(cond.sq, cpu));
if (cond.mutex->try_own(*cpu, cpu->id))
if (cond.mutex->try_own(*cpu))
{
cond.awake(cpu, true);
}
@ -295,7 +296,7 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
const auto cond = idm::get<lv2_obj, lv2_cond>(cond_id, [&](lv2_cond& cond) -> s64
{
if (!ppu.loaded_from_savestate && cond.mutex->owner >> 1 != ppu.id)
if (!ppu.loaded_from_savestate && atomic_storage<u32>::load(cond.mutex->control.raw().owner) != ppu.id)
{
return -1;
}
@ -307,19 +308,18 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
const u64 syscall_state = sstate.try_read<u64>().second;
sstate.clear();
if (syscall_state & 1)
{
// Mutex sleep
ensure(!cond.mutex->try_own(ppu, ppu.id));
}
else
{
// Register waiter
lv2_obj::emplace(cond.sq, &ppu);
}
if (ppu.loaded_from_savestate)
{
if (syscall_state & 1)
{
// Mutex sleep
ensure(!cond.mutex->try_own(ppu));
}
else
{
lv2_obj::emplace(cond.sq, &ppu);
}
cond.sleep(ppu, timeout, true);
return static_cast<u32>(syscall_state >> 32);
}
@ -329,9 +329,18 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
if (const auto cpu = cond.mutex->reown<ppu_thread>())
{
if (cpu->state & cpu_flag::again)
{
ppu.state += cpu_flag::again;
return 0;
}
cond.mutex->append(cpu);
}
// Register waiter
lv2_obj::emplace(cond.sq, &ppu);
// Sleep current thread and schedule mutex waiter
cond.sleep(ppu, timeout, true);
@ -344,6 +353,11 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
return CELL_ESRCH;
}
if (ppu.state & cpu_flag::again)
{
return {};
}
if (cond.ret < 0)
{
return CELL_EPERM;
@ -363,7 +377,7 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
bool mutex_sleep = false;
bool cond_sleep = false;
for (auto cpu = +cond->sq; cpu; cpu = cpu->next_cpu)
for (auto cpu = atomic_storage<ppu_thread*>::load(cond->sq); cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
@ -372,7 +386,7 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
}
}
for (auto cpu = +cond->mutex->sq; cpu; cpu = cpu->next_cpu)
for (auto cpu = atomic_storage<ppu_thread*>::load(cond->mutex->control.raw().sq); cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
@ -422,12 +436,12 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
ppu.gpr[3] = CELL_ETIMEDOUT;
// Own or requeue
if (cond->mutex->try_own(ppu, ppu.id))
if (cond->mutex->try_own(ppu))
{
break;
}
}
else if (cond->mutex->owner >> 1 == ppu.id)
else if (atomic_storage<u32>::load(cond->mutex->control.raw().owner) == ppu.id)
{
break;
}
@ -444,7 +458,7 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
}
// Verify ownership
ensure(cond->mutex->owner >> 1 == ppu.id);
ensure(atomic_storage<u32>::load(cond->mutex->control.raw().owner) == ppu.id);
// Restore the recursive value
cond->mutex->lock_count.release(static_cast<u32>(cond.ret));

View File

@ -27,7 +27,7 @@ struct lv2_cond final : lv2_obj
const u32 mtx_id;
std::shared_ptr<lv2_mutex> mutex; // Associated Mutex
atomic_t<ppu_thread*> sq{};
ppu_thread* sq{};
lv2_cond(u64 key, u64 name, u32 mtx_id, std::shared_ptr<lv2_mutex> mutex)
: key(key)

View File

@ -92,8 +92,8 @@ struct lv2_event_queue final : public lv2_obj
shared_mutex mutex;
std::deque<lv2_event> events;
atomic_t<spu_thread*> sq{};
atomic_t<ppu_thread*> pq{};
spu_thread* sq{};
ppu_thread* pq{};
lv2_event_queue(u32 protocol, s32 type, s32 size, u64 name, u64 ipc_key) noexcept;

View File

@ -349,17 +349,12 @@ error_code sys_event_flag_set(cpu_thread& cpu, u32 id, u64 bitptn)
auto get_next = [&]() -> ppu_thread*
{
if (flag->protocol != SYS_SYNC_PRIORITY)
{
return std::exchange(first, first ? +first->next_cpu : nullptr);
}
s32 prio = smax;
ppu_thread* it{};
for (auto ppu = first; ppu; ppu = ppu->next_cpu)
{
if (!ppu->gpr[7] && ppu->prio < prio)
if (!ppu->gpr[7] && (flag->protocol != SYS_SYNC_PRIORITY || ppu->prio <= prio))
{
it = ppu;
prio = ppu->prio;
@ -404,12 +399,12 @@ error_code sys_event_flag_set(cpu_thread& cpu, u32 id, u64 bitptn)
// Remove waiters
for (auto next_cpu = &flag->sq; *next_cpu;)
{
auto& ppu = *+*next_cpu;
auto& ppu = **next_cpu;
if (ppu.gpr[3] == CELL_OK)
{
next_cpu->release(+ppu.next_cpu);
ppu.next_cpu.release(nullptr);
atomic_storage<ppu_thread*>::release(*next_cpu, ppu.next_cpu);
ppu.next_cpu = nullptr;
flag->append(&ppu);
continue;
}
@ -474,7 +469,7 @@ error_code sys_event_flag_cancel(ppu_thread& ppu, u32 id, vm::ptr<u32> num)
const u64 pattern = flag->pattern;
// Signal all threads to return CELL_ECANCELED (protocol does not matter)
for (auto ppu = +flag->sq; ppu; ppu = ppu->next_cpu)
while (auto ppu = flag->schedule<ppu_thread>(flag->sq, SYS_SYNC_FIFO))
{
ppu->gpr[3] = CELL_ECANCELED;
ppu->gpr[6] = pattern;
@ -483,8 +478,6 @@ error_code sys_event_flag_cancel(ppu_thread& ppu, u32 id, vm::ptr<u32> num)
flag->append(ppu);
}
flag->sq.release(nullptr);
if (value)
{
lv2_obj::awake_all();

View File

@ -42,7 +42,7 @@ struct lv2_event_flag final : lv2_obj
shared_mutex mutex;
atomic_t<u64> pattern;
atomic_t<ppu_thread*> sq{};
ppu_thread* sq{};
lv2_event_flag(u32 protocol, u64 key, s32 type, u64 name, u64 pattern) noexcept
: protocol{static_cast<u8>(protocol)}

View File

@ -1,4 +1,4 @@
#include "stdafx.h"
#include "stdafx.h"
#include "sys_lwcond.h"
#include "Emu/IdManager.h"
@ -65,7 +65,7 @@ error_code _sys_lwcond_destroy(ppu_thread& ppu, u32 lwcond_id)
const auto cond = idm::withdraw<lv2_obj, lv2_lwcond>(lwcond_id, [&](lv2_lwcond& cond) -> CellError
{
if (cond.sq)
if (atomic_storage<ppu_thread*>::load(cond.sq))
{
return CELL_EBUSY;
}
@ -127,7 +127,7 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
}
}
if (cond.sq)
if (atomic_storage<ppu_thread*>::load(cond.sq))
{
lv2_obj::notify_all_t notify;
@ -160,16 +160,15 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
if (mode != 2)
{
ensure(!mutex->signaled);
std::lock_guard lock(mutex->mutex);
if (mode == 3 && mutex->sq) [[unlikely]]
if (mode == 3 && mutex->load_sq()) [[unlikely]]
{
// Respect ordering of the sleep queue
lv2_obj::emplace(mutex->sq, result);
result = mutex->schedule<ppu_thread>(mutex->sq, mutex->protocol);
std::lock_guard lock(mutex->mutex);
if (static_cast<ppu_thread*>(result2)->state & cpu_flag::again)
// Respect ordering of the sleep queue
mutex->try_own(result, true);
auto result2 = mutex->reown<ppu_thread>();
if (result2->state & cpu_flag::again)
{
ppu.state += cpu_flag::again;
return 0;
@ -183,7 +182,7 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
}
else if (mode == 1)
{
mutex->add_waiter(result);
mutex->try_own(result, true);
result = nullptr;
}
}
@ -253,7 +252,7 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
}
}
if (cond.sq)
if (atomic_storage<ppu_thread*>::load(cond.sq))
{
lv2_obj::notify_all_t notify;
@ -270,8 +269,8 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
}
}
decltype(cond.sq) sq{+cond.sq};
cond.sq.release(nullptr);
auto sq = cond.sq;
atomic_storage<ppu_thread*>::release(cond.sq, nullptr);
while (const auto cpu = cond.schedule<ppu_thread>(sq, cond.protocol))
{
@ -282,9 +281,7 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
if (mode == 1)
{
ensure(!mutex->signaled);
std::lock_guard lock(mutex->mutex);
mutex->add_waiter(cpu);
mutex->try_own(cpu, true);
}
else
{
@ -353,8 +350,7 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
if (mutex_sleep)
{
// Special: loading state from the point of waiting on lwmutex sleep queue
std::lock_guard lock2(mutex->mutex);
lv2_obj::emplace(mutex->sq, &ppu);
mutex->try_own(&ppu, true);
}
else
{
@ -362,25 +358,22 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
lv2_obj::emplace(cond.sq, &ppu);
}
if (!ppu.loaded_from_savestate)
if (!ppu.loaded_from_savestate && !mutex->try_unlock(false))
{
std::lock_guard lock2(mutex->mutex);
// Process lwmutex sleep queue
if (const auto cpu = mutex->schedule<ppu_thread>(mutex->sq, mutex->protocol))
if (const auto cpu = mutex->reown<ppu_thread>())
{
if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
{
ensure(cond.unqueue(cond.sq, &ppu));
ppu.state += cpu_flag::again;
return;
}
cond.append(cpu);
}
else
{
mutex->signaled |= 1;
}
}
// Sleep current thread and schedule lwmutex waiter
@ -412,7 +405,7 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
bool mutex_sleep = false;
bool cond_sleep = false;
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
for (auto cpu = mutex->load_sq(); cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
@ -421,7 +414,7 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
}
}
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
for (auto cpu = atomic_storage<ppu_thread*>::load(cond->sq); cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
@ -472,7 +465,7 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
bool mutex_sleep = false;
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
for (auto cpu = mutex->load_sq(); cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{

View File

@ -31,7 +31,7 @@ struct lv2_lwcond final : lv2_obj
vm::ptr<sys_lwcond_t> control;
shared_mutex mutex;
atomic_t<ppu_thread*> sq{};
ppu_thread* sq{};
lv2_lwcond(u64 name, u32 lwid, u32 protocol, vm::ptr<sys_lwcond_t> control) noexcept
: name(std::bit_cast<be_t<u64>>(name))

View File

@ -14,13 +14,13 @@ lv2_lwmutex::lv2_lwmutex(utils::serial& ar)
: protocol(ar)
, control(ar.operator decltype(control)())
, name(ar.operator be_t<u64>())
, signaled(ar)
{
ar(lv2_control.raw().signaled);
}
void lv2_lwmutex::save(utils::serial& ar)
{
ar(protocol, control, name, signaled);
ar(protocol, control, name, lv2_control.raw().signaled);
}
error_code _sys_lwmutex_create(ppu_thread& ppu, vm::ptr<u32> lwmutex_id, u32 protocol, vm::ptr<sys_lwmutex_t> control, s32 has_name, u64 name)
@ -72,7 +72,7 @@ error_code _sys_lwmutex_destroy(ppu_thread& ppu, u32 lwmutex_id)
std::lock_guard lock(mutex.mutex);
if (mutex.sq)
if (mutex.load_sq())
{
return CELL_EBUSY;
}
@ -141,29 +141,30 @@ error_code _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout)
const auto mutex = idm::get<lv2_obj, lv2_lwmutex>(lwmutex_id, [&](lv2_lwmutex& mutex)
{
if (mutex.signaled.try_dec(0))
if (s32 signal = mutex.lv2_control.fetch_op([](auto& data)
{
if (data.signaled == 1)
{
data.signaled = 0;
return true;
}
return false;
}).first.signaled)
{
if (signal == smin)
{
ppu.gpr[3] = CELL_EBUSY;
}
return true;
}
lv2_obj::notify_all_t notify(ppu);
std::lock_guard lock(mutex.mutex);
auto [old, _] = mutex.signaled.fetch_op([](s32& value)
if (s32 signal = mutex.try_own(&ppu))
{
if (value)
{
value = 0;
return true;
}
return false;
});
if (old)
{
if (old == smin)
if (signal == smin)
{
ppu.gpr[3] = CELL_EBUSY;
}
@ -172,7 +173,6 @@ error_code _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout)
}
mutex.sleep(ppu, timeout, true);
mutex.add_waiter(&ppu);
return false;
});
@ -197,7 +197,7 @@ error_code _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout)
{
std::lock_guard lock(mutex->mutex);
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
for (auto cpu = mutex->load_sq(); cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
@ -231,7 +231,7 @@ error_code _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout)
std::lock_guard lock(mutex->mutex);
if (!mutex->unqueue(mutex->sq, &ppu))
if (!mutex->unqueue(mutex->lv2_control.raw().sq, &ppu))
{
break;
}
@ -257,11 +257,11 @@ error_code _sys_lwmutex_trylock(ppu_thread& ppu, u32 lwmutex_id)
const auto mutex = idm::check<lv2_obj, lv2_lwmutex>(lwmutex_id, [&](lv2_lwmutex& mutex)
{
auto [_, ok] = mutex.signaled.fetch_op([](s32& value)
auto [_, ok] = mutex.lv2_control.fetch_op([](auto& data)
{
if (value & 1)
if (data.signaled & 1)
{
value = 0;
data.signaled = 0;
return true;
}
@ -292,11 +292,16 @@ error_code _sys_lwmutex_unlock(ppu_thread& ppu, u32 lwmutex_id)
const auto mutex = idm::check<lv2_obj, lv2_lwmutex>(lwmutex_id, [&](lv2_lwmutex& mutex)
{
if (mutex.try_unlock(false))
{
return;
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(mutex.mutex);
if (const auto cpu = mutex.schedule<ppu_thread>(mutex.sq, mutex.protocol))
if (const auto cpu = mutex.reown<ppu_thread>())
{
if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
{
@ -307,8 +312,6 @@ error_code _sys_lwmutex_unlock(ppu_thread& ppu, u32 lwmutex_id)
mutex.awake(cpu, true);
return;
}
mutex.signaled |= 1;
});
if (!mutex)
@ -327,11 +330,16 @@ error_code _sys_lwmutex_unlock2(ppu_thread& ppu, u32 lwmutex_id)
const auto mutex = idm::check<lv2_obj, lv2_lwmutex>(lwmutex_id, [&](lv2_lwmutex& mutex)
{
if (mutex.try_unlock(true))
{
return;
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(mutex.mutex);
if (const auto cpu = mutex.schedule<ppu_thread>(mutex.sq, mutex.protocol))
if (const auto cpu = mutex.reown<ppu_thread>(true))
{
if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
{
@ -343,8 +351,6 @@ error_code _sys_lwmutex_unlock2(ppu_thread& ppu, u32 lwmutex_id)
mutex.awake(cpu, true);
return;
}
mutex.signaled |= smin;
});
if (!mutex)

View File

@ -60,10 +60,17 @@ struct lv2_lwmutex final : lv2_obj
const be_t<u64> name;
shared_mutex mutex;
atomic_t<s32> signaled{0};
atomic_t<ppu_thread*> sq{};
atomic_t<s32> lwcond_waiters{0};
struct alignas(16) control_data_t
{
s32 signaled{0};
u32 reserved{};
ppu_thread* sq{};
};
atomic_t<control_data_t> lv2_control{};
lv2_lwmutex(u32 protocol, vm::ptr<sys_lwmutex_t> control, u64 name) noexcept
: protocol{static_cast<u8>(protocol)}
, control(control)
@ -74,10 +81,28 @@ struct lv2_lwmutex final : lv2_obj
lv2_lwmutex(utils::serial& ar);
void save(utils::serial& ar);
// Add a waiter
template <typename T>
void add_waiter(T* cpu)
ppu_thread* load_sq() const
{
return atomic_storage<ppu_thread*>::load(lv2_control.raw().sq);
}
template <typename T>
s32 try_own(T* cpu, bool wait_only = false)
{
const s32 signal = lv2_control.fetch_op([&](control_data_t& data)
{
if (!data.signaled)
{
cpu->next_cpu = data.sq;
data.sq = cpu;
}
else
{
ensure(!wait_only);
data.signaled = 0;
}
}).signaled;
const bool notify = lwcond_waiters.fetch_op([](s32& val)
{
if (val + 0u <= 1u << 31)
@ -92,13 +117,67 @@ struct lv2_lwmutex final : lv2_obj
return true;
}).second;
lv2_obj::emplace(sq, cpu);
if (notify)
{
// Notify lwmutex destroyer (may cause EBUSY to be returned for it)
lwcond_waiters.notify_all();
}
return signal;
}
bool try_unlock(bool unlock2)
{
if (!load_sq())
{
control_data_t old{};
old.signaled = atomic_storage<s32>::load(lv2_control.raw().signaled);
control_data_t store = old;
store.signaled |= (unlock2 ? s32{smin} : 1);
if (lv2_control.compare_and_swap_test(old, store))
{
return true;
}
}
return false;
}
template <typename T>
T* reown(bool unlock2 = false)
{
T* res{};
T* restore_next{};
lv2_control.fetch_op([&](control_data_t& data)
{
if (res)
{
res->next_cpu = restore_next;
res = nullptr;
}
if (auto sq = data.sq)
{
res = schedule<T>(data.sq, protocol);
if (sq == data.sq)
{
return false;
}
restore_next = res->next_cpu;
return true;
}
else
{
data.signaled |= (unlock2 ? s32{smin} : 1);
return true;
}
});
return res;
}
};

View File

@ -1,5 +1,4 @@
#include "stdafx.h"
#include "sys_mutex.h"
#include "Emu/IdManager.h"
#include "Emu/IPC.h"
@ -9,6 +8,8 @@
#include "util/asm.hpp"
#include "sys_mutex.h"
LOG_CHANNEL(sys_mutex);
lv2_mutex::lv2_mutex(utils::serial& ar)
@ -18,7 +19,10 @@ lv2_mutex::lv2_mutex(utils::serial& ar)
, key(ar)
, name(ar)
{
ar(lock_count, owner);
ar(lock_count, control.raw().owner);
// For backwards compatibility
control.raw().owner >>= 1;
}
std::shared_ptr<void> lv2_mutex::load(utils::serial& ar)
@ -29,7 +33,7 @@ std::shared_ptr<void> lv2_mutex::load(utils::serial& ar)
void lv2_mutex::save(utils::serial& ar)
{
ar(protocol, recursive, adaptive, key, name, lock_count, owner & -2);
ar(protocol, recursive, adaptive, key, name, lock_count, control.raw().owner << 1);
}
error_code sys_mutex_create(ppu_thread& ppu, vm::ptr<u32> mutex_id, vm::ptr<sys_mutex_attribute_t> attr)
@ -102,7 +106,7 @@ error_code sys_mutex_destroy(ppu_thread& ppu, u32 mutex_id)
{
std::lock_guard lock(mutex.mutex);
if (mutex.owner || mutex.lock_count)
if (atomic_storage<u32>::load(mutex.control.raw().owner))
{
return CELL_EBUSY;
}
@ -137,15 +141,28 @@ error_code sys_mutex_lock(ppu_thread& ppu, u32 mutex_id, u64 timeout)
const auto mutex = idm::get<lv2_obj, lv2_mutex>(mutex_id, [&](lv2_mutex& mutex)
{
CellError result = mutex.try_lock(ppu.id);
CellError result = mutex.try_lock(ppu);
if (result == CELL_EBUSY && !atomic_storage<ppu_thread*>::load(mutex.control.raw().sq))
{
// Try busy waiting a bit if advantageous
for (u32 i = 0, end = lv2_obj::has_ppus_in_running_state() ? 3 : 10; id_manager::g_mutex.is_lockable() && i < end; i++)
{
busy_wait(300);
result = mutex.try_lock(ppu);
if (!result || atomic_storage<ppu_thread*>::load(mutex.control.raw().sq))
{
break;
}
}
}
if (result == CELL_EBUSY)
{
lv2_obj::notify_all_t notify(ppu);
std::lock_guard lock(mutex.mutex);
if (mutex.try_own(ppu, ppu.id))
if (mutex.try_own(ppu))
{
result = {};
}
@ -188,7 +205,7 @@ error_code sys_mutex_lock(ppu_thread& ppu, u32 mutex_id, u64 timeout)
{
std::lock_guard lock(mutex->mutex);
for (auto cpu = +mutex->sq; cpu; cpu = cpu->next_cpu)
for (auto cpu = atomic_storage<ppu_thread*>::load(mutex->control.raw().sq); cpu; cpu = cpu->next_cpu)
{
if (cpu == &ppu)
{
@ -200,7 +217,7 @@ error_code sys_mutex_lock(ppu_thread& ppu, u32 mutex_id, u64 timeout)
break;
}
for (usz i = 0; cpu_flag::signal - ppu.state && i < 50; i++)
for (usz i = 0; cpu_flag::signal - ppu.state && i < 40; i++)
{
busy_wait(500);
}
@ -222,7 +239,7 @@ error_code sys_mutex_lock(ppu_thread& ppu, u32 mutex_id, u64 timeout)
std::lock_guard lock(mutex->mutex);
if (!mutex->unqueue(mutex->sq, &ppu))
if (!mutex->unqueue(mutex->control.raw().sq, &ppu))
{
break;
}
@ -248,7 +265,7 @@ error_code sys_mutex_trylock(ppu_thread& ppu, u32 mutex_id)
const auto mutex = idm::check<lv2_obj, lv2_mutex>(mutex_id, [&](lv2_mutex& mutex)
{
return mutex.try_lock(ppu.id);
return mutex.try_lock(ppu);
});
if (!mutex)
@ -277,7 +294,7 @@ error_code sys_mutex_unlock(ppu_thread& ppu, u32 mutex_id)
const auto mutex = idm::check<lv2_obj, lv2_mutex>(mutex_id, [&](lv2_mutex& mutex) -> CellError
{
CellError result = mutex.try_unlock(ppu.id);
auto result = mutex.try_unlock(ppu);
if (result == CELL_EBUSY)
{

View File

@ -4,6 +4,8 @@
#include "Emu/Memory/vm_ptr.h"
#include "Emu/Cell/PPUThread.h"
struct sys_mutex_attribute_t
{
be_t<u32> protocol; // SYS_SYNC_FIFO, SYS_SYNC_PRIORITY or SYS_SYNC_PRIORITY_INHERIT
@ -21,6 +23,8 @@ struct sys_mutex_attribute_t
};
};
class ppu_thread;
struct lv2_mutex final : lv2_obj
{
static const u32 id_base = 0x85000000;
@ -33,9 +37,16 @@ struct lv2_mutex final : lv2_obj
u32 cond_count = 0; // Condition Variables
shared_mutex mutex;
atomic_t<u32> owner{0};
atomic_t<u32> lock_count{0}; // Recursive Locks
atomic_t<ppu_thread*> sq{};
struct alignas(16) control_data_t
{
u32 owner{};
u32 reserved{};
ppu_thread* sq{};
};
atomic_t<control_data_t> control{};
lv2_mutex(u32 protocol, u32 recursive,u32 adaptive, u64 key, u64 name) noexcept
: protocol{static_cast<u8>(protocol)}
@ -50,11 +61,24 @@ struct lv2_mutex final : lv2_obj
static std::shared_ptr<void> load(utils::serial& ar);
void save(utils::serial& ar);
CellError try_lock(u32 id)
template <typename T>
CellError try_lock(T& cpu)
{
const u32 value = owner;
auto it = control.load();
if (value >> 1 == id)
if (!it.owner)
{
auto store = it;
store.owner = cpu.id;
if (!control.compare_and_swap_test(it, store))
{
return CELL_EBUSY;
}
return {};
}
if (it.owner == cpu.id)
{
// Recursive locking
if (recursive == SYS_SYNC_RECURSIVE)
@ -71,44 +95,34 @@ struct lv2_mutex final : lv2_obj
return CELL_EDEADLK;
}
if (value == 0)
{
if (owner.compare_and_swap_test(0, id << 1))
{
return {};
}
}
return CELL_EBUSY;
}
template <typename T>
bool try_own(T& cpu, u32 id)
bool try_own(T& cpu)
{
if (owner.fetch_op([&](u32& val)
return control.atomic_op([&](control_data_t& data)
{
if (val == 0)
if (data.owner)
{
val = id << 1;
cpu.next_cpu = data.sq;
data.sq = &cpu;
return false;
}
else
{
val |= 1;
data.owner = cpu.id;
return true;
}
}))
{
lv2_obj::emplace(sq, &cpu);
return false;
}
return true;
});
}
CellError try_unlock(u32 id)
template <typename T>
CellError try_unlock(T& cpu)
{
const u32 value = owner;
auto it = control.load();
if (value >> 1 != id)
if (it.owner != cpu.id)
{
return CELL_EPERM;
}
@ -119,9 +133,12 @@ struct lv2_mutex final : lv2_obj
return {};
}
if (value == id << 1)
if (!it.sq)
{
if (owner.compare_and_swap_test(value, 0))
auto store = it;
store.owner = 0;
if (control.compare_and_swap_test(it, store))
{
return {};
}
@ -133,25 +150,42 @@ struct lv2_mutex final : lv2_obj
template <typename T>
T* reown()
{
if (auto cpu = schedule<T>(sq, protocol))
T* res{};
T* restore_next{};
control.fetch_op([&](control_data_t& data)
{
if (cpu->state & cpu_flag::again)
if (res)
{
return static_cast<T*>(cpu);
res->next_cpu = restore_next;
res = nullptr;
}
owner = cpu->id << 1 | !!sq;
return static_cast<T*>(cpu);
}
else
{
owner = 0;
return nullptr;
}
if (auto sq = data.sq)
{
res = schedule<T>(data.sq, protocol);
if (sq == data.sq)
{
atomic_storage<u32>::release(control.raw().owner, res->id);
return false;
}
restore_next = res->next_cpu;
data.owner = res->id;
return true;
}
else
{
data.owner = 0;
return true;
}
});
return res;
}
};
class ppu_thread;
// Syscalls

View File

@ -444,14 +444,12 @@ error_code sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
s64 size = 0;
// Protocol doesn't matter here since they are all enqueued anyways
for (auto cpu = +rwlock->rq; cpu; cpu = cpu->next_cpu)
while (auto cpu = rwlock->schedule<ppu_thread>(rwlock->rq, SYS_SYNC_FIFO))
{
size++;
rwlock->append(cpu);
}
rwlock->rq.release(nullptr);
rwlock->owner.atomic_op([&](s64& owner)
{
owner -= 2 * size; // Add readers to value
@ -564,13 +562,12 @@ error_code sys_rwlock_wunlock(ppu_thread& ppu, u32 rw_lock_id)
s64 size = 0;
// Protocol doesn't matter here since they are all enqueued anyways
for (auto cpu = +rwlock->rq; cpu; cpu = cpu->next_cpu)
while (auto cpu = rwlock->schedule<ppu_thread>(rwlock->rq, SYS_SYNC_FIFO))
{
size++;
rwlock->append(cpu);
}
rwlock->rq.release(nullptr);
rwlock->owner.release(-2 * static_cast<s64>(size));
lv2_obj::awake_all();
}

View File

@ -29,8 +29,8 @@ struct lv2_rwlock final : lv2_obj
shared_mutex mutex;
atomic_t<s64> owner{0};
atomic_t<ppu_thread*> rq{};
atomic_t<ppu_thread*> wq{};
ppu_thread* rq{};
ppu_thread* wq{};
lv2_rwlock(u32 protocol, u64 key, u64 name) noexcept
: protocol{static_cast<u8>(protocol)}

View File

@ -30,7 +30,7 @@ struct lv2_sema final : lv2_obj
shared_mutex mutex;
atomic_t<s32> val;
atomic_t<ppu_thread*> sq{};
ppu_thread* sq{};
lv2_sema(u32 protocol, u64 key, u64 name, s32 max, s32 value) noexcept
: protocol{static_cast<u8>(protocol)}

View File

@ -118,14 +118,14 @@ public:
// Find and remove the object from the linked list
template <typename T>
static T* unqueue(atomic_t<T*>& first, T* object, atomic_t<T*> T::* mem_ptr = &T::next_cpu)
static T* unqueue(T*& first, T* object, T* T::* mem_ptr = &T::next_cpu)
{
auto it = +first;
if (it == object)
{
first.release(+it->*mem_ptr);
(it->*mem_ptr).release(nullptr);
atomic_storage<T*>::release(first, it->*mem_ptr);
atomic_storage<T*>::release(it->*mem_ptr, nullptr);
return it;
}
@ -135,8 +135,8 @@ public:
if (next == object)
{
(it->*mem_ptr).release(+next->*mem_ptr);
(next->*mem_ptr).release(nullptr);
atomic_storage<T*>::release(it->*mem_ptr, next->*mem_ptr);
atomic_storage<T*>::release(next->*mem_ptr, nullptr);
return next;
}
@ -146,8 +146,9 @@ public:
return {};
}
// Remove an object from the linked set according to the protocol
template <typename E, typename T>
static E* schedule(atomic_t<T>& first, u32 protocol)
static E* schedule(T& first, u32 protocol)
{
auto it = static_cast<E*>(first);
@ -156,20 +157,32 @@ public:
return it;
}
auto parent_found = &first;
if (protocol == SYS_SYNC_FIFO)
{
if (it && cpu_flag::again - it->state)
while (true)
{
first.release(+it->next_cpu);
it->next_cpu.release(nullptr);
}
const auto next = +it->next_cpu;
return it;
if (next)
{
parent_found = &it->next_cpu;
it = next;
continue;
}
if (it && cpu_flag::again - it->state)
{
atomic_storage<T>::release(*parent_found, nullptr);
}
return it;
}
}
s32 prio = it->prio;
auto found = it;
auto parent_found = &first;
while (true)
{
@ -183,7 +196,8 @@ public:
const s32 _prio = static_cast<E*>(next)->prio;
if (_prio < prio)
// This condition tests for equality as well so the eraliest element to be pushed is popped
if (_prio <= prio)
{
found = next;
parent_found = &node;
@ -195,27 +209,18 @@ public:
if (cpu_flag::again - found->state)
{
parent_found->release(+found->next_cpu);
found->next_cpu.release(nullptr);
atomic_storage<T>::release(*parent_found, found->next_cpu);
atomic_storage<T>::release(found->next_cpu, nullptr);
}
return found;
}
template <typename T>
static auto emplace(atomic_t<T>& first, T object)
static void emplace(T& first, T object)
{
auto it = &first;
while (auto ptr = static_cast<T>(+*it))
{
it = &ptr->next_cpu;
}
it->release(object);
// Return parent
return it;
atomic_storage<T>::release(object->next_cpu, first);
atomic_storage<T>::release(first, object);
}
private:
@ -258,6 +263,9 @@ public:
static void set_future_sleep(cpu_thread* cpu);
static bool is_scheduler_ready();
// Must be called under IDM lock
static bool has_ppus_in_running_state();
static void cleanup();
template <typename T>
@ -538,7 +546,7 @@ private:
static thread_local std::vector<class cpu_thread*> g_to_awake;
// Scheduler queue for active PPU threads
static atomic_t<class ppu_thread*> g_ppu;
static class ppu_thread* g_ppu;
// Waiting for the response from
static u32 g_pending;

View File

@ -109,7 +109,7 @@ public:
// sys_usbd_receive_event PPU Threads
shared_mutex mutex_sq;
atomic_t<ppu_thread*> sq{};
ppu_thread* sq{};
static constexpr auto thread_name = "Usb Manager Thread"sv;
@ -642,7 +642,7 @@ error_code sys_usbd_finalize(ppu_thread& ppu, u32 handle)
usbh.is_init = false;
// Forcefully awake all waiters
for (auto cpu = +usbh.sq; cpu; cpu = cpu->next_cpu)
while (auto cpu = lv2_obj::schedule<ppu_thread>(usbh.sq, SYS_SYNC_FIFO))
{
// Special ternimation signal value
cpu->gpr[4] = 4;
@ -651,8 +651,6 @@ error_code sys_usbd_finalize(ppu_thread& ppu, u32 handle)
lv2_obj::awake(cpu);
}
usbh.sq.release(nullptr);
// TODO
return CELL_OK;
}

View File

@ -364,8 +364,9 @@ void kernel_explorer::update()
case SYS_MUTEX_OBJECT:
{
auto& mutex = static_cast<lv2_mutex&>(obj);
const auto control = mutex.control.load();
show_waiters(add_solid_node(node, qstr(fmt::format(u8"Mutex 0x%08x: “%s”, %s,%s Owner: %#x, Locks: %u, Key: %#llx, Conds: %u", id, lv2_obj::name64(mutex.name), mutex.protocol,
mutex.recursive == SYS_SYNC_RECURSIVE ? " Recursive," : "", mutex.owner >> 1, +mutex.lock_count, mutex.key, mutex.cond_count))), mutex.sq);
mutex.recursive == SYS_SYNC_RECURSIVE ? " Recursive," : "", control.owner, +mutex.lock_count, mutex.key, mutex.cond_count))), control.sq);
break;
}
case SYS_COND_OBJECT:
@ -488,6 +489,7 @@ void kernel_explorer::update()
auto& lwm = static_cast<lv2_lwmutex&>(obj);
std::string owner_str = "unknown"; // Either invalid state or the lwmutex control data was moved from
sys_lwmutex_t lwm_data{};
auto lv2_control = lwm.lv2_control.load();
if (lwm.control.try_read(lwm_data) && lwm_data.sleep_queue == id)
{
@ -513,12 +515,12 @@ void kernel_explorer::update()
}
else
{
show_waiters(add_solid_node(node, qstr(fmt::format(u8"LWMutex 0x%08x: “%s”, %s, Signal: %#x (unmapped/invalid control data at *0x%x)", id, lv2_obj::name64(lwm.name), lwm.protocol, +lwm.signaled, lwm.control))), lwm.sq);
show_waiters(add_solid_node(node, qstr(fmt::format(u8"LWMutex 0x%08x: “%s”, %s, Signal: %#x (unmapped/invalid control data at *0x%x)", id, lv2_obj::name64(lwm.name), lwm.protocol, +lv2_control.signaled, lwm.control))), lv2_control.sq);
break;
}
show_waiters(add_solid_node(node, qstr(fmt::format(u8"LWMutex 0x%08x: “%s”, %s,%s Owner: %s, Locks: %u, Signal: %#x, Control: *0x%x", id, lv2_obj::name64(lwm.name), lwm.protocol,
(lwm_data.attribute & SYS_SYNC_RECURSIVE) ? " Recursive," : "", owner_str, lwm_data.recursive_count, +lwm.signaled, lwm.control))), lwm.sq);
(lwm_data.attribute & SYS_SYNC_RECURSIVE) ? " Recursive," : "", owner_str, lwm_data.recursive_count, +lv2_control.signaled, lwm.control))), lv2_control.sq);
break;
}
case SYS_TIMER_OBJECT: