diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 932493e340b..a27341dd2cc 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -206,13 +206,15 @@ void ResourceFormatLoader::_bind_methods() { void ResourceLoader::LoadToken::clear() { thread_load_mutex.lock(); - Thread *thread_to_destroy = nullptr; + WorkerThreadPool::TaskID task_to_await = 0; if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. DEV_ASSERT(thread_load_tasks.has(local_path)); ThreadLoadTask &load_task = thread_load_tasks[local_path]; - thread_to_destroy = load_task.thread; - load_task.thread = nullptr; + if (!load_task.awaited) { + task_to_await = load_task.task_id; + load_task.awaited = true; + } thread_load_tasks.erase(local_path); local_path.clear(); } @@ -225,12 +227,9 @@ void ResourceLoader::LoadToken::clear() { thread_load_mutex.unlock(); - // If thread is unused, destroy it here, locally, now the token data is consistent. - if (thread_to_destroy) { - if (thread_to_destroy->is_started()) { - thread_to_destroy->wait_to_finish(); - } - memdelete(thread_to_destroy); + // If task is unused, await it here, locally, now the token data is consistent. + if (task_to_await) { + WorkerThreadPool::get_singleton()->wait_for_task_completion(task_to_await); } } @@ -284,9 +283,19 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin void ResourceLoader::_thread_load_function(void *p_userdata) { ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; + + thread_load_mutex.lock(); + caller_task_id = load_task.task_id; + if (cleaning_tasks) { + load_task.status = THREAD_LOAD_FAILED; + thread_load_mutex.unlock(); + return; + } + thread_load_mutex.unlock(); + // Thread-safe either if it's the current thread or a brand new one. CallQueue *mq_override = nullptr; - if (load_task.first_in_stack) { + if (load_nesting == 0) { if (!load_task.dependent_path.is_empty()) { load_paths_stack.push_back(load_task.dependent_path); } @@ -341,18 +350,9 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } } - if (load_nesting == 0) { - thread_active_count--; - if (thread_waiting_count) { - thread_active_cond_var.notify_one(); - } - } - - print_lt("END: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count)); - thread_load_mutex.unlock(); - if (load_task.first_in_stack && mq_override) { + if (load_nesting == 0 && mq_override) { memdelete(mq_override); MessageQueue::set_thread_singleton_override(nullptr); } @@ -472,46 +472,15 @@ Ref ResourceLoader::_load_start(const String &p_path, load_task_ptr = must_not_register ? &unregistered_load_task : &thread_load_tasks[local_path]; } - print_lt("REQUEST: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count)); - run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; - if (!run_on_current_thread && thread_active_count >= thread_active_max && load_nesting > 0) { - // No free slots for another thread, but this one is already active, so keep working here. - run_on_current_thread = true; - } - - load_task_ptr->first_in_stack = run_on_current_thread ? load_nesting == 0 : true; - - if (load_task_ptr->first_in_stack) { - if (!run_on_current_thread && load_paths_stack.size()) { - // The paths stack is lost across thread boundaries, so we have to remember what was the topmost path. - load_task_ptr->dependent_path = load_paths_stack[load_paths_stack.size() - 1]; - } - if (thread_active_count >= thread_active_max) { - // Either the current or a new thread needs to wait for a free slot to become active. - thread_waiting_count++; - do { - thread_active_cond_var.wait(thread_load_lock); - } while (thread_active_count >= thread_active_max); - thread_waiting_count--; - } - thread_active_count++; - } - - if (cleaning_tasks) { - load_task_ptr->status = THREAD_LOAD_FAILED; - return load_token; - } - if (run_on_current_thread) { - load_task_ptr->loader_id = Thread::get_caller_id(); + load_task_ptr->thread_id = Thread::get_caller_id(); if (must_not_register) { load_token->res_if_unregistered = load_task_ptr->resource; } } else { - load_task_ptr->thread = memnew(Thread); - load_task_ptr->loader_id = load_task_ptr->thread->start(_thread_load_function, load_task_ptr); + load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr); } } @@ -632,29 +601,34 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro ThreadLoadTask &load_task = thread_load_tasks[p_load_token.local_path]; if (load_task.status == THREAD_LOAD_IN_PROGRESS) { - if (load_task.loader_id == Thread::get_caller_id()) { + DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0)); + + if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) || + (load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) { // Load is in progress, but it's precisely this thread the one in charge. // That means this is a cyclic load. if (r_error) { *r_error = ERR_BUSY; } return Ref(); - } else if (!load_task.cond_var) { - // This is the first time some thread needs to wait for this one. - load_task.cond_var = memnew(ConditionVariable); } - // Wait for load to complete. - thread_suspended_count++; - - print_lt("GET: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count)); - - do { - load_task.cond_var->wait(p_thread_load_lock); - DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); - } while (load_task.cond_var); - - thread_suspended_count--; + if (load_task.task_id != 0 && !load_task.awaited) { + // Loading thread is in the worker pool and still not awaited. + load_task.awaited = true; + thread_load_mutex.unlock(); + WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + thread_load_mutex.lock(); + } else { + // Loading thread is main or user thread, or in the worker pool, but already awaited by some other thread. + if (!load_task.cond_var) { + load_task.cond_var = memnew(ConditionVariable); + } + do { + load_task.cond_var->wait(p_thread_load_lock); + DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); + } while (load_task.cond_var); + } } if (cleaning_tasks) { @@ -1042,7 +1016,6 @@ void ResourceLoader::clear_thread_load_tasks() { if (none_running) { break; } - thread_active_cond_var.notify_all(); thread_load_mutex.unlock(); OS::get_singleton()->delay_usec(1000); thread_load_mutex.lock(); @@ -1158,12 +1131,7 @@ bool ResourceLoader::is_cleaning_tasks() { return cleaning_tasks; } -void ResourceLoader::initialize() { - thread_active_max = OS::get_singleton()->get_processor_count(); - thread_active_count = 0; - thread_waiting_count = 0; - thread_suspended_count = 0; -} +void ResourceLoader::initialize() {} void ResourceLoader::finalize() {} @@ -1178,18 +1146,13 @@ bool ResourceLoader::abort_on_missing_resource = true; bool ResourceLoader::timestamp_on_load = false; thread_local int ResourceLoader::load_nesting = 0; +thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0; thread_local Vector ResourceLoader::load_paths_stack; template <> thread_local uint32_t SafeBinaryMutex::count = 0; SafeBinaryMutex ResourceLoader::thread_load_mutex; HashMap ResourceLoader::thread_load_tasks; -ConditionVariable ResourceLoader::thread_active_cond_var; - -int ResourceLoader::thread_active_count = 0; -int ResourceLoader::thread_waiting_count = 0; -int ResourceLoader::thread_suspended_count = 0; -int ResourceLoader::thread_active_max = 0; bool ResourceLoader::cleaning_tasks = false; HashMap ResourceLoader::user_load_tokens; diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 2bb46387e92..ffe9d5de9a8 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -34,6 +34,7 @@ #include "core/io/resource.h" #include "core/object/gdvirtual.gen.inc" #include "core/object/script_language.h" +#include "core/object/worker_thread_pool.h" #include "core/os/semaphore.h" #include "core/os/thread.h" @@ -158,10 +159,10 @@ private: static Ref _find_custom_resource_format_loader(String path); struct ThreadLoadTask { - Thread *thread = nullptr; - Thread::ID loader_id = 0; - bool first_in_stack = false; - ConditionVariable *cond_var = nullptr; + WorkerThreadPool::TaskID task_id = 0; // Used if run on a worker thread from the pool. + Thread::ID thread_id = 0; // Used if running on an user thread (e.g., simple non-threaded load). + bool awaited = false; // If it's in the pool, this helps not awaiting from more than one dependent thread. + ConditionVariable *cond_var = nullptr; // In not in the worker pool or already awaiting, this is used as a secondary awaiting mechanism. LoadToken *load_token = nullptr; String local_path; String remapped_path; @@ -180,14 +181,10 @@ private: static void _thread_load_function(void *p_userdata); static thread_local int load_nesting; + static thread_local WorkerThreadPool::TaskID caller_task_id; static thread_local Vector load_paths_stack; static SafeBinaryMutex thread_load_mutex; static HashMap thread_load_tasks; - static ConditionVariable thread_active_cond_var; - static int thread_active_count; - static int thread_waiting_count; - static int thread_suspended_count; - static int thread_active_max; static bool cleaning_tasks; static HashMap user_load_tokens;