From 15de869a9c40f9977587535ff416cb79955fe400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Tue, 16 Apr 2024 17:45:56 +0200 Subject: [PATCH] CommandQueueMT: Optimize & fix handling of sync/ret commands --- core/templates/command_queue_mt.cpp | 29 ------------ core/templates/command_queue_mt.h | 73 ++++++++++++----------------- 2 files changed, 30 insertions(+), 72 deletions(-) diff --git a/core/templates/command_queue_mt.cpp b/core/templates/command_queue_mt.cpp index 0c5c6394a1a..d9e5e0b2178 100644 --- a/core/templates/command_queue_mt.cpp +++ b/core/templates/command_queue_mt.cpp @@ -41,35 +41,6 @@ void CommandQueueMT::unlock() { mutex.unlock(); } -void CommandQueueMT::wait_for_flush() { - // wait one millisecond for a flush to happen - OS::get_singleton()->delay_usec(1000); -} - -CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() { - int idx = -1; - - while (true) { - lock(); - for (int i = 0; i < SYNC_SEMAPHORES; i++) { - if (!sync_sems[i].in_use) { - sync_sems[i].in_use = true; - idx = i; - break; - } - } - unlock(); - - if (idx == -1) { - wait_for_flush(); - } else { - break; - } - } - - return &sync_sems[idx]; -} - CommandQueueMT::CommandQueueMT() { } diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index a4ac338bed6..c149861467d 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -32,9 +32,9 @@ #define COMMAND_QUEUE_MT_H #include "core/object/worker_thread_pool.h" +#include "core/os/condition_variable.h" #include "core/os/memory.h" #include "core/os/mutex.h" -#include "core/os/semaphore.h" #include "core/string/print_string.h" #include "core/templates/local_vector.h" #include "core/templates/simple_type.h" @@ -251,14 +251,14 @@ #define DECL_PUSH(N) \ template \ void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \ - CMD_TYPE(N) *cmd = allocate_and_lock(); \ + MutexLock mlock(mutex); \ + CMD_TYPE(N) *cmd = allocate(); \ cmd->instance = p_instance; \ cmd->method = p_method; \ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ } \ - unlock(); \ } #define CMD_RET_TYPE(N) CommandRet##N @@ -266,19 +266,17 @@ #define DECL_PUSH_AND_RET(N) \ template \ void push_and_ret(T *p_instance, M p_method, COMMA_SEP_LIST(PARAM, N) COMMA(N) R *r_ret) { \ - SyncSemaphore *ss = _alloc_sync_sem(); \ - CMD_RET_TYPE(N) *cmd = allocate_and_lock(); \ + MutexLock mlock(mutex); \ + CMD_RET_TYPE(N) *cmd = allocate(); \ cmd->instance = p_instance; \ cmd->method = p_method; \ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ cmd->ret = r_ret; \ - cmd->sync_sem = ss; \ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ } \ - unlock(); \ - ss->sem.wait(); \ - ss->in_use = false; \ + sync_tail++; \ + _wait_for_sync(mlock); \ } #define CMD_SYNC_TYPE(N) CommandSync##N @@ -286,39 +284,31 @@ #define DECL_PUSH_AND_SYNC(N) \ template \ void push_and_sync(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \ - SyncSemaphore *ss = _alloc_sync_sem(); \ - CMD_SYNC_TYPE(N) *cmd = allocate_and_lock(); \ + MutexLock mlock(mutex); \ + CMD_SYNC_TYPE(N) *cmd = allocate(); \ cmd->instance = p_instance; \ cmd->method = p_method; \ SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \ - cmd->sync_sem = ss; \ if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \ WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \ } \ - unlock(); \ - ss->sem.wait(); \ - ss->in_use = false; \ + sync_tail++; \ + _wait_for_sync(mlock); \ } #define MAX_CMD_PARAMS 15 class CommandQueueMT { - struct SyncSemaphore { - Semaphore sem; - bool in_use = false; - }; - struct CommandBase { + bool sync = false; virtual void call() = 0; - virtual SyncSemaphore *get_sync_semaphore() { return nullptr; } virtual ~CommandBase() = default; // Won't be called. }; struct SyncCommand : public CommandBase { - SyncSemaphore *sync_sem = nullptr; - - virtual SyncSemaphore *get_sync_semaphore() override { - return sync_sem; + virtual void call() override {} + SyncCommand() { + sync = true; } }; @@ -340,9 +330,11 @@ class CommandQueueMT { SYNC_SEMAPHORES = 8 }; + BinaryMutex mutex; LocalVector command_mem; - SyncSemaphore sync_sems[SYNC_SEMAPHORES]; - Mutex mutex; + ConditionVariable sync_cond_var; + uint32_t sync_head = 0; + uint32_t sync_tail = 0; WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID; uint64_t flush_read_ptr = 0; @@ -357,32 +349,23 @@ class CommandQueueMT { return cmd; } - template - T *allocate_and_lock() { - lock(); - T *ret = allocate(); - return ret; - } - void _flush() { - lock(); - if (unlikely(flush_read_ptr)) { // Re-entrant call. - unlock(); return; } + lock(); + WorkerThreadPool::thread_enter_command_queue_mt_flush(this); while (flush_read_ptr < command_mem.size()) { uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; flush_read_ptr += 8; CommandBase *cmd = reinterpret_cast(&command_mem[flush_read_ptr]); - - SyncSemaphore *sync_sem = cmd->get_sync_semaphore(); cmd->call(); - if (sync_sem) { - sync_sem->sem.post(); // Release in case it needs sync/ret. + if (unlikely(cmd->sync)) { + sync_head++; + sync_cond_var.notify_all(); } flush_read_ptr += size; @@ -394,8 +377,12 @@ class CommandQueueMT { unlock(); } - void wait_for_flush(); - SyncSemaphore *_alloc_sync_sem(); + _FORCE_INLINE_ void _wait_for_sync(MutexLock &p_lock) { + uint32_t sync_head_goal = sync_tail; + do { + sync_cond_var.wait(p_lock); + } while (sync_head != sync_head_goal); // Can't use lower-than because of wraparound. + } public: void lock();