ResourceLoader: Enhance deadlock prevention

Benefits:
- Simpler code. The main load function is renamed so it's apparent that it's not just a thread entry point anymore.
- Cache and thread modes of the original task are honored. A beautiful consequence of this is that, unlike formerly, re-issued loads can use the resource cache, which makes this mechanism much more performant.
- The newly added getter for caller task id in WorkerThreadPool allows to remove the custom tracking of that in ResourceLoader.
- The check to replace a cached resource and the replacement itself happen atomically. That fixes deadlock prevention leading to multiple resource instances of the same one on disk. As a side effect, it also makes the regular check for replace load mode more robust.
This commit is contained in:
Pedro J. Estébanez 2024-07-10 12:53:14 +02:00
parent bd0959ebdd
commit 28619e26cf
4 changed files with 45 additions and 20 deletions

View file

@ -315,11 +315,11 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
ERR_FAIL_V_MSG(Ref<Resource>(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint)); ERR_FAIL_V_MSG(Ref<Resource>(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint));
} }
void ResourceLoader::_thread_load_function(void *p_userdata) { // This implementation must allow re-entrancy for a task that started awaiting in a deeper stack frame.
void ResourceLoader::_run_load_task(void *p_userdata) {
ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
thread_load_mutex.lock(); thread_load_mutex.lock();
caller_task_id = load_task.task_id;
if (cleaning_tasks) { if (cleaning_tasks) {
load_task.status = THREAD_LOAD_FAILED; load_task.status = THREAD_LOAD_FAILED;
thread_load_mutex.unlock(); thread_load_mutex.unlock();
@ -376,15 +376,28 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
unlock_pending = false; unlock_pending = false;
if (!ignoring) { if (!ignoring) {
if (replacing) { ResourceCache::lock.lock(); // Check and operations must happen atomically.
Ref<Resource> old_res = ResourceCache::get_ref(load_task.local_path); bool pending_unlock = true;
if (old_res.is_valid() && old_res != load_task.resource) { Ref<Resource> old_res = ResourceCache::get_ref(load_task.local_path);
// If resource is already loaded, only replace its data, to avoid existing invalidating instances. if (old_res.is_valid()) {
old_res->copy_from(load_task.resource); if (old_res != load_task.resource) {
// Resource can already exists at this point for two reasons:
// a) The load uses replace mode.
// b) There were more than one load in flight for the same path because of deadlock prevention.
// Either case, we want to keep the resource that was already there.
ResourceCache::lock.unlock();
pending_unlock = false;
if (replacing) {
old_res->copy_from(load_task.resource);
}
load_task.resource = old_res; load_task.resource = old_res;
} }
} else {
load_task.resource->set_path(load_task.local_path);
}
if (pending_unlock) {
ResourceCache::lock.unlock();
} }
load_task.resource->set_path(load_task.local_path, replacing);
} else { } else {
load_task.resource->set_path_cache(load_task.local_path); load_task.resource->set_path_cache(load_task.local_path);
} }
@ -552,14 +565,20 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT;
if (run_on_current_thread) { if (run_on_current_thread) {
load_task_ptr->thread_id = Thread::get_caller_id(); // The current thread may happen to be a thread from the pool.
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id();
if (tid != WorkerThreadPool::INVALID_TASK_ID) {
load_task_ptr->task_id = tid;
} else {
load_task_ptr->thread_id = Thread::get_caller_id();
}
} else { } else {
load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr); load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr);
} }
} }
if (run_on_current_thread) { if (run_on_current_thread) {
_thread_load_function(load_task_ptr); _run_load_task(load_task_ptr);
if (must_not_register) { if (must_not_register) {
load_token->res_if_unregistered = load_task_ptr->resource; load_token->res_if_unregistered = load_task_ptr->resource;
} }
@ -702,7 +721,7 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
if (load_task.status == THREAD_LOAD_IN_PROGRESS) { if (load_task.status == THREAD_LOAD_IN_PROGRESS) {
DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0)); DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0));
if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) || if ((load_task.task_id != 0 && load_task.task_id == WorkerThreadPool::get_singleton()->get_caller_task_id()) ||
(load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) { (load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) {
// Load is in progress, but it's precisely this thread the one in charge. // Load is in progress, but it's precisely this thread the one in charge.
// That means this is a cyclic load. // That means this is a cyclic load.
@ -731,12 +750,10 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
// resource loading that means that the task to wait for can be restarted here to break the // resource loading that means that the task to wait for can be restarted here to break the
// cycle, with as much recursion into this process as needed. // cycle, with as much recursion into this process as needed.
// When the stack is eventually unrolled, the original load will have been notified to go on. // When the stack is eventually unrolled, the original load will have been notified to go on.
// CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's _run_load_task(&load_task);
// an ongoing load for that resource and wait for it again. This value forces a new load. Ref<Resource> resource = load_task.resource;
Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE);
Ref<Resource> resource = _load_complete(*token.ptr(), &wtp_task_err);
if (r_error) { if (r_error) {
*r_error = wtp_task_err; *r_error = load_task.error;
} }
thread_load_mutex.lock(); thread_load_mutex.lock();
return resource; return resource;
@ -1324,7 +1341,6 @@ bool ResourceLoader::abort_on_missing_resource = true;
bool ResourceLoader::timestamp_on_load = false; bool ResourceLoader::timestamp_on_load = false;
thread_local int ResourceLoader::load_nesting = 0; thread_local int ResourceLoader::load_nesting = 0;
thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0;
thread_local Vector<String> ResourceLoader::load_paths_stack; thread_local Vector<String> ResourceLoader::load_paths_stack;
thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides; thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides;

View file

@ -187,10 +187,9 @@ private:
HashSet<String> sub_tasks; HashSet<String> sub_tasks;
}; };
static void _thread_load_function(void *p_userdata); static void _run_load_task(void *p_userdata);
static thread_local int load_nesting; static thread_local int load_nesting;
static thread_local WorkerThreadPool::TaskID caller_task_id;
static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level. static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
static thread_local Vector<String> load_paths_stack; static thread_local Vector<String> load_paths_stack;
static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex; static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;

View file

@ -665,6 +665,15 @@ int WorkerThreadPool::get_thread_index() {
return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1; return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
} }
WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() {
int th_index = get_thread_index();
if (th_index != -1 && singleton->threads[th_index].current_task) {
return singleton->threads[th_index].current_task->self;
} else {
return INVALID_TASK_ID;
}
}
#ifdef THREADS_ENABLED #ifdef THREADS_ENABLED
uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) { uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) {
return _thread_enter_unlock_allowance_zone(p_mutex, false); return _thread_enter_unlock_allowance_zone(p_mutex, false);

View file

@ -239,6 +239,7 @@ public:
static WorkerThreadPool *get_singleton() { return singleton; } static WorkerThreadPool *get_singleton() { return singleton; }
static int get_thread_index(); static int get_thread_index();
static TaskID get_caller_task_id();
#ifdef THREADS_ENABLED #ifdef THREADS_ENABLED
static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex); static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex);