diff --git a/core/os/os.h b/core/os/os.h index 688ac26d766..5809f486a3a 100644 --- a/core/os/os.h +++ b/core/os/os.h @@ -528,6 +528,7 @@ public: virtual int get_processor_count() const; virtual String get_processor_name() const; + virtual int get_default_thread_pool_size() const { return get_processor_count(); } virtual String get_unique_id() const; diff --git a/core/os/thread_work_pool.cpp b/core/os/thread_work_pool.cpp new file mode 100644 index 00000000000..a75fd06b9bf --- /dev/null +++ b/core/os/thread_work_pool.cpp @@ -0,0 +1,81 @@ +/*************************************************************************/ +/* thread_work_pool.cpp */ +/*************************************************************************/ +/* This file is part of: */ +/* GODOT ENGINE */ +/* https://godotengine.org */ +/*************************************************************************/ +/* Copyright (c) 2007-2022 Juan Linietsky, Ariel Manzur. */ +/* Copyright (c) 2014-2022 Godot Engine contributors (cf. AUTHORS.md). */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#include "thread_work_pool.h" + +#include "core/os/os.h" + +void ThreadWorkPool::_thread_function(void *p_user) { + ThreadData *thread = static_cast(p_user); + while (true) { + thread->start.wait(); + if (thread->exit.load()) { + break; + } + thread->work->work(); + thread->completed.post(); + } +} + +void ThreadWorkPool::init(int p_thread_count) { + ERR_FAIL_COND(threads != nullptr); + if (p_thread_count < 0) { + p_thread_count = OS::get_singleton()->get_default_thread_pool_size(); + } + + thread_count = p_thread_count; + threads = memnew_arr(ThreadData, thread_count); + + for (uint32_t i = 0; i < thread_count; i++) { + threads[i].exit.store(false); + threads[i].thread.start(&ThreadWorkPool::_thread_function, &threads[i]); + } +} + +void ThreadWorkPool::finish() { + if (threads == nullptr) { + return; + } + + for (uint32_t i = 0; i < thread_count; i++) { + threads[i].exit.store(true); + threads[i].start.post(); + } + for (uint32_t i = 0; i < thread_count; i++) { + threads[i].thread.wait_to_finish(); + } + + memdelete_arr(threads); + threads = nullptr; +} + +ThreadWorkPool::~ThreadWorkPool() { + finish(); +} diff --git a/core/os/thread_work_pool.h b/core/os/thread_work_pool.h new file mode 100644 index 00000000000..b0cebf04f11 --- /dev/null +++ b/core/os/thread_work_pool.h @@ -0,0 +1,157 @@ +/*************************************************************************/ +/* thread_work_pool.h */ +/*************************************************************************/ +/* This file is part of: */ +/* GODOT ENGINE */ +/* https://godotengine.org */ +/*************************************************************************/ +/* Copyright (c) 2007-2022 Juan Linietsky, Ariel Manzur. */ +/* Copyright (c) 2014-2022 Godot Engine contributors (cf. AUTHORS.md). */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#ifndef THREAD_WORK_POOL_H +#define THREAD_WORK_POOL_H + +#include "core/os/memory.h" +#include "core/os/semaphore.h" +#include "core/os/thread.h" + +#include + +class ThreadWorkPool { + std::atomic index; + + struct BaseWork { + std::atomic *index = nullptr; + uint32_t max_elements = 0; + virtual void work() = 0; + virtual ~BaseWork() = default; + }; + + template + struct Work : public BaseWork { + C *instance; + M method; + U userdata; + virtual void work() override { + while (true) { + uint32_t work_index = index->fetch_add(1, std::memory_order_relaxed); + if (work_index >= max_elements) { + break; + } + (instance->*method)(work_index, userdata); + } + } + }; + + struct ThreadData { + Thread thread; + Semaphore start; + Semaphore completed; + std::atomic exit; + BaseWork *work = nullptr; + }; + + ThreadData *threads = nullptr; + uint32_t thread_count = 0; + uint32_t threads_working = 0; + BaseWork *current_work = nullptr; + + static void _thread_function(void *p_user); + +public: + template + void begin_work(uint32_t p_elements, C *p_instance, M p_method, U p_userdata) { + ERR_FAIL_COND(!threads); //never initialized + ERR_FAIL_COND(current_work != nullptr); + + index.store(0, std::memory_order_release); + + Work *w = memnew((Work)); + w->instance = p_instance; + w->userdata = p_userdata; + w->method = p_method; + w->index = &index; + w->max_elements = p_elements; + + current_work = w; + + threads_working = MIN(p_elements, thread_count); + + for (uint32_t i = 0; i < threads_working; i++) { + threads[i].work = w; + threads[i].start.post(); + } + } + + bool is_working() const { + return current_work != nullptr; + } + + bool is_done_dispatching() const { + ERR_FAIL_COND_V(current_work == nullptr, true); + return index.load(std::memory_order_acquire) >= current_work->max_elements; + } + + uint32_t get_work_index() const { + ERR_FAIL_COND_V(current_work == nullptr, 0); + uint32_t idx = index.load(std::memory_order_acquire); + return MIN(idx, current_work->max_elements); + } + + void end_work() { + ERR_FAIL_COND(current_work == nullptr); + for (uint32_t i = 0; i < threads_working; i++) { + threads[i].completed.wait(); + threads[i].work = nullptr; + } + + threads_working = 0; + memdelete(current_work); + current_work = nullptr; + } + + template + void do_work(uint32_t p_elements, C *p_instance, M p_method, U p_userdata) { + switch (p_elements) { + case 0: + // Nothing to do, so do nothing. + break; + case 1: + // No value in pushing the work to another thread if it's a single job + // and we're going to wait for it to finish. Just run it right here. + (p_instance->*p_method)(0, p_userdata); + break; + default: + // Multiple jobs to do; commence threaded business. + begin_work(p_elements, p_instance, p_method, p_userdata); + end_work(); + } + } + + _FORCE_INLINE_ int get_thread_count() const { return thread_count; } + void init(int p_thread_count = -1); + void finish(); + ~ThreadWorkPool(); +}; + +#endif // THREAD_POOL_H diff --git a/modules/navigation/nav_map.cpp b/modules/navigation/nav_map.cpp index 574b45c271f..795e116f93c 100644 --- a/modules/navigation/nav_map.cpp +++ b/modules/navigation/nav_map.cpp @@ -30,7 +30,6 @@ #include "nav_map.h" -#include "core/os/threaded_array_processor.h" #include "nav_region.h" #include "rvo_agent.h" #include @@ -52,6 +51,10 @@ NavMap::NavMap() : deltatime(0.0), map_update_id(0) {} +NavMap::~NavMap() { + step_work_pool.finish(); +} + void NavMap::set_up(Vector3 p_up) { up = p_up; regenerate_polygons = true; @@ -702,7 +705,10 @@ void NavMap::compute_single_step(uint32_t index, RvoAgent **agent) { void NavMap::step(real_t p_deltatime) { deltatime = p_deltatime; if (controlled_agents.size() > 0) { - thread_process_array( + if (step_work_pool.get_thread_count() == 0) { + step_work_pool.init(); + } + step_work_pool.do_work( controlled_agents.size(), this, &NavMap::compute_single_step, diff --git a/modules/navigation/nav_map.h b/modules/navigation/nav_map.h index 50c6bebafd2..3750071d2a1 100644 --- a/modules/navigation/nav_map.h +++ b/modules/navigation/nav_map.h @@ -34,6 +34,7 @@ #include "nav_rid.h" #include "core/math/math_defs.h" +#include "core/os/thread_work_pool.h" #include "nav_utils.h" #include @@ -83,8 +84,12 @@ class NavMap : public NavRid { /// Change the id each time the map is updated. uint32_t map_update_id; + /// Pooled threads for computing steps + ThreadWorkPool step_work_pool; + public: NavMap(); + ~NavMap(); void set_up(Vector3 p_up); Vector3 get_up() const {