Merge pull request #93032 from RandomShaper/wtp_antilock

GDScript: Avoid deadlock possibility in multi-threaded load
This commit is contained in:
Rémi Verschelde 2024-06-28 14:42:48 +02:00
commit 6f8b90e412
No known key found for this signature in database
GPG key ID: C3336907360768E1
4 changed files with 96 additions and 27 deletions

View file

@ -33,7 +33,6 @@
#include "core/object/script_language.h"
#include "core/os/os.h"
#include "core/os/thread_safe.h"
#include "core/templates/command_queue_mt.h"
WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;
@ -46,7 +45,9 @@ void WorkerThreadPool::Task::free_template_userdata() {
WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
thread_local CommandQueueMT *WorkerThreadPool::flushing_cmd_queue = nullptr;
#ifdef THREADS_ENABLED
thread_local uintptr_t WorkerThreadPool::unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES] = {};
#endif
void WorkerThreadPool::_process_task(Task *p_task) {
#ifdef THREADS_ENABLED
@ -419,6 +420,34 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
return OK;
}
void WorkerThreadPool::_lock_unlockable_mutexes() {
#ifdef THREADS_ENABLED
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlockable_mutexes[i]) {
if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) {
((Mutex *)unlockable_mutexes[i])->lock();
} else {
((BinaryMutex *)unlockable_mutexes[i])->lock();
}
}
}
#endif
}
void WorkerThreadPool::_unlock_unlockable_mutexes() {
#ifdef THREADS_ENABLED
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlockable_mutexes[i]) {
if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) {
((Mutex *)unlockable_mutexes[i])->unlock();
} else {
((BinaryMutex *)unlockable_mutexes[i])->unlock();
}
}
}
#endif
}
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
// Keep processing tasks until the condition to stop waiting is met.
@ -426,6 +455,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
while (true) {
Task *task_to_process = nullptr;
bool relock_unlockables = false;
{
MutexLock lock(task_mutex);
bool was_signaled = p_caller_pool_thread->signaled;
@ -463,13 +493,9 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
if (!task_to_process) {
p_caller_pool_thread->awaited_task = p_task;
if (flushing_cmd_queue) {
flushing_cmd_queue->unlock();
}
_unlock_unlockable_mutexes();
relock_unlockables = true;
p_caller_pool_thread->cond_var.wait(lock);
if (flushing_cmd_queue) {
flushing_cmd_queue->lock();
}
DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
p_caller_pool_thread->awaited_task = nullptr;
@ -477,6 +503,10 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
}
}
if (relock_unlockables) {
_lock_unlockable_mutexes();
}
if (task_to_process) {
_process_task(task_to_process);
}
@ -603,13 +633,9 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
{
Group *group = *groupp;
if (flushing_cmd_queue) {
flushing_cmd_queue->unlock();
}
_unlock_unlockable_mutexes();
group->done_semaphore.wait();
if (flushing_cmd_queue) {
flushing_cmd_queue->lock();
}
_lock_unlockable_mutexes();
uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.
@ -633,16 +659,41 @@ int WorkerThreadPool::get_thread_index() {
return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
}
void WorkerThreadPool::thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue) {
ERR_FAIL_COND(flushing_cmd_queue != nullptr);
flushing_cmd_queue = p_queue;
#ifdef THREADS_ENABLED
uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) {
return _thread_enter_unlock_allowance_zone(p_mutex, false);
}
void WorkerThreadPool::thread_exit_command_queue_mt_flush() {
ERR_FAIL_NULL(flushing_cmd_queue);
flushing_cmd_queue = nullptr;
uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex) {
return _thread_enter_unlock_allowance_zone(p_mutex, true);
}
uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) {
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlikely(unlockable_mutexes[i] == (uintptr_t)p_mutex)) {
// Already registered in the current thread.
return UINT32_MAX;
}
if (!unlockable_mutexes[i]) {
unlockable_mutexes[i] = (uintptr_t)p_mutex;
if (p_is_binary) {
unlockable_mutexes[i] |= 1;
}
return i;
}
}
ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable mutex slots available. Engine bug.");
}
void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
if (p_zone_id == UINT32_MAX) {
return;
}
DEV_ASSERT(unlockable_mutexes[p_zone_id]);
unlockable_mutexes[p_zone_id] = 0;
}
#endif
void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
ERR_FAIL_COND(threads.size() > 0);
if (p_thread_count < 0) {

View file

@ -41,8 +41,6 @@
#include "core/templates/rid.h"
#include "core/templates/safe_refcount.h"
class CommandQueueMT;
class WorkerThreadPool : public Object {
GDCLASS(WorkerThreadPool, Object)
public:
@ -163,7 +161,10 @@ private:
static WorkerThreadPool *singleton;
static thread_local CommandQueueMT *flushing_cmd_queue;
#ifdef THREADS_ENABLED
static const uint32_t MAX_UNLOCKABLE_MUTEXES = 2;
static thread_local uintptr_t unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES];
#endif
TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description);
@ -190,6 +191,13 @@ private:
void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
#ifdef THREADS_ENABLED
static uint32_t _thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary);
#endif
void _lock_unlockable_mutexes();
void _unlock_unlockable_mutexes();
protected:
static void _bind_methods();
@ -232,8 +240,14 @@ public:
static WorkerThreadPool *get_singleton() { return singleton; }
static int get_thread_index();
static void thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue);
static void thread_exit_command_queue_mt_flush();
#ifdef THREADS_ENABLED
static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex);
static uint32_t thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex);
static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id);
#else
static uint32_t thread_enter_unlock_allowance_zone(void *p_mutex) { return UINT32_MAX; }
static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {}
#endif
void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
void finish();

View file

@ -364,7 +364,7 @@ class CommandQueueMT {
lock();
WorkerThreadPool::thread_enter_command_queue_mt_flush(this);
uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&mutex);
while (flush_read_ptr < command_mem.size()) {
uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
flush_read_ptr += 8;
@ -383,7 +383,7 @@ class CommandQueueMT {
flush_read_ptr += size;
}
WorkerThreadPool::thread_exit_command_queue_mt_flush();
WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
command_mem.clear();
flush_read_ptr = 0;

View file

@ -340,7 +340,11 @@ Ref<GDScript> GDScriptCache::get_full_script(const String &p_path, Error &r_erro
}
}
// Allowing lifting the lock might cause a script to be reloaded multiple times,
// which, as a last resort deadlock prevention strategy, is a good tradeoff.
uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&singleton->mutex);
r_error = script->reload(true);
WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
if (r_error) {
return script;
}