1
0
mirror of https://github.com/RPCS3/rpcs3.git synced 2024-11-22 02:32:36 +01:00

LV2: Improve IPC support (#10206)

* Remove custom event queue's IPC management of favour of universal LV2 approach.
* Move ipc_manager to FXO.
* Fix ipc_manager internal storage memory leak: deallocate entry when IPC object destroyed.
* Rewrite lv2_obj::create to be simpler (remove many duplicated code).
* Always execute lv2_obj::create under both IPC and IDM mutexes at once (not in non-atomic single-steps). Fixing potential case where concurrency can cause IDM to contain 2 or more different objects with the same IPC key with SYS_SYNC_NOT_CARE (instead of the same object).
* Do not rely on smart ptr reference count to tell if the object exists. Use similar approach as event queues as it makes error checkings accurate.
* Optimize lv2_event_port by using std::shared_ptr for queue which wasn't allowed before.
This commit is contained in:
Eladash 2021-05-07 09:58:30 +03:00 committed by GitHub
parent a043e95d24
commit 7b6482c01d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 146 additions and 240 deletions

View File

@ -9,8 +9,6 @@
LOG_CHANNEL(sys_cond);
template<> DECLARE(ipc_manager<lv2_cond, u64>::g_ipc) {};
error_code sys_cond_create(ppu_thread& ppu, vm::ptr<u32> cond_id, u32 mutex_id, vm::ptr<sys_cond_attribute_t> attr)
{
ppu.state += cpu_flag::wait;
@ -58,7 +56,8 @@ error_code sys_cond_destroy(ppu_thread& ppu, u32 cond_id)
return CELL_EBUSY;
}
cond.mutex->obj_count.atomic_op([](lv2_mutex::count_info& info){ info.cond_count--; });
cond.mutex->cond_count--;
lv2_obj::on_id_destroy(cond, cond.shared, cond.key);
return {};
});

View File

@ -40,20 +40,17 @@ struct lv2_cond final : lv2_obj
{
}
CellError on_id_create() const
CellError on_id_create()
{
if (!mutex->obj_count.fetch_op([](lv2_mutex::count_info& info)
if (mutex->exists)
{
if (info.mutex_count)
return info.cond_count++, true;
return false;
}).second)
{
// Mutex has been destroyed, cannot create conditional variable
return CELL_ESRCH;
mutex->cond_count++;
exists++;
return {};
}
return {};
// Mutex has been destroyed, cannot create conditional variable
return CELL_ESRCH;
}
};

View File

@ -11,8 +11,6 @@
LOG_CHANNEL(sys_event);
template<> DECLARE(ipc_manager<lv2_event_queue, u64>::g_ipc) {};
std::shared_ptr<lv2_event_queue> lv2_event_queue::find(u64 ipc_key)
{
if (ipc_key == SYS_EVENT_QUEUE_LOCAL)
@ -21,14 +19,7 @@ std::shared_ptr<lv2_event_queue> lv2_event_queue::find(u64 ipc_key)
return {};
}
auto queue = ipc_manager<lv2_event_queue, u64>::get(ipc_key);
if (queue && !queue->exists)
{
queue.reset();
}
return queue;
return g_fxo->get<ipc_manager<lv2_event_queue, u64>>().get(ipc_key);
}
bool lv2_event_queue::check(const std::weak_ptr<lv2_event_queue>& wkptr)
@ -93,11 +84,11 @@ CellError lv2_event_queue::send(lv2_event event)
return {};
}
error_code sys_event_queue_create(cpu_thread& cpu, vm::ptr<u32> equeue_id, vm::ptr<sys_event_queue_attribute_t> attr, u64 event_queue_key, s32 size)
error_code sys_event_queue_create(cpu_thread& cpu, vm::ptr<u32> equeue_id, vm::ptr<sys_event_queue_attribute_t> attr, u64 ipc_key, s32 size)
{
cpu.state += cpu_flag::wait;
sys_event.warning("sys_event_queue_create(equeue_id=*0x%x, attr=*0x%x, event_queue_key=0x%llx, size=%d)", equeue_id, attr, event_queue_key, size);
sys_event.warning("sys_event_queue_create(equeue_id=*0x%x, attr=*0x%x, ipc_key=0x%llx, size=%d)", equeue_id, attr, ipc_key, size);
if (size <= 0 || size > 127)
{
@ -120,42 +111,19 @@ error_code sys_event_queue_create(cpu_thread& cpu, vm::ptr<u32> equeue_id, vm::p
return CELL_EINVAL;
}
auto queue = std::make_shared<lv2_event_queue>(protocol, type, attr->name_u64, event_queue_key, size);
const u32 pshared = ipc_key == SYS_EVENT_QUEUE_LOCAL ? SYS_SYNC_NOT_PROCESS_SHARED : SYS_SYNC_NOT_PROCESS_SHARED;
constexpr u32 flags = SYS_SYNC_NEWLY_CREATED; // NOTE: This is inaccurate for multi-process
const u64 name = attr->name_u64;
CellError error = CELL_EAGAIN;
if (event_queue_key == SYS_EVENT_QUEUE_LOCAL)
if (const auto error = lv2_obj::create<lv2_event_queue>(pshared, ipc_key, flags, [&]()
{
// Not an IPC queue
if (const u32 _id = idm::import<lv2_obj, lv2_event_queue>([&]() { if ((error = queue->on_id_create())) queue.reset(); return std::move(queue); } ))
{
*equeue_id = _id;
return CELL_OK;
}
return error;
}
// Create IPC queue
if (!ipc_manager<lv2_event_queue, u64>::add(event_queue_key, [&]() -> std::shared_ptr<lv2_event_queue>
{
if (const u32 _id = idm::import<lv2_obj, lv2_event_queue>([&]() { if ((error = queue->on_id_create())) return decltype(queue){}; return queue; } ))
{
*equeue_id = _id;
return std::move(queue);
}
return nullptr;
return std::make_shared<lv2_event_queue>(protocol, type, name, ipc_key, size);
}))
{
return CELL_EEXIST;
}
if (queue)
{
return error;
}
*equeue_id = idm::last_id();
return CELL_OK;
}
@ -179,7 +147,7 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode)
return CELL_EBUSY;
}
queue.exists--;
lv2_obj::on_id_destroy(queue, queue.key == SYS_EVENT_QUEUE_LOCAL ? SYS_SYNC_NOT_PROCESS_SHARED : SYS_SYNC_PROCESS_SHARED, queue.key);
return {};
});
@ -521,11 +489,11 @@ error_code sys_event_port_send(u32 eport_id, u64 data1, u64 data2, u64 data3)
const auto port = idm::get<lv2_obj, lv2_event_port>(eport_id, [&](lv2_event_port& port) -> CellError
{
if (const auto queue = port.queue.lock(); lv2_event_queue::check(queue))
if (lv2_event_queue::check(port.queue))
{
const u64 source = port.name ? port.name : (s64{process_getpid()} << 32) | u64{eport_id};
return queue->send(source, data1, data2, data3);
return port.queue->send(source, data1, data2, data3);
}
return CELL_ENOTCONN;

View File

@ -86,7 +86,6 @@ struct lv2_event_queue final : public lv2_obj
const u64 key;
const s32 size;
atomic_t<u32> exists = 0; // Existence validation (workaround for shared-ptr ref-counting)
shared_mutex mutex;
std::deque<lv2_event> events;
std::deque<cpu_thread*> sq;
@ -113,12 +112,6 @@ struct lv2_event_queue final : public lv2_obj
// Check queue ptr validity (use 'exists' member)
static bool check(const std::weak_ptr<lv2_event_queue>&);
static bool check(const std::shared_ptr<lv2_event_queue>&);
CellError on_id_create()
{
exists++;
return {};
}
};
struct lv2_event_port final : lv2_obj
@ -129,7 +122,7 @@ struct lv2_event_port final : lv2_obj
const u64 name; // Event source (generated from id and process id if not set)
u32 queue_id = 0; // Event queue ID (if IPC is used this value is meaningless)
std::weak_ptr<lv2_event_queue> queue; // Event queue this port is connected to
std::shared_ptr<lv2_event_queue> queue; // Event queue this port is connected to
lv2_event_port(s32 type, u64 name)
: type(type)

View File

@ -11,8 +11,6 @@
LOG_CHANNEL(sys_event_flag);
template<> DECLARE(ipc_manager<lv2_event_flag, u64>::g_ipc) {};
error_code sys_event_flag_create(ppu_thread& ppu, vm::ptr<u32> id, vm::ptr<sys_event_flag_attribute_t> attr, u64 init)
{
ppu.state += cpu_flag::wait;
@ -73,6 +71,7 @@ error_code sys_event_flag_destroy(ppu_thread& ppu, u32 id)
return CELL_EBUSY;
}
lv2_obj::on_id_destroy(flag, flag.shared, flag.key);
return {};
});

View File

@ -12,10 +12,12 @@
LOG_CHANNEL(sys_mmapper);
lv2_memory::lv2_memory(u32 size, u32 align, u64 flags, lv2_memory_container* ct)
lv2_memory::lv2_memory(u32 size, u32 align, u64 flags, u64 key, u32 pshared, lv2_memory_container* ct)
: size(size)
, align(align)
, flags(flags)
, key(key)
, pshared(pshared)
, ct(ct)
, shm(std::make_shared<utils::shm>(size, 1 /* shareable flag */))
{
@ -25,17 +27,18 @@ lv2_memory::lv2_memory(u32 size, u32 align, u64 flags, lv2_memory_container* ct)
#endif
}
template<> DECLARE(ipc_manager<lv2_memory, u64>::g_ipc) {};
template <bool exclusive = false>
error_code create_lv2_shm(bool pshared, u64 ipc_key, u64 size, u32 align, u64 flags, lv2_memory_container* ct)
{
if (auto error = lv2_obj::create<lv2_memory>(pshared ? SYS_SYNC_PROCESS_SHARED : SYS_SYNC_NOT_PROCESS_SHARED, ipc_key, exclusive ? SYS_SYNC_NEWLY_CREATED : SYS_SYNC_NOT_CARE, [&]()
const u32 _pshared = pshared ? SYS_SYNC_PROCESS_SHARED : SYS_SYNC_NOT_PROCESS_SHARED;
if (auto error = lv2_obj::create<lv2_memory>(_pshared, ipc_key, exclusive ? SYS_SYNC_NEWLY_CREATED : SYS_SYNC_NOT_CARE, [&]()
{
return std::make_shared<lv2_memory>(
static_cast<u32>(size),
align,
flags,
ipc_key,
_pshared,
ct);
}, false))
{
@ -525,6 +528,7 @@ error_code sys_mmapper_free_shared_memory(ppu_thread& ppu, u32 mem_id)
return CELL_EBUSY;
}
lv2_obj::on_id_destroy(mem, mem.pshared, mem.key);
return {};
});

View File

@ -21,12 +21,14 @@ struct lv2_memory : lv2_obj
const u32 size; // Memory size
const u32 align; // Alignment required
const u64 flags;
const u64 key; // IPC key
const u32 pshared;
lv2_memory_container* const ct; // Associated memory container
const std::shared_ptr<utils::shm> shm;
atomic_t<u32> counter{0};
lv2_memory(u32 size, u32 align, u64 flags, lv2_memory_container* ct);
lv2_memory(u32 size, u32 align, u64 flags, u64 key, u32 pshared, lv2_memory_container* ct);
};
enum : u64

View File

@ -9,8 +9,6 @@
LOG_CHANNEL(sys_mutex);
template<> DECLARE(ipc_manager<lv2_mutex, u64>::g_ipc) {};
error_code sys_mutex_create(ppu_thread& ppu, vm::ptr<u32> mutex_id, vm::ptr<sys_mutex_attribute_t> attr)
{
ppu.state += cpu_flag::wait;
@ -87,21 +85,12 @@ error_code sys_mutex_destroy(ppu_thread& ppu, u32 mutex_id)
return CELL_EBUSY;
}
if (!mutex.obj_count.fetch_op([](lv2_mutex::count_info& info)
{
if (info.cond_count)
{
return false;
}
// Decrement mutex copies count
info.mutex_count--;
return true;
}).second)
if (mutex.cond_count)
{
return CELL_EPERM;
}
lv2_obj::on_id_destroy(mutex, mutex.shared, mutex.key);
return {};
});

View File

@ -32,16 +32,10 @@ struct lv2_mutex final : lv2_obj
const u64 key;
const u64 name;
struct alignas(8) count_info
{
u32 mutex_count; // Mutex copies count (0 means doesn't exist anymore)
u32 cond_count; // Condition Variables
};
u32 cond_count = 0; // Condition Variables
shared_mutex mutex;
atomic_t<u32> owner{0};
atomic_t<u32> lock_count{0}; // Recursive Locks
atomic_t<count_info> obj_count{};
std::deque<cpu_thread*> sq;
lv2_mutex(u32 protocol, u32 recursive, u32 shared, u32 adaptive, u64 key, u64 name)
@ -54,12 +48,6 @@ struct lv2_mutex final : lv2_obj
{
}
CellError on_id_create()
{
obj_count.atomic_op([](count_info& info){ info.mutex_count++; });
return {};
}
CellError try_lock(u32 id)
{
const u32 value = owner;

View File

@ -9,8 +9,6 @@
LOG_CHANNEL(sys_rwlock);
template<> DECLARE(ipc_manager<lv2_rwlock, u64>::g_ipc) {};
error_code sys_rwlock_create(ppu_thread& ppu, vm::ptr<u32> rw_lock_id, vm::ptr<sys_rwlock_attribute_t> attr)
{
ppu.state += cpu_flag::wait;
@ -57,6 +55,7 @@ error_code sys_rwlock_destroy(ppu_thread& ppu, u32 rw_lock_id)
return CELL_EBUSY;
}
lv2_obj::on_id_destroy(rw, rw.shared, rw.key);
return {};
});

View File

@ -9,8 +9,6 @@
LOG_CHANNEL(sys_semaphore);
template<> DECLARE(ipc_manager<lv2_sema, u64>::g_ipc) {};
error_code sys_semaphore_create(ppu_thread& ppu, vm::ptr<u32> sem_id, vm::ptr<sys_semaphore_attribute_t> attr, s32 initial_val, s32 max_val)
{
ppu.state += cpu_flag::wait;
@ -68,6 +66,7 @@ error_code sys_semaphore_destroy(ppu_thread& ppu, u32 sem_id)
return CELL_EBUSY;
}
lv2_obj::on_id_destroy(sema, sema.shared, sema.key);
return {};
});

View File

@ -77,13 +77,17 @@ private:
};
// Function executed under IDM mutex, error will make the object creation fail and the error will be returned
static CellError on_id_create()
CellError on_id_create()
{
exists++;
return {};
}
public:
// Existence validation (workaround for shared-ptr ref-counting)
atomic_t<u32> exists = 0;
static std::string name64(u64 name_u64)
{
const auto ptr = reinterpret_cast<const char*>(&name_u64);
@ -198,95 +202,95 @@ public:
{
case SYS_SYNC_NEWLY_CREATED:
case SYS_SYNC_NOT_CARE:
{
std::shared_ptr<T> result = make();
CellError error{};
if (!ipc_manager<T, u64>::add(ipc_key, [&]()
{
if (!idm::import<lv2_obj, T>([&]()
{
if (result && (error = result->on_id_create()))
result.reset();
return result;
}))
{
result.reset();
}
return result;
}, &result))
{
if (error)
{
return error;
}
if (flags == SYS_SYNC_NEWLY_CREATED)
{
return CELL_EEXIST;
}
error = CELL_EAGAIN;
if (!idm::import<lv2_obj, T>([&]() { if (result && (error = result->on_id_create())) result.reset(); return std::move(result); }))
{
return error;
}
return CELL_OK;
}
else if (!result)
{
return error ? CELL_EAGAIN : error;
}
else
{
return CELL_OK;
}
}
case SYS_SYNC_NOT_CREATE:
{
auto result = ipc_manager<T, u64>::get(ipc_key);
if (!result)
{
return CELL_ESRCH;
}
CellError error = CELL_EAGAIN;
if (!idm::import<lv2_obj, T>([&]() { if (result && (error = result->on_id_create())) result.reset(); return std::move(result); }))
{
return error;
}
return CELL_OK;
}
default:
{
return CELL_EINVAL;
break;
}
default: return CELL_EINVAL;
}
break;
}
case SYS_SYNC_NOT_PROCESS_SHARED:
{
std::shared_ptr<T> result = make();
break;
}
default: return CELL_EINVAL;
}
CellError error = CELL_EAGAIN;
std::shared_ptr<T> result = make();
if (!idm::import<lv2_obj, T>([&]() { if (result && (error = result->on_id_create())) result.reset(); return std::move(result); }))
// EAGAIN for IDM IDs shortage
CellError error = CELL_EAGAIN;
if (!idm::import<lv2_obj, T>([&]() -> std::shared_ptr<T>
{
auto finalize_construct = [&]() -> std::shared_ptr<T>
{
return error;
if ((error = result->on_id_create()))
{
result.reset();
}
return std::move(result);
};
if (flags != SYS_SYNC_PROCESS_SHARED)
{
// Creation of unique (non-shared) object handle
return finalize_construct();
}
return CELL_OK;
}
default:
auto& ipc_container = g_fxo->get<ipc_manager<T, u64>>();
if (flags == SYS_SYNC_NOT_CREATE)
{
result = ipc_container.get(ipc_key);
if (!result)
{
error = CELL_ESRCH;
return result;
}
// Run on_id_create() on existing object
return finalize_construct();
}
bool added = false;
std::tie(added, result) = ipc_container.add(ipc_key, finalize_construct, flags != SYS_SYNC_NEWLY_CREATED);
if (!added)
{
if (flags == SYS_SYNC_NEWLY_CREATED)
{
// Object already exists but flags does not allow it
error = CELL_EEXIST;
// We specified we do not want to peek pointer's value, result must be empty
AUDIT(!result);
return result;
}
// Run on_id_create() on existing object
return finalize_construct();
}
return std::move(result);
}))
{
return CELL_EINVAL;
return error;
}
return CELL_OK;
}
template <typename T>
static void on_id_destroy(T& obj, u32 pshared, u64 ipc_key)
{
if (obj.exists-- == 1u && pshared == SYS_SYNC_PROCESS_SHARED)
{
g_fxo->get<ipc_manager<T, u64>>().remove(ipc_key);
}
}

View File

@ -6,99 +6,65 @@
#include "Utilities/mutex.h"
// IPC manager for objects of type T and IPC keys of type K.
// External declaration of g_ipc is required.
template <typename T, typename K>
class ipc_manager final
{
std::unordered_map<K, std::weak_ptr<T>> m_map;
std::unordered_map<K, std::shared_ptr<T>> m_map;
shared_mutex m_mutex;
static ipc_manager g_ipc;
mutable shared_mutex m_mutex;
public:
// Add new object if specified ipc_key is not used
// .first: added new object?, .second: what's at m_map[key] after this function if (peek_ptr || added new object) is true
template <typename F>
static bool add(const K& ipc_key, F&& provider, std::shared_ptr<T>* out = nullptr)
std::pair<bool, std::shared_ptr<T>> add(const K& ipc_key, F&& provider, bool peek_ptr = true)
{
std::lock_guard lock(g_ipc.m_mutex);
std::lock_guard lock(m_mutex);
// Get object location
std::weak_ptr<T>& wptr = g_ipc.m_map[ipc_key];
std::shared_ptr<T>& ptr = m_map[ipc_key];
const bool existed = ptr.operator bool();
std::shared_ptr<T> old;
if ((out && !(old = wptr.lock())) || wptr.expired())
if (!existed)
{
// Call a function which must return the object
if (out)
{
*out = provider();
wptr = *out;
}
else
{
wptr = provider();
}
return true;
ptr = provider();
}
if (out)
{
*out = std::move(old);
}
return false;
const bool added = !existed && ptr;
return {added, (peek_ptr || added) ? ptr : nullptr};
}
// Unregister specified ipc_key, may return true even if the object doesn't exist anymore
static bool remove(const K& ipc_key)
bool remove(const K& ipc_key)
{
std::lock_guard lock(g_ipc.m_mutex);
std::lock_guard lock(m_mutex);
return g_ipc.m_map.erase(ipc_key) != 0;
}
// Unregister specified ipc_key, return the object
static std::shared_ptr<T> withdraw(const K& ipc_key)
{
std::lock_guard lock(g_ipc.m_mutex);
const auto found = g_ipc.m_map.find(ipc_key);
if (found != g_ipc.m_map.end())
{
auto ptr = found->second.lock();
g_ipc.m_map.erase(found);
return ptr;
}
return nullptr;
return m_map.erase(ipc_key) != 0;
}
// Get object with specified ipc_key
static std::shared_ptr<T> get(const K& ipc_key)
std::shared_ptr<T> get(const K& ipc_key) const
{
reader_lock lock(g_ipc.m_mutex);
reader_lock lock(m_mutex);
const auto found = g_ipc.m_map.find(ipc_key);
const auto found = m_map.find(ipc_key);
if (found != g_ipc.m_map.end())
if (found != m_map.end())
{
return found->second.lock();
return found->second;
}
return nullptr;
}
// Check whether the object actually exists
static bool check(const K& ipc_key)
bool check(const K& ipc_key) const
{
reader_lock lock(g_ipc.m_mutex);
reader_lock lock(m_mutex);
const auto found = g_ipc.m_map.find(ipc_key);
const auto found = m_map.find(ipc_key);
return found != g_ipc.m_map.end() && !found->second.expired();
return found != m_map.end();
}
};

View File

@ -321,7 +321,7 @@ void kernel_explorer::Update()
{
auto& mutex = static_cast<lv2_mutex&>(obj);
add_leaf(node, qstr(fmt::format(u8"Mutex 0x%08x: “%s”, %s,%s Owner: %#x, Locks: %u, Key: %#llx, Conds: %u, Wq: %zu", id, lv2_obj::name64(mutex.name), mutex.protocol,
mutex.recursive == SYS_SYNC_RECURSIVE ? " Recursive," : "", mutex.owner >> 1, +mutex.lock_count, mutex.key, mutex.obj_count.load().cond_count, mutex.sq.size())));
mutex.recursive == SYS_SYNC_RECURSIVE ? " Recursive," : "", mutex.owner >> 1, +mutex.lock_count, mutex.key, mutex.cond_count, mutex.sq.size())));
break;
}
case SYS_COND_OBJECT:
@ -360,19 +360,18 @@ void kernel_explorer::Update()
case SYS_EVENT_PORT_OBJECT:
{
auto& ep = static_cast<lv2_event_port&>(obj);
const auto queue = ep.queue.lock();
const auto type = ep.type == SYS_EVENT_PORT_LOCAL ? "LOCAL"sv : "IPC"sv;
if (lv2_event_queue::check(queue))
if (const auto queue = ep.queue.get(); queue && queue->exists)
{
if (queue.get() == idm::check_unlocked<lv2_obj, lv2_event_queue>(ep.queue_id))
if (queue == idm::check_unlocked<lv2_obj, lv2_event_queue>(ep.queue_id))
{
// Type must be LOCAL here, but refer to the note below for why it is showed
add_leaf(node, qstr(fmt::format("Event Port 0x%08x: %s, Name: %#llx, Event Queue (ID): 0x%08x", id, type, ep.name, ep.queue_id)));
break;
}
if (queue == lv2_event_queue::find(queue->key))
if (queue == lv2_event_queue::find(queue->key).get())
{
// There are cases in which the attached queue by ID also has an IPC
// And the ID was destroyed but another was created for that same IPC