From 9444d297ed0b1dbc7c05fa0bf2e06241335f5057 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Fri, 5 Jan 2024 17:39:26 +0100 Subject: [PATCH 1/3] WorkerThreadPool: Overhaul scheduling and synchronization This commits rewrites the sync logic in a way that the `use_system_threads_for_low_priority_tasks` setting, which was added due to the lack of a cross-platform wait-for-multiple-objects functionality, can be removed (it's as if it was effectively hardcoded to `false`). With the new implementation, we have the best of both worlds: threads don't have to poll, plus no bespoke threads are used. In addition, regarding deadlock prevention, since not every possible case of wait-deadlock could be avoided, this commits removes the current best-effort avoidance mechanisms and keeps only a simple, pessimistic way of detection. It turns out that the only current user of deadlock prevention, ResourceLoader, works fine with it and so every possible situation in resource loading is now properly handled, with no possibilities of deadlocking. There's a comment in the code with further details. Lastly, a potential for load tasks never being awaited/disposed is cleared. --- core/io/resource_loader.cpp | 12 +- core/object/worker_thread_pool.cpp | 496 +++++++++++++++-------------- core/object/worker_thread_pool.h | 34 +- core/os/condition_variable.h | 2 + core/os/semaphore.h | 8 +- core/register_core_types.cpp | 1 - doc/classes/ProjectSettings.xml | 2 - doc/classes/WorkerThreadPool.xml | 2 +- main/main.cpp | 14 +- 9 files changed, 293 insertions(+), 278 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 0c7764392ac..ec768eb4b71 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -641,15 +641,16 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro if (load_task.task_id != 0) { // Loading thread is in the worker pool. - load_task.awaited = true; thread_load_mutex.unlock(); Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); if (err == ERR_BUSY) { - // The WorkerThreadPool has scheduled tasks in a way that the current load depends on - // another one in a lower stack frame. Restart such load here. When the stack is eventually - // unrolled, the original load will have been notified to go on. + // The WorkerThreadPool has reported that the current task wants to await on an older one. + // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of + // resource loading that means that the task to wait for can be restarted here to break the + // cycle, with as much recursion into this process as needed. + // When the stack is eventually unrolled, the original load will have been notified to go on. #ifdef DEV_ENABLED - print_verbose("ResourceLoader: Load task happened to wait on another one deep in the call stack. Attempting to avoid deadlock by re-issuing the load now."); + print_verbose("ResourceLoader: Potential for deadlock detected in task dependency. Attempting to avoid it by re-issuing the load now."); #endif // CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's // an ongoing load for that resource and wait for it again. This value forces a new load. @@ -663,6 +664,7 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro } else { DEV_ASSERT(err == OK); thread_load_mutex.lock(); + load_task.awaited = true; } } else { // Loading thread is main or user thread. diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 631767219fa..b93c3270048 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -43,24 +43,15 @@ void WorkerThreadPool::Task::free_template_userdata() { WorkerThreadPool *WorkerThreadPool::singleton = nullptr; -void WorkerThreadPool::_process_task_queue() { - task_mutex.lock(); - Task *task = task_queue.first()->self(); - task_queue.remove(task_queue.first()); - task_mutex.unlock(); - _process_task(task); -} - void WorkerThreadPool::_process_task(Task *p_task) { - bool low_priority = p_task->low_priority; - int pool_thread_index = -1; - Task *prev_low_prio_task = nullptr; // In case this is recursively called. + int pool_thread_index = thread_ids[Thread::get_caller_id()]; + ThreadData &curr_thread = threads[pool_thread_index]; + Task *prev_task = nullptr; // In case this is recursively called. + bool safe_for_nodes_backup = is_current_thread_safe_for_nodes(); - if (!use_native_low_priority_threads) { + { // Tasks must start with this unset. They are free to set-and-forget otherwise. set_current_thread_safe_for_nodes(false); - pool_thread_index = thread_ids[Thread::get_caller_id()]; - ThreadData &curr_thread = threads[pool_thread_index]; // Since the WorkerThreadPool is started before the script server, // its pre-created threads can't have ScriptServer::thread_enter() called on them early. // Therefore, we do it late at the first opportunity, so in case the task @@ -71,13 +62,8 @@ void WorkerThreadPool::_process_task(Task *p_task) { } task_mutex.lock(); p_task->pool_thread_index = pool_thread_index; - if (low_priority) { - low_priority_tasks_running++; - prev_low_prio_task = curr_thread.current_low_prio_task; - curr_thread.current_low_prio_task = p_task; - } else { - curr_thread.current_low_prio_task = nullptr; - } + prev_task = curr_thread.current_task; + curr_thread.current_task = p_task; task_mutex.unlock(); } @@ -111,33 +97,24 @@ void WorkerThreadPool::_process_task(Task *p_task) { memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it. } - if (low_priority && use_native_low_priority_threads) { - p_task->completed = true; - p_task->done_semaphore.post(); - if (do_post) { - p_task->group->completed.set_to(true); - } - } else { - if (do_post) { - p_task->group->done_semaphore.post(); - p_task->group->completed.set_to(true); - } - uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment. - uint32_t finished_users = p_task->group->finished.increment(); - - if (finished_users == max_users) { - // Get rid of the group, because nobody else is using it. - task_mutex.lock(); - group_allocator.free(p_task->group); - task_mutex.unlock(); - } - - // For groups, tasks get rid of themselves. + if (do_post) { + p_task->group->done_semaphore.post(); + p_task->group->completed.set_to(true); + } + uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment. + uint32_t finished_users = p_task->group->finished.increment(); + if (finished_users == max_users) { + // Get rid of the group, because nobody else is using it. task_mutex.lock(); - task_allocator.free(p_task); + group_allocator.free(p_task->group); task_mutex.unlock(); } + + // For groups, tasks get rid of themselves. + + task_mutex.lock(); + task_allocator.free(p_task); } else { if (p_task->native_func) { p_task->native_func(p_task->native_func_userdata); @@ -150,88 +127,162 @@ void WorkerThreadPool::_process_task(Task *p_task) { task_mutex.lock(); p_task->completed = true; - for (uint8_t i = 0; i < p_task->waiting; i++) { - p_task->done_semaphore.post(); + p_task->pool_thread_index = -1; + if (p_task->waiting_user) { + p_task->done_semaphore.post(p_task->waiting_user); } - if (!use_native_low_priority_threads) { - p_task->pool_thread_index = -1; + // Let awaiters know. + for (uint32_t i = 0; i < threads.size(); i++) { + if (threads[i].awaited_task == p_task) { + threads[i].cond_var.notify_one(); + threads[i].signaled = true; + } } - task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed. } - // Task may have been freed by now (all callers notified). - p_task = nullptr; - - if (!use_native_low_priority_threads) { - bool post = false; - task_mutex.lock(); - ThreadData &curr_thread = threads[pool_thread_index]; - curr_thread.current_low_prio_task = prev_low_prio_task; - if (low_priority) { + { + curr_thread.current_task = prev_task; + if (p_task->low_priority) { low_priority_threads_used--; - low_priority_tasks_running--; - // A low prioriry task was freed, so see if we can move a pending one to the high priority queue. - if (_try_promote_low_priority_task()) { - post = true; - } - if (low_priority_tasks_awaiting_others == low_priority_tasks_running) { - _prevent_low_prio_saturation_deadlock(); + if (_try_promote_low_priority_task()) { + if (prev_task) { // Otherwise, this thread will catch it. + _notify_threads(&curr_thread, 1, 0); + } } } + task_mutex.unlock(); - if (post) { - task_available_semaphore.post(); - } } + + set_current_thread_safe_for_nodes(safe_for_nodes_backup); } void WorkerThreadPool::_thread_function(void *p_user) { + ThreadData *thread_data = (ThreadData *)p_user; while (true) { - singleton->task_available_semaphore.wait(); - if (singleton->exit_threads) { - break; + Task *task_to_process = nullptr; + { + MutexLock lock(singleton->task_mutex); + if (singleton->exit_threads) { + return; + } + thread_data->signaled = false; + + if (singleton->task_queue.first()) { + task_to_process = singleton->task_queue.first()->self(); + singleton->task_queue.remove(singleton->task_queue.first()); + } else { + thread_data->cond_var.wait(lock); + DEV_ASSERT(singleton->exit_threads || thread_data->signaled); + } + } + + if (task_to_process) { + singleton->_process_task(task_to_process); } - singleton->_process_task_queue(); } } -void WorkerThreadPool::_native_low_priority_thread_function(void *p_user) { - Task *task = (Task *)p_user; - singleton->_process_task(task); -} - -void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) { +void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) { // Fall back to processing on the calling thread if there are no worker threads. // Separated into its own variable to make it easier to extend this logic // in custom builds. bool process_on_calling_thread = threads.size() == 0; if (process_on_calling_thread) { - _process_task(p_task); + task_mutex.unlock(); + for (uint32_t i = 0; i < p_count; i++) { + _process_task(p_tasks[i]); + } return; } - task_mutex.lock(); - p_task->low_priority = !p_high_priority; - if (!p_high_priority && use_native_low_priority_threads) { - p_task->low_priority_thread = native_thread_allocator.alloc(); - task_mutex.unlock(); + uint32_t to_process = 0; + uint32_t to_promote = 0; - if (p_task->group) { - p_task->group->low_priority_native_tasks.push_back(p_task); + ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr; + + for (uint32_t i = 0; i < p_count; i++) { + p_tasks[i]->low_priority = !p_high_priority; + if (p_high_priority || low_priority_threads_used < max_low_priority_threads) { + task_queue.add_last(&p_tasks[i]->task_elem); + if (!p_high_priority) { + low_priority_threads_used++; + } + to_process++; + } else { + // Too many threads using low priority, must go to queue. + low_priority_task_queue.add_last(&p_tasks[i]->task_elem); + to_promote++; } - p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pask task directly to thread. - } else if (p_high_priority || low_priority_threads_used < max_low_priority_threads) { - task_queue.add_last(&p_task->task_elem); - if (!p_high_priority) { - low_priority_threads_used++; + } + + _notify_threads(caller_pool_thread, to_process, to_promote); + + task_mutex.unlock(); +} + +void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) { + uint32_t to_process = p_process_count; + uint32_t to_promote = p_promote_count; + + // This is where which threads are awaken is decided according to the workload. + // Threads that will anyway have a chance to check the situation and process/promote tasks + // are excluded from being notified. Others will be tried anyway to try to distribute load. + // The current thread, if is a pool thread, is also excluded depending on the promoting/processing + // needs because it will anyway loop again. However, it will contribute to decreasing the count, + // which helps reducing sync traffic. + + uint32_t thread_count = threads.size(); + + // First round: + // 1. For processing: notify threads that are not running tasks, to keep the stacks as shallow as possible. + // 2. For promoting: since it's exclusive with processing, we fin threads able to promote low-prio tasks now. + for (uint32_t i = 0; + i < thread_count && (to_process || to_promote); + i++, notify_index = (notify_index + 1) % thread_count) { + ThreadData &th = threads[notify_index]; + + if (th.signaled) { + continue; + } + if (th.current_task) { + // Good thread for promoting low-prio? + if (to_promote && th.awaited_task && th.current_task->low_priority) { + if (likely(&th != p_current_thread_data)) { + th.cond_var.notify_one(); + } + th.signaled = true; + to_promote--; + } + } else { + if (to_process) { + if (likely(&th != p_current_thread_data)) { + th.cond_var.notify_one(); + } + th.signaled = true; + to_process--; + } + } + } + + // Second round: + // For processing: if the first round wasn't enough, let's try now with threads processing tasks but currently awaiting. + for (uint32_t i = 0; + i < thread_count && to_process; + i++, notify_index = (notify_index + 1) % thread_count) { + ThreadData &th = threads[notify_index]; + + if (th.signaled) { + continue; + } + if (th.awaited_task) { + if (likely(&th != p_current_thread_data)) { + th.cond_var.notify_one(); + } + th.signaled = true; + to_process--; } - task_mutex.unlock(); - task_available_semaphore.post(); - } else { - // Too many threads using low priority, must go to queue. - low_priority_task_queue.add_last(&p_task->task_elem); - task_mutex.unlock(); } } @@ -247,23 +298,6 @@ bool WorkerThreadPool::_try_promote_low_priority_task() { } } -void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() { - if (low_priority_tasks_awaiting_others == low_priority_tasks_running) { -#ifdef DEV_ENABLED - print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task."); -#endif - // In order not to create dependency cycles, we can only schedule the next one. - // We'll keep doing the same until the deadlock is broken, - SelfList *to_promote = low_priority_task_queue.first(); - if (to_promote) { - low_priority_task_queue.remove(to_promote); - task_queue.add_last(to_promote); - low_priority_threads_used++; - task_available_semaphore.post(); - } - } -} - WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) { return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description); } @@ -273,15 +307,15 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, // Get a free task Task *task = task_allocator.alloc(); TaskID id = last_task++; + task->self = id; task->callable = p_callable; task->native_func = p_func; task->native_func_userdata = p_userdata; task->description = p_description; task->template_userdata = p_template_userdata; tasks.insert(id, task); - task_mutex.unlock(); - _post_task(task, p_high_priority); + _post_tasks_and_unlock(&task, 1, p_high_priority); return id; } @@ -313,105 +347,109 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { } Task *task = *taskp; - if (!task->completed) { - if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet. - int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1; - if (caller_pool_th_index == task->pool_thread_index) { - // Deadlock prevention. - // Waiting for a task run on this same thread? That means the task to be awaited started waiting as well - // and another task was run to make use of the thread in the meantime, with enough bad luck as to - // the need to wait for the original task arose in turn. - // In other words, the task we want to wait for is buried in the stack. - // Let's report the caller about the issue to it handles as it sees fit. - task_mutex.unlock(); - return ERR_BUSY; - } + if (task->completed) { + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); } - - task->waiting++; - - bool is_low_prio_waiting_for_another = false; - if (!use_native_low_priority_threads) { - // Deadlock prevention: - // If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots, - // we have a no progressable situation. We can apply a workaround, consisting in promoting an awaited queued - // low-prio task to the schedule queue so it can run and break the "impasse". - // NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more - // than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph - // or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be - // an issue in the real world, a further fix can be applied against that. - if (task->low_priority) { - bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task; - if (awaiter_is_a_low_prio_task) { - is_low_prio_waiting_for_another = true; - low_priority_tasks_awaiting_others++; - if (low_priority_tasks_awaiting_others == low_priority_tasks_running) { - _prevent_low_prio_saturation_deadlock(); - } - } - } - } - task_mutex.unlock(); - - if (use_native_low_priority_threads && task->low_priority) { - task->done_semaphore.wait(); - } else { - bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id()); - if (current_is_pool_thread) { - // We are an actual process thread, we must not be blocked so continue processing stuff if available. - bool must_exit = false; - while (true) { - if (task->done_semaphore.try_wait()) { - // If done, exit - break; - } - if (!must_exit) { - if (task_available_semaphore.try_wait()) { - if (exit_threads) { - must_exit = true; - } else { - // Solve tasks while they are around. - bool safe_for_nodes_backup = is_current_thread_safe_for_nodes(); - _process_task_queue(); - set_current_thread_safe_for_nodes(safe_for_nodes_backup); - continue; - } - } else if (!use_native_low_priority_threads && task->low_priority) { - // A low prioriry task started waiting, so see if we can move a pending one to the high priority queue. - task_mutex.lock(); - bool post = _try_promote_low_priority_task(); - task_mutex.unlock(); - if (post) { - task_available_semaphore.post(); - } - } - } - OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance. - } - } else { - task->done_semaphore.wait(); - } - } - - task_mutex.lock(); - if (is_low_prio_waiting_for_another) { - low_priority_tasks_awaiting_others--; - } - - task->waiting--; + return OK; } - if (task->waiting == 0) { - if (use_native_low_priority_threads && task->low_priority) { - task->low_priority_thread->wait_to_finish(); - native_thread_allocator.free(task->low_priority_thread); - } - tasks.erase(p_task_id); - task_allocator.free(task); + ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr; + if (caller_pool_thread && p_task_id <= caller_pool_thread->current_task->self) { + // Deadlock prevention: + // When a pool thread wants to wait for an older task, the following situations can happen: + // 1. Awaited task is deep in the stack of the awaiter. + // 2. A group of awaiter threads end up depending on some tasks buried in the stack + // of their worker threads in such a way that progress can't be made. + // Both would entail a deadlock. Some may be handled here in the WorkerThreadPool + // with some extra logic and bookkeeping. However, there would still be unavoidable + // cases of deadlock because of the way waiting threads process outstanding tasks. + // Taking into account there's no feasible solution for every possible case + // with the current design, we just simply reject attempts to await on older tasks, + // with a specific error code that signals the situation so the caller can handle it. + task_mutex.unlock(); + return ERR_BUSY; + } + + if (caller_pool_thread) { + task->waiting_pool++; + } else { + task->waiting_user++; } task_mutex.unlock(); + + if (caller_pool_thread) { + while (true) { + Task *task_to_process = nullptr; + { + MutexLock lock(task_mutex); + bool was_signaled = caller_pool_thread->signaled; + caller_pool_thread->signaled = false; + + if (task->completed) { + // This thread was awaken also for some reason, but it's about to exit. + // Let's find out what may be pending and forward the requests. + if (!exit_threads && was_signaled) { + uint32_t to_process = task_queue.first() ? 1 : 0; + uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0; + if (to_process || to_promote) { + // This thread must be left alone since it won't loop again. + caller_pool_thread->signaled = true; + _notify_threads(caller_pool_thread, to_process, to_promote); + } + } + + task->waiting_pool--; + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); + } + + break; + } + + if (!exit_threads) { + // This is a thread from the pool. It shouldn't just idle. + // Let's try to process other tasks while we wait. + + if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { + if (_try_promote_low_priority_task()) { + _notify_threads(caller_pool_thread, 1, 0); + } + } + + if (singleton->task_queue.first()) { + task_to_process = task_queue.first()->self(); + task_queue.remove(task_queue.first()); + } + + if (!task_to_process) { + caller_pool_thread->awaited_task = task; + caller_pool_thread->cond_var.wait(lock); + DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed); + caller_pool_thread->awaited_task = nullptr; + } + } + } + + if (task_to_process) { + _process_task(task_to_process); + } + } + } else { + task->done_semaphore.wait(); + task_mutex.lock(); + task->waiting_user--; + if (task->waiting_pool == 0 && task->waiting_user == 0) { + tasks.erase(p_task_id); + task_allocator.free(task); + } + task_mutex.unlock(); + } + return OK; } @@ -455,11 +493,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca } groups[id] = group; - task_mutex.unlock(); - for (int i = 0; i < p_tasks; i++) { - _post_task(tasks_posted[i], p_high_priority); - } + _post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority); return id; } @@ -502,21 +537,9 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) { if (!groupp) { ERR_FAIL_MSG("Invalid Group ID"); } - Group *group = *groupp; - if (group->low_priority_native_tasks.size() > 0) { - for (Task *task : group->low_priority_native_tasks) { - task->low_priority_thread->wait_to_finish(); - task_mutex.lock(); - native_thread_allocator.free(task->low_priority_thread); - task_allocator.free(task); - task_mutex.unlock(); - } - - task_mutex.lock(); - group_allocator.free(group); - task_mutex.unlock(); - } else { + { + Group *group = *groupp; group->done_semaphore.wait(); uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment. @@ -540,19 +563,13 @@ int WorkerThreadPool::get_thread_index() { return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1; } -void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_priority, float p_low_priority_task_ratio) { +void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) { ERR_FAIL_COND(threads.size() > 0); if (p_thread_count < 0) { p_thread_count = OS::get_singleton()->get_default_thread_pool_size(); } - if (p_use_native_threads_low_priority) { - max_low_priority_threads = 0; - } else { - max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1); - } - - use_native_low_priority_threads = p_use_native_threads_low_priority; + max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1); threads.resize(p_thread_count); @@ -576,12 +593,13 @@ void WorkerThreadPool::finish() { } task_mutex.unlock(); - exit_threads = true; - - for (uint32_t i = 0; i < threads.size(); i++) { - task_available_semaphore.post(); + { + MutexLock lock(task_mutex); + exit_threads = true; + } + for (ThreadData &data : threads) { + data.cond_var.notify_one(); } - for (ThreadData &data : threads) { data.thread.wait_to_finish(); } diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index dd56f95caea..1c4758fb48f 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -31,6 +31,7 @@ #ifndef WORKER_THREAD_POOL_H #define WORKER_THREAD_POOL_H +#include "core/os/condition_variable.h" #include "core/os/memory.h" #include "core/os/os.h" #include "core/os/semaphore.h" @@ -60,7 +61,7 @@ private: }; struct Group { - GroupID self; + GroupID self = -1; SafeNumeric index; SafeNumeric completed_index; uint32_t max = 0; @@ -68,23 +69,23 @@ private: SafeFlag completed; SafeNumeric finished; uint32_t tasks_used = 0; - TightLocalVector low_priority_native_tasks; }; struct Task { + TaskID self = -1; Callable callable; void (*native_func)(void *) = nullptr; void (*native_group_func)(void *, uint32_t) = nullptr; void *native_func_userdata = nullptr; String description; - Semaphore done_semaphore; + Semaphore done_semaphore; // For user threads awaiting. bool completed = false; Group *group = nullptr; SelfList task_elem; - uint32_t waiting = 0; + uint32_t waiting_pool = 0; + uint32_t waiting_user = 0; bool low_priority = false; BaseTemplateUserdata *template_userdata = nullptr; - Thread *low_priority_thread = nullptr; int pool_thread_index = -1; void free_template_userdata(); @@ -94,19 +95,20 @@ private: PagedAllocator task_allocator; PagedAllocator group_allocator; - PagedAllocator native_thread_allocator; SelfList::List low_priority_task_queue; SelfList::List task_queue; - Mutex task_mutex; - Semaphore task_available_semaphore; + BinaryMutex task_mutex; struct ThreadData { - uint32_t index; + uint32_t index = 0; Thread thread; - Task *current_low_prio_task = nullptr; bool ready_for_scripting = false; + bool signaled = false; + Task *current_task = nullptr; + Task *awaited_task = nullptr; // Null if not awaiting the condition variable. Special value for idle-waiting. + ConditionVariable cond_var; }; TightLocalVector threads; @@ -116,24 +118,20 @@ private: HashMap tasks; HashMap groups; - bool use_native_low_priority_threads = false; uint32_t max_low_priority_threads = 0; uint32_t low_priority_threads_used = 0; - uint32_t low_priority_tasks_running = 0; - uint32_t low_priority_tasks_awaiting_others = 0; + uint32_t notify_index = 0; // For rotating across threads, no help distributing load. uint64_t last_task = 1; static void _thread_function(void *p_user); - static void _native_low_priority_thread_function(void *p_user); - void _process_task_queue(); void _process_task(Task *task); - void _post_task(Task *p_task, bool p_high_priority); + void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority); + void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count); bool _try_promote_low_priority_task(); - void _prevent_low_prio_saturation_deadlock(); static WorkerThreadPool *singleton; @@ -199,7 +197,7 @@ public: static WorkerThreadPool *get_singleton() { return singleton; } static int get_thread_index(); - void init(int p_thread_count = -1, bool p_use_native_threads_low_priority = true, float p_low_priority_task_ratio = 0.3); + void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3); void finish(); WorkerThreadPool(); ~WorkerThreadPool(); diff --git a/core/os/condition_variable.h b/core/os/condition_variable.h index 6a6996019d3..6a49ced31b4 100644 --- a/core/os/condition_variable.h +++ b/core/os/condition_variable.h @@ -31,6 +31,8 @@ #ifndef CONDITION_VARIABLE_H #define CONDITION_VARIABLE_H +#include "core/os/mutex.h" + #ifdef MINGW_ENABLED #define MINGW_STDTHREAD_REDUNDANCY_WARNING #include "thirdparty/mingw-std-threads/mingw.condition_variable.h" diff --git a/core/os/semaphore.h b/core/os/semaphore.h index 8bb1529bbd4..b8ae35b86b9 100644 --- a/core/os/semaphore.h +++ b/core/os/semaphore.h @@ -58,10 +58,12 @@ private: #endif public: - _ALWAYS_INLINE_ void post() const { + _ALWAYS_INLINE_ void post(uint32_t p_count = 1) const { std::lock_guard lock(mutex); - count++; - condition.notify_one(); + count += p_count; + for (uint32_t i = 0; i < p_count; ++i) { + condition.notify_one(); + } } _ALWAYS_INLINE_ void wait() const { diff --git a/core/register_core_types.cpp b/core/register_core_types.cpp index 2785d1daa57..82b3d279426 100644 --- a/core/register_core_types.cpp +++ b/core/register_core_types.cpp @@ -307,7 +307,6 @@ void register_core_settings() { GLOBAL_DEF(PropertyInfo(Variant::STRING, "network/tls/certificate_bundle_override", PROPERTY_HINT_FILE, "*.crt"), ""); GLOBAL_DEF("threading/worker_pool/max_threads", -1); - GLOBAL_DEF("threading/worker_pool/use_system_threads_for_low_priority_tasks", true); GLOBAL_DEF("threading/worker_pool/low_priority_thread_ratio", 0.3); } diff --git a/doc/classes/ProjectSettings.xml b/doc/classes/ProjectSettings.xml index ab92916320d..3a9495c3795 100644 --- a/doc/classes/ProjectSettings.xml +++ b/doc/classes/ProjectSettings.xml @@ -2768,8 +2768,6 @@ Maximum number of threads to be used by [WorkerThreadPool]. Value of [code]-1[/code] means no limit. - - Action map configuration to load by default. diff --git a/doc/classes/WorkerThreadPool.xml b/doc/classes/WorkerThreadPool.xml index d6751d31ce7..fa1f08b149b 100644 --- a/doc/classes/WorkerThreadPool.xml +++ b/doc/classes/WorkerThreadPool.xml @@ -106,7 +106,7 @@ Pauses the thread that calls this method until the task with the given ID is completed. Returns [constant @GlobalScope.OK] if the task could be successfully awaited. Returns [constant @GlobalScope.ERR_INVALID_PARAMETER] if a task with the passed ID does not exist (maybe because it was already awaited and disposed of). - Returns [constant @GlobalScope.ERR_BUSY] if the call is made from another running task and, due to task scheduling, the task to await is at a lower level in the call stack and therefore can't progress. This is an advanced situation that should only matter when some tasks depend on others. + Returns [constant @GlobalScope.ERR_BUSY] if the call is made from another running task and, due to task scheduling, there's potential for deadlocking (e.g., the task to await may be at a lower level in the call stack and therefore can't progress). This is an advanced situation that should only matter when some tasks depend on others (in the current implementation, the tricky case is a task trying to wait on an older one). diff --git a/main/main.cpp b/main/main.cpp index 4dcd92bcffc..b50def9cec4 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -1615,16 +1615,12 @@ Error Main::setup(const char *execpath, int argc, char *argv[], bool p_second_ph } // Initialize WorkerThreadPool. - { + if (editor || project_manager) { + WorkerThreadPool::get_singleton()->init(-1, 0.75); + } else { int worker_threads = GLOBAL_GET("threading/worker_pool/max_threads"); - bool low_priority_use_system_threads = GLOBAL_GET("threading/worker_pool/use_system_threads_for_low_priority_tasks"); - float low_property_ratio = GLOBAL_GET("threading/worker_pool/low_priority_thread_ratio"); - - if (editor || project_manager) { - WorkerThreadPool::get_singleton()->init(); - } else { - WorkerThreadPool::get_singleton()->init(worker_threads, low_priority_use_system_threads, low_property_ratio); - } + float low_priority_ratio = GLOBAL_GET("threading/worker_pool/low_priority_thread_ratio"); + WorkerThreadPool::get_singleton()->init(worker_threads, low_priority_ratio); } #ifdef TOOLS_ENABLED From ae418f9469937b9a438a509bf359da9039cbee37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Fri, 29 Dec 2023 01:27:17 +0100 Subject: [PATCH 2/3] WorkerThreadPool: Avoid deadlocks when CommandQueueMT is involved This commit lets CommandQueueMT play nicely with the WorkerThreadPool to avoid non-progressable situations caused by an interdependence between both. While a command queue is being flushed, it allows the WTP to release its lock while tasks are being awaited so they can make progress in case they need in turn to post to the command queue. --- core/object/worker_thread_pool.cpp | 28 +++++++++++++++++++ core/object/worker_thread_pool.h | 7 +++++ core/templates/command_queue_mt.h | 44 +++++++++++++++++++----------- 3 files changed, 63 insertions(+), 16 deletions(-) diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index b93c3270048..881c825cf1a 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -33,6 +33,7 @@ #include "core/object/script_language.h" #include "core/os/os.h" #include "core/os/thread_safe.h" +#include "core/templates/command_queue_mt.h" void WorkerThreadPool::Task::free_template_userdata() { ERR_FAIL_NULL(template_userdata); @@ -43,6 +44,8 @@ void WorkerThreadPool::Task::free_template_userdata() { WorkerThreadPool *WorkerThreadPool::singleton = nullptr; +thread_local CommandQueueMT *WorkerThreadPool::flushing_cmd_queue = nullptr; + void WorkerThreadPool::_process_task(Task *p_task) { int pool_thread_index = thread_ids[Thread::get_caller_id()]; ThreadData &curr_thread = threads[pool_thread_index]; @@ -428,7 +431,15 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { if (!task_to_process) { caller_pool_thread->awaited_task = task; + + if (flushing_cmd_queue) { + flushing_cmd_queue->unlock(); + } caller_pool_thread->cond_var.wait(lock); + if (flushing_cmd_queue) { + flushing_cmd_queue->lock(); + } + DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed); caller_pool_thread->awaited_task = nullptr; } @@ -540,7 +551,14 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) { { Group *group = *groupp; + + if (flushing_cmd_queue) { + flushing_cmd_queue->unlock(); + } group->done_semaphore.wait(); + if (flushing_cmd_queue) { + flushing_cmd_queue->lock(); + } uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment. uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later. @@ -563,6 +581,16 @@ int WorkerThreadPool::get_thread_index() { return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1; } +void WorkerThreadPool::thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue) { + ERR_FAIL_COND(flushing_cmd_queue != nullptr); + flushing_cmd_queue = p_queue; +} + +void WorkerThreadPool::thread_exit_command_queue_mt_flush() { + ERR_FAIL_NULL(flushing_cmd_queue); + flushing_cmd_queue = nullptr; +} + void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) { ERR_FAIL_COND(threads.size() > 0); if (p_thread_count < 0) { diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index 1c4758fb48f..3ec4fd732fa 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -41,6 +41,8 @@ #include "core/templates/rid.h" #include "core/templates/safe_refcount.h" +class CommandQueueMT; + class WorkerThreadPool : public Object { GDCLASS(WorkerThreadPool, Object) public: @@ -135,6 +137,8 @@ private: static WorkerThreadPool *singleton; + static thread_local CommandQueueMT *flushing_cmd_queue; + TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description); GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description); @@ -197,6 +201,9 @@ public: static WorkerThreadPool *get_singleton() { return singleton; } static int get_thread_index(); + static void thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue); + static void thread_exit_command_queue_mt_flush(); + void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3); void finish(); WorkerThreadPool(); diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index 7e480653ace..b1010f7f43c 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -31,6 +31,7 @@ #ifndef COMMAND_QUEUE_MT_H #define COMMAND_QUEUE_MT_H +#include "core/object/worker_thread_pool.h" #include "core/os/memory.h" #include "core/os/mutex.h" #include "core/os/semaphore.h" @@ -306,15 +307,15 @@ class CommandQueueMT { struct CommandBase { virtual void call() = 0; - virtual void post() {} - virtual ~CommandBase() {} + virtual SyncSemaphore *get_sync_semaphore() { return nullptr; } + virtual ~CommandBase() = default; // Won't be called. }; struct SyncCommand : public CommandBase { SyncSemaphore *sync_sem = nullptr; - virtual void post() override { - sync_sem->sem.post(); + virtual SyncSemaphore *get_sync_semaphore() override { + return sync_sem; } }; @@ -340,6 +341,7 @@ class CommandQueueMT { SyncSemaphore sync_sems[SYNC_SEMAPHORES]; Mutex mutex; Semaphore *sync = nullptr; + uint64_t flush_read_ptr = 0; template T *allocate() { @@ -362,31 +364,41 @@ class CommandQueueMT { void _flush() { lock(); - uint64_t read_ptr = 0; - uint64_t limit = command_mem.size(); + WorkerThreadPool::thread_enter_command_queue_mt_flush(this); + while (flush_read_ptr < command_mem.size()) { + uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; + flush_read_ptr += 8; + CommandBase *cmd = reinterpret_cast(&command_mem[flush_read_ptr]); - while (read_ptr < limit) { - uint64_t size = *(uint64_t *)&command_mem[read_ptr]; - read_ptr += 8; - CommandBase *cmd = reinterpret_cast(&command_mem[read_ptr]); + SyncSemaphore *sync_sem = cmd->get_sync_semaphore(); + cmd->call(); + if (sync_sem) { + sync_sem->sem.post(); // Release in case it needs sync/ret. + } - cmd->call(); //execute the function - cmd->post(); //release in case it needs sync/ret - cmd->~CommandBase(); //should be done, so erase the command + if (unlikely(flush_read_ptr == 0)) { + // A reentrant call flushed. + DEV_ASSERT(command_mem.is_empty()); + unlock(); + return; + } - read_ptr += size; + flush_read_ptr += size; } + WorkerThreadPool::thread_exit_command_queue_mt_flush(); command_mem.clear(); + flush_read_ptr = 0; unlock(); } - void lock(); - void unlock(); void wait_for_flush(); SyncSemaphore *_alloc_sync_sem(); public: + void lock(); + void unlock(); + /* NORMAL PUSH COMMANDS */ DECL_PUSH(0) SPACE_SEP_LIST(DECL_PUSH, 15) From a7317748135e064ff150072f2f46ccc1d2c4b358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Thu, 28 Dec 2023 19:31:28 +0100 Subject: [PATCH 3/3] WorkerThreadPool: Avoid most runtime allocations Just a little optimization. **NOTE:** With `RID_Owner` we could replace each pair of `PagedAllocator` and `HashMap`-of-ids-to-pointers. However, that would force us to expose `RID` as the task/group id, instead of `int`, which would break the API. Too bad. Let's wait until Godot 5.0. --- core/object/worker_thread_pool.cpp | 20 ++++++++++++++------ core/object/worker_thread_pool.h | 23 +++++++++++++++++++---- core/templates/paged_allocator.h | 6 +----- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 881c825cf1a..8e8a2ef06b9 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -613,13 +613,14 @@ void WorkerThreadPool::finish() { return; } - task_mutex.lock(); - SelfList *E = low_priority_task_queue.first(); - while (E) { - print_error("Task waiting was never re-claimed: " + E->self()->description); - E = E->next(); + { + MutexLock lock(task_mutex); + SelfList *E = low_priority_task_queue.first(); + while (E) { + print_error("Task waiting was never re-claimed: " + E->self()->description); + E = E->next(); + } } - task_mutex.unlock(); { MutexLock lock(task_mutex); @@ -632,6 +633,13 @@ void WorkerThreadPool::finish() { data.thread.wait_to_finish(); } + { + MutexLock lock(task_mutex); + for (KeyValue &E : tasks) { + task_allocator.free(E.value); + } + } + threads.clear(); } diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index 3ec4fd732fa..c9921c808da 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -95,8 +95,11 @@ private: task_elem(this) {} }; - PagedAllocator task_allocator; - PagedAllocator group_allocator; + static const uint32_t TASKS_PAGE_SIZE = 1024; + static const uint32_t GROUPS_PAGE_SIZE = 256; + + PagedAllocator task_allocator; + PagedAllocator group_allocator; SelfList::List low_priority_task_queue; SelfList::List task_queue; @@ -117,8 +120,20 @@ private: bool exit_threads = false; HashMap thread_ids; - HashMap tasks; - HashMap groups; + HashMap< + TaskID, + Task *, + HashMapHasherDefault, + HashMapComparatorDefault, + PagedAllocator, false, TASKS_PAGE_SIZE>> + tasks; + HashMap< + GroupID, + Group *, + HashMapHasherDefault, + HashMapComparatorDefault, + PagedAllocator, false, GROUPS_PAGE_SIZE>> + groups; uint32_t max_low_priority_threads = 0; uint32_t low_priority_threads_used = 0; diff --git a/core/templates/paged_allocator.h b/core/templates/paged_allocator.h index 6f3f78d4d2a..d880eae0c3e 100644 --- a/core/templates/paged_allocator.h +++ b/core/templates/paged_allocator.h @@ -40,7 +40,7 @@ #include #include -template +template class PagedAllocator { T **page_pool = nullptr; T ***available_pool = nullptr; @@ -53,10 +53,6 @@ class PagedAllocator { SpinLock spin_lock; public: - enum { - DEFAULT_PAGE_SIZE = 4096 - }; - template T *alloc(Args &&...p_args) { if (thread_safe) {