Merge pull request #86587 from RandomShaper/wtp_enhance

Enhance & fix `WorkerThreadPool`
This commit is contained in:
Rémi Verschelde 2024-01-11 17:38:09 +01:00
commit dc79e956b6
No known key found for this signature in database
GPG key ID: C3336907360768E1
11 changed files with 389 additions and 308 deletions

View file

@ -630,15 +630,16 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
if (load_task.task_id != 0) {
// Loading thread is in the worker pool.
load_task.awaited = true;
thread_load_mutex.unlock();
Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
if (err == ERR_BUSY) {
// The WorkerThreadPool has scheduled tasks in a way that the current load depends on
// another one in a lower stack frame. Restart such load here. When the stack is eventually
// unrolled, the original load will have been notified to go on.
// The WorkerThreadPool has reported that the current task wants to await on an older one.
// That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of
// resource loading that means that the task to wait for can be restarted here to break the
// cycle, with as much recursion into this process as needed.
// When the stack is eventually unrolled, the original load will have been notified to go on.
#ifdef DEV_ENABLED
print_verbose("ResourceLoader: Load task happened to wait on another one deep in the call stack. Attempting to avoid deadlock by re-issuing the load now.");
print_verbose("ResourceLoader: Potential for deadlock detected in task dependency. Attempting to avoid it by re-issuing the load now.");
#endif
// CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's
// an ongoing load for that resource and wait for it again. This value forces a new load.
@ -652,6 +653,7 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
} else {
DEV_ASSERT(err == OK);
thread_load_mutex.lock();
load_task.awaited = true;
}
} else {
// Loading thread is main or user thread.

View file

@ -33,6 +33,7 @@
#include "core/object/script_language.h"
#include "core/os/os.h"
#include "core/os/thread_safe.h"
#include "core/templates/command_queue_mt.h"
void WorkerThreadPool::Task::free_template_userdata() {
ERR_FAIL_NULL(template_userdata);
@ -43,24 +44,17 @@ void WorkerThreadPool::Task::free_template_userdata() {
WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
void WorkerThreadPool::_process_task_queue() {
task_mutex.lock();
Task *task = task_queue.first()->self();
task_queue.remove(task_queue.first());
task_mutex.unlock();
_process_task(task);
}
thread_local CommandQueueMT *WorkerThreadPool::flushing_cmd_queue = nullptr;
void WorkerThreadPool::_process_task(Task *p_task) {
bool low_priority = p_task->low_priority;
int pool_thread_index = -1;
Task *prev_low_prio_task = nullptr; // In case this is recursively called.
int pool_thread_index = thread_ids[Thread::get_caller_id()];
ThreadData &curr_thread = threads[pool_thread_index];
Task *prev_task = nullptr; // In case this is recursively called.
bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
if (!use_native_low_priority_threads) {
{
// Tasks must start with this unset. They are free to set-and-forget otherwise.
set_current_thread_safe_for_nodes(false);
pool_thread_index = thread_ids[Thread::get_caller_id()];
ThreadData &curr_thread = threads[pool_thread_index];
// Since the WorkerThreadPool is started before the script server,
// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
// Therefore, we do it late at the first opportunity, so in case the task
@ -71,13 +65,8 @@ void WorkerThreadPool::_process_task(Task *p_task) {
}
task_mutex.lock();
p_task->pool_thread_index = pool_thread_index;
if (low_priority) {
low_priority_tasks_running++;
prev_low_prio_task = curr_thread.current_low_prio_task;
curr_thread.current_low_prio_task = p_task;
} else {
curr_thread.current_low_prio_task = nullptr;
}
prev_task = curr_thread.current_task;
curr_thread.current_task = p_task;
task_mutex.unlock();
}
@ -111,33 +100,24 @@ void WorkerThreadPool::_process_task(Task *p_task) {
memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
}
if (low_priority && use_native_low_priority_threads) {
p_task->completed = true;
p_task->done_semaphore.post();
if (do_post) {
p_task->group->completed.set_to(true);
}
} else {
if (do_post) {
p_task->group->done_semaphore.post();
p_task->group->completed.set_to(true);
}
uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
uint32_t finished_users = p_task->group->finished.increment();
if (finished_users == max_users) {
// Get rid of the group, because nobody else is using it.
task_mutex.lock();
group_allocator.free(p_task->group);
task_mutex.unlock();
}
// For groups, tasks get rid of themselves.
if (do_post) {
p_task->group->done_semaphore.post();
p_task->group->completed.set_to(true);
}
uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
uint32_t finished_users = p_task->group->finished.increment();
if (finished_users == max_users) {
// Get rid of the group, because nobody else is using it.
task_mutex.lock();
task_allocator.free(p_task);
group_allocator.free(p_task->group);
task_mutex.unlock();
}
// For groups, tasks get rid of themselves.
task_mutex.lock();
task_allocator.free(p_task);
} else {
if (p_task->native_func) {
p_task->native_func(p_task->native_func_userdata);
@ -150,88 +130,162 @@ void WorkerThreadPool::_process_task(Task *p_task) {
task_mutex.lock();
p_task->completed = true;
for (uint8_t i = 0; i < p_task->waiting; i++) {
p_task->done_semaphore.post();
p_task->pool_thread_index = -1;
if (p_task->waiting_user) {
p_task->done_semaphore.post(p_task->waiting_user);
}
if (!use_native_low_priority_threads) {
p_task->pool_thread_index = -1;
// Let awaiters know.
for (uint32_t i = 0; i < threads.size(); i++) {
if (threads[i].awaited_task == p_task) {
threads[i].cond_var.notify_one();
threads[i].signaled = true;
}
}
task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
}
// Task may have been freed by now (all callers notified).
p_task = nullptr;
if (!use_native_low_priority_threads) {
bool post = false;
task_mutex.lock();
ThreadData &curr_thread = threads[pool_thread_index];
curr_thread.current_low_prio_task = prev_low_prio_task;
if (low_priority) {
{
curr_thread.current_task = prev_task;
if (p_task->low_priority) {
low_priority_threads_used--;
low_priority_tasks_running--;
// A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
if (_try_promote_low_priority_task()) {
post = true;
}
if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
_prevent_low_prio_saturation_deadlock();
if (_try_promote_low_priority_task()) {
if (prev_task) { // Otherwise, this thread will catch it.
_notify_threads(&curr_thread, 1, 0);
}
}
}
task_mutex.unlock();
if (post) {
task_available_semaphore.post();
}
}
set_current_thread_safe_for_nodes(safe_for_nodes_backup);
}
void WorkerThreadPool::_thread_function(void *p_user) {
ThreadData *thread_data = (ThreadData *)p_user;
while (true) {
singleton->task_available_semaphore.wait();
if (singleton->exit_threads) {
break;
Task *task_to_process = nullptr;
{
MutexLock lock(singleton->task_mutex);
if (singleton->exit_threads) {
return;
}
thread_data->signaled = false;
if (singleton->task_queue.first()) {
task_to_process = singleton->task_queue.first()->self();
singleton->task_queue.remove(singleton->task_queue.first());
} else {
thread_data->cond_var.wait(lock);
DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
}
}
if (task_to_process) {
singleton->_process_task(task_to_process);
}
singleton->_process_task_queue();
}
}
void WorkerThreadPool::_native_low_priority_thread_function(void *p_user) {
Task *task = (Task *)p_user;
singleton->_process_task(task);
}
void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
// Fall back to processing on the calling thread if there are no worker threads.
// Separated into its own variable to make it easier to extend this logic
// in custom builds.
bool process_on_calling_thread = threads.size() == 0;
if (process_on_calling_thread) {
_process_task(p_task);
task_mutex.unlock();
for (uint32_t i = 0; i < p_count; i++) {
_process_task(p_tasks[i]);
}
return;
}
task_mutex.lock();
p_task->low_priority = !p_high_priority;
if (!p_high_priority && use_native_low_priority_threads) {
p_task->low_priority_thread = native_thread_allocator.alloc();
task_mutex.unlock();
uint32_t to_process = 0;
uint32_t to_promote = 0;
if (p_task->group) {
p_task->group->low_priority_native_tasks.push_back(p_task);
ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;
for (uint32_t i = 0; i < p_count; i++) {
p_tasks[i]->low_priority = !p_high_priority;
if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
task_queue.add_last(&p_tasks[i]->task_elem);
if (!p_high_priority) {
low_priority_threads_used++;
}
to_process++;
} else {
// Too many threads using low priority, must go to queue.
low_priority_task_queue.add_last(&p_tasks[i]->task_elem);
to_promote++;
}
p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pask task directly to thread.
} else if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
task_queue.add_last(&p_task->task_elem);
if (!p_high_priority) {
low_priority_threads_used++;
}
_notify_threads(caller_pool_thread, to_process, to_promote);
task_mutex.unlock();
}
void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
uint32_t to_process = p_process_count;
uint32_t to_promote = p_promote_count;
// This is where which threads are awaken is decided according to the workload.
// Threads that will anyway have a chance to check the situation and process/promote tasks
// are excluded from being notified. Others will be tried anyway to try to distribute load.
// The current thread, if is a pool thread, is also excluded depending on the promoting/processing
// needs because it will anyway loop again. However, it will contribute to decreasing the count,
// which helps reducing sync traffic.
uint32_t thread_count = threads.size();
// First round:
// 1. For processing: notify threads that are not running tasks, to keep the stacks as shallow as possible.
// 2. For promoting: since it's exclusive with processing, we fin threads able to promote low-prio tasks now.
for (uint32_t i = 0;
i < thread_count && (to_process || to_promote);
i++, notify_index = (notify_index + 1) % thread_count) {
ThreadData &th = threads[notify_index];
if (th.signaled) {
continue;
}
if (th.current_task) {
// Good thread for promoting low-prio?
if (to_promote && th.awaited_task && th.current_task->low_priority) {
if (likely(&th != p_current_thread_data)) {
th.cond_var.notify_one();
}
th.signaled = true;
to_promote--;
}
} else {
if (to_process) {
if (likely(&th != p_current_thread_data)) {
th.cond_var.notify_one();
}
th.signaled = true;
to_process--;
}
}
}
// Second round:
// For processing: if the first round wasn't enough, let's try now with threads processing tasks but currently awaiting.
for (uint32_t i = 0;
i < thread_count && to_process;
i++, notify_index = (notify_index + 1) % thread_count) {
ThreadData &th = threads[notify_index];
if (th.signaled) {
continue;
}
if (th.awaited_task) {
if (likely(&th != p_current_thread_data)) {
th.cond_var.notify_one();
}
th.signaled = true;
to_process--;
}
task_mutex.unlock();
task_available_semaphore.post();
} else {
// Too many threads using low priority, must go to queue.
low_priority_task_queue.add_last(&p_task->task_elem);
task_mutex.unlock();
}
}
@ -247,23 +301,6 @@ bool WorkerThreadPool::_try_promote_low_priority_task() {
}
}
void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
#ifdef DEV_ENABLED
print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
#endif
// In order not to create dependency cycles, we can only schedule the next one.
// We'll keep doing the same until the deadlock is broken,
SelfList<Task> *to_promote = low_priority_task_queue.first();
if (to_promote) {
low_priority_task_queue.remove(to_promote);
task_queue.add_last(to_promote);
low_priority_threads_used++;
task_available_semaphore.post();
}
}
}
WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}
@ -273,15 +310,15 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
// Get a free task
Task *task = task_allocator.alloc();
TaskID id = last_task++;
task->self = id;
task->callable = p_callable;
task->native_func = p_func;
task->native_func_userdata = p_userdata;
task->description = p_description;
task->template_userdata = p_template_userdata;
tasks.insert(id, task);
task_mutex.unlock();
_post_task(task, p_high_priority);
_post_tasks_and_unlock(&task, 1, p_high_priority);
return id;
}
@ -313,105 +350,117 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
}
Task *task = *taskp;
if (!task->completed) {
if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
if (caller_pool_th_index == task->pool_thread_index) {
// Deadlock prevention.
// Waiting for a task run on this same thread? That means the task to be awaited started waiting as well
// and another task was run to make use of the thread in the meantime, with enough bad luck as to
// the need to wait for the original task arose in turn.
// In other words, the task we want to wait for is buried in the stack.
// Let's report the caller about the issue to it handles as it sees fit.
task_mutex.unlock();
return ERR_BUSY;
}
if (task->completed) {
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
task->waiting++;
bool is_low_prio_waiting_for_another = false;
if (!use_native_low_priority_threads) {
// Deadlock prevention:
// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
// we have a no progressable situation. We can apply a workaround, consisting in promoting an awaited queued
// low-prio task to the schedule queue so it can run and break the "impasse".
// NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more
// than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph
// or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be
// an issue in the real world, a further fix can be applied against that.
if (task->low_priority) {
bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
if (awaiter_is_a_low_prio_task) {
is_low_prio_waiting_for_another = true;
low_priority_tasks_awaiting_others++;
if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
_prevent_low_prio_saturation_deadlock();
}
}
}
}
task_mutex.unlock();
if (use_native_low_priority_threads && task->low_priority) {
task->done_semaphore.wait();
} else {
bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
if (current_is_pool_thread) {
// We are an actual process thread, we must not be blocked so continue processing stuff if available.
bool must_exit = false;
while (true) {
if (task->done_semaphore.try_wait()) {
// If done, exit
break;
}
if (!must_exit) {
if (task_available_semaphore.try_wait()) {
if (exit_threads) {
must_exit = true;
} else {
// Solve tasks while they are around.
bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
_process_task_queue();
set_current_thread_safe_for_nodes(safe_for_nodes_backup);
continue;
}
} else if (!use_native_low_priority_threads && task->low_priority) {
// A low prioriry task started waiting, so see if we can move a pending one to the high priority queue.
task_mutex.lock();
bool post = _try_promote_low_priority_task();
task_mutex.unlock();
if (post) {
task_available_semaphore.post();
}
}
}
OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
}
} else {
task->done_semaphore.wait();
}
}
task_mutex.lock();
if (is_low_prio_waiting_for_another) {
low_priority_tasks_awaiting_others--;
}
task->waiting--;
return OK;
}
if (task->waiting == 0) {
if (use_native_low_priority_threads && task->low_priority) {
task->low_priority_thread->wait_to_finish();
native_thread_allocator.free(task->low_priority_thread);
}
tasks.erase(p_task_id);
task_allocator.free(task);
ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;
if (caller_pool_thread && p_task_id <= caller_pool_thread->current_task->self) {
// Deadlock prevention:
// When a pool thread wants to wait for an older task, the following situations can happen:
// 1. Awaited task is deep in the stack of the awaiter.
// 2. A group of awaiter threads end up depending on some tasks buried in the stack
// of their worker threads in such a way that progress can't be made.
// Both would entail a deadlock. Some may be handled here in the WorkerThreadPool
// with some extra logic and bookkeeping. However, there would still be unavoidable
// cases of deadlock because of the way waiting threads process outstanding tasks.
// Taking into account there's no feasible solution for every possible case
// with the current design, we just simply reject attempts to await on older tasks,
// with a specific error code that signals the situation so the caller can handle it.
task_mutex.unlock();
return ERR_BUSY;
}
if (caller_pool_thread) {
task->waiting_pool++;
} else {
task->waiting_user++;
}
task_mutex.unlock();
if (caller_pool_thread) {
while (true) {
Task *task_to_process = nullptr;
{
MutexLock lock(task_mutex);
bool was_signaled = caller_pool_thread->signaled;
caller_pool_thread->signaled = false;
if (task->completed) {
// This thread was awaken also for some reason, but it's about to exit.
// Let's find out what may be pending and forward the requests.
if (!exit_threads && was_signaled) {
uint32_t to_process = task_queue.first() ? 1 : 0;
uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
if (to_process || to_promote) {
// This thread must be left alone since it won't loop again.
caller_pool_thread->signaled = true;
_notify_threads(caller_pool_thread, to_process, to_promote);
}
}
task->waiting_pool--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
break;
}
if (!exit_threads) {
// This is a thread from the pool. It shouldn't just idle.
// Let's try to process other tasks while we wait.
if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (_try_promote_low_priority_task()) {
_notify_threads(caller_pool_thread, 1, 0);
}
}
if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first());
}
if (!task_to_process) {
caller_pool_thread->awaited_task = task;
if (flushing_cmd_queue) {
flushing_cmd_queue->unlock();
}
caller_pool_thread->cond_var.wait(lock);
if (flushing_cmd_queue) {
flushing_cmd_queue->lock();
}
DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed);
caller_pool_thread->awaited_task = nullptr;
}
}
}
if (task_to_process) {
_process_task(task_to_process);
}
}
} else {
task->done_semaphore.wait();
task_mutex.lock();
task->waiting_user--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
task_mutex.unlock();
}
return OK;
}
@ -455,11 +504,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
}
groups[id] = group;
task_mutex.unlock();
for (int i = 0; i < p_tasks; i++) {
_post_task(tasks_posted[i], p_high_priority);
}
_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
return id;
}
@ -502,22 +548,17 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
if (!groupp) {
ERR_FAIL_MSG("Invalid Group ID");
}
Group *group = *groupp;
if (group->low_priority_native_tasks.size() > 0) {
for (Task *task : group->low_priority_native_tasks) {
task->low_priority_thread->wait_to_finish();
task_mutex.lock();
native_thread_allocator.free(task->low_priority_thread);
task_allocator.free(task);
task_mutex.unlock();
{
Group *group = *groupp;
if (flushing_cmd_queue) {
flushing_cmd_queue->unlock();
}
task_mutex.lock();
group_allocator.free(group);
task_mutex.unlock();
} else {
group->done_semaphore.wait();
if (flushing_cmd_queue) {
flushing_cmd_queue->lock();
}
uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.
@ -540,19 +581,23 @@ int WorkerThreadPool::get_thread_index() {
return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
}
void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_priority, float p_low_priority_task_ratio) {
void WorkerThreadPool::thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue) {
ERR_FAIL_COND(flushing_cmd_queue != nullptr);
flushing_cmd_queue = p_queue;
}
void WorkerThreadPool::thread_exit_command_queue_mt_flush() {
ERR_FAIL_NULL(flushing_cmd_queue);
flushing_cmd_queue = nullptr;
}
void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
ERR_FAIL_COND(threads.size() > 0);
if (p_thread_count < 0) {
p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
}
if (p_use_native_threads_low_priority) {
max_low_priority_threads = 0;
} else {
max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
}
use_native_low_priority_threads = p_use_native_threads_low_priority;
max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
threads.resize(p_thread_count);
@ -568,24 +613,33 @@ void WorkerThreadPool::finish() {
return;
}
task_mutex.lock();
SelfList<Task> *E = low_priority_task_queue.first();
while (E) {
print_error("Task waiting was never re-claimed: " + E->self()->description);
E = E->next();
}
task_mutex.unlock();
exit_threads = true;
for (uint32_t i = 0; i < threads.size(); i++) {
task_available_semaphore.post();
{
MutexLock lock(task_mutex);
SelfList<Task> *E = low_priority_task_queue.first();
while (E) {
print_error("Task waiting was never re-claimed: " + E->self()->description);
E = E->next();
}
}
{
MutexLock lock(task_mutex);
exit_threads = true;
}
for (ThreadData &data : threads) {
data.cond_var.notify_one();
}
for (ThreadData &data : threads) {
data.thread.wait_to_finish();
}
{
MutexLock lock(task_mutex);
for (KeyValue<TaskID, Task *> &E : tasks) {
task_allocator.free(E.value);
}
}
threads.clear();
}

View file

@ -31,6 +31,7 @@
#ifndef WORKER_THREAD_POOL_H
#define WORKER_THREAD_POOL_H
#include "core/os/condition_variable.h"
#include "core/os/memory.h"
#include "core/os/os.h"
#include "core/os/semaphore.h"
@ -40,6 +41,8 @@
#include "core/templates/rid.h"
#include "core/templates/safe_refcount.h"
class CommandQueueMT;
class WorkerThreadPool : public Object {
GDCLASS(WorkerThreadPool, Object)
public:
@ -60,7 +63,7 @@ private:
};
struct Group {
GroupID self;
GroupID self = -1;
SafeNumeric<uint32_t> index;
SafeNumeric<uint32_t> completed_index;
uint32_t max = 0;
@ -68,23 +71,23 @@ private:
SafeFlag completed;
SafeNumeric<uint32_t> finished;
uint32_t tasks_used = 0;
TightLocalVector<Task *> low_priority_native_tasks;
};
struct Task {
TaskID self = -1;
Callable callable;
void (*native_func)(void *) = nullptr;
void (*native_group_func)(void *, uint32_t) = nullptr;
void *native_func_userdata = nullptr;
String description;
Semaphore done_semaphore;
Semaphore done_semaphore; // For user threads awaiting.
bool completed = false;
Group *group = nullptr;
SelfList<Task> task_elem;
uint32_t waiting = 0;
uint32_t waiting_pool = 0;
uint32_t waiting_user = 0;
bool low_priority = false;
BaseTemplateUserdata *template_userdata = nullptr;
Thread *low_priority_thread = nullptr;
int pool_thread_index = -1;
void free_template_userdata();
@ -92,51 +95,65 @@ private:
task_elem(this) {}
};
PagedAllocator<Task> task_allocator;
PagedAllocator<Group> group_allocator;
PagedAllocator<Thread> native_thread_allocator;
static const uint32_t TASKS_PAGE_SIZE = 1024;
static const uint32_t GROUPS_PAGE_SIZE = 256;
PagedAllocator<Task, false, TASKS_PAGE_SIZE> task_allocator;
PagedAllocator<Group, false, GROUPS_PAGE_SIZE> group_allocator;
SelfList<Task>::List low_priority_task_queue;
SelfList<Task>::List task_queue;
Mutex task_mutex;
Semaphore task_available_semaphore;
BinaryMutex task_mutex;
struct ThreadData {
uint32_t index;
uint32_t index = 0;
Thread thread;
Task *current_low_prio_task = nullptr;
bool ready_for_scripting = false;
bool signaled = false;
Task *current_task = nullptr;
Task *awaited_task = nullptr; // Null if not awaiting the condition variable. Special value for idle-waiting.
ConditionVariable cond_var;
};
TightLocalVector<ThreadData> threads;
bool exit_threads = false;
HashMap<Thread::ID, int> thread_ids;
HashMap<TaskID, Task *> tasks;
HashMap<GroupID, Group *> groups;
HashMap<
TaskID,
Task *,
HashMapHasherDefault,
HashMapComparatorDefault<TaskID>,
PagedAllocator<HashMapElement<TaskID, Task *>, false, TASKS_PAGE_SIZE>>
tasks;
HashMap<
GroupID,
Group *,
HashMapHasherDefault,
HashMapComparatorDefault<GroupID>,
PagedAllocator<HashMapElement<GroupID, Group *>, false, GROUPS_PAGE_SIZE>>
groups;
bool use_native_low_priority_threads = false;
uint32_t max_low_priority_threads = 0;
uint32_t low_priority_threads_used = 0;
uint32_t low_priority_tasks_running = 0;
uint32_t low_priority_tasks_awaiting_others = 0;
uint32_t notify_index = 0; // For rotating across threads, no help distributing load.
uint64_t last_task = 1;
static void _thread_function(void *p_user);
static void _native_low_priority_thread_function(void *p_user);
void _process_task_queue();
void _process_task(Task *task);
void _post_task(Task *p_task, bool p_high_priority);
void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
bool _try_promote_low_priority_task();
void _prevent_low_prio_saturation_deadlock();
static WorkerThreadPool *singleton;
static thread_local CommandQueueMT *flushing_cmd_queue;
TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description);
@ -199,7 +216,10 @@ public:
static WorkerThreadPool *get_singleton() { return singleton; }
static int get_thread_index();
void init(int p_thread_count = -1, bool p_use_native_threads_low_priority = true, float p_low_priority_task_ratio = 0.3);
static void thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue);
static void thread_exit_command_queue_mt_flush();
void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
void finish();
WorkerThreadPool();
~WorkerThreadPool();

View file

@ -31,6 +31,8 @@
#ifndef CONDITION_VARIABLE_H
#define CONDITION_VARIABLE_H
#include "core/os/mutex.h"
#ifdef MINGW_ENABLED
#define MINGW_STDTHREAD_REDUNDANCY_WARNING
#include "thirdparty/mingw-std-threads/mingw.condition_variable.h"

View file

@ -58,10 +58,12 @@ private:
#endif
public:
_ALWAYS_INLINE_ void post() const {
_ALWAYS_INLINE_ void post(uint32_t p_count = 1) const {
std::lock_guard lock(mutex);
count++;
condition.notify_one();
count += p_count;
for (uint32_t i = 0; i < p_count; ++i) {
condition.notify_one();
}
}
_ALWAYS_INLINE_ void wait() const {

View file

@ -307,7 +307,6 @@ void register_core_settings() {
GLOBAL_DEF(PropertyInfo(Variant::STRING, "network/tls/certificate_bundle_override", PROPERTY_HINT_FILE, "*.crt"), "");
GLOBAL_DEF("threading/worker_pool/max_threads", -1);
GLOBAL_DEF("threading/worker_pool/use_system_threads_for_low_priority_tasks", true);
GLOBAL_DEF("threading/worker_pool/low_priority_thread_ratio", 0.3);
}

View file

@ -31,6 +31,7 @@
#ifndef COMMAND_QUEUE_MT_H
#define COMMAND_QUEUE_MT_H
#include "core/object/worker_thread_pool.h"
#include "core/os/memory.h"
#include "core/os/mutex.h"
#include "core/os/semaphore.h"
@ -306,15 +307,15 @@ class CommandQueueMT {
struct CommandBase {
virtual void call() = 0;
virtual void post() {}
virtual ~CommandBase() {}
virtual SyncSemaphore *get_sync_semaphore() { return nullptr; }
virtual ~CommandBase() = default; // Won't be called.
};
struct SyncCommand : public CommandBase {
SyncSemaphore *sync_sem = nullptr;
virtual void post() override {
sync_sem->sem.post();
virtual SyncSemaphore *get_sync_semaphore() override {
return sync_sem;
}
};
@ -340,6 +341,7 @@ class CommandQueueMT {
SyncSemaphore sync_sems[SYNC_SEMAPHORES];
Mutex mutex;
Semaphore *sync = nullptr;
uint64_t flush_read_ptr = 0;
template <class T>
T *allocate() {
@ -362,31 +364,41 @@ class CommandQueueMT {
void _flush() {
lock();
uint64_t read_ptr = 0;
uint64_t limit = command_mem.size();
WorkerThreadPool::thread_enter_command_queue_mt_flush(this);
while (flush_read_ptr < command_mem.size()) {
uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
flush_read_ptr += 8;
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
while (read_ptr < limit) {
uint64_t size = *(uint64_t *)&command_mem[read_ptr];
read_ptr += 8;
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]);
SyncSemaphore *sync_sem = cmd->get_sync_semaphore();
cmd->call();
if (sync_sem) {
sync_sem->sem.post(); // Release in case it needs sync/ret.
}
cmd->call(); //execute the function
cmd->post(); //release in case it needs sync/ret
cmd->~CommandBase(); //should be done, so erase the command
if (unlikely(flush_read_ptr == 0)) {
// A reentrant call flushed.
DEV_ASSERT(command_mem.is_empty());
unlock();
return;
}
read_ptr += size;
flush_read_ptr += size;
}
WorkerThreadPool::thread_exit_command_queue_mt_flush();
command_mem.clear();
flush_read_ptr = 0;
unlock();
}
void lock();
void unlock();
void wait_for_flush();
SyncSemaphore *_alloc_sync_sem();
public:
void lock();
void unlock();
/* NORMAL PUSH COMMANDS */
DECL_PUSH(0)
SPACE_SEP_LIST(DECL_PUSH, 15)

View file

@ -40,7 +40,7 @@
#include <type_traits>
#include <typeinfo>
template <class T, bool thread_safe = false>
template <class T, bool thread_safe = false, uint32_t DEFAULT_PAGE_SIZE = 4096>
class PagedAllocator {
T **page_pool = nullptr;
T ***available_pool = nullptr;
@ -53,10 +53,6 @@ class PagedAllocator {
SpinLock spin_lock;
public:
enum {
DEFAULT_PAGE_SIZE = 4096
};
template <class... Args>
T *alloc(Args &&...p_args) {
if (thread_safe) {

View file

@ -2768,8 +2768,6 @@
<member name="threading/worker_pool/max_threads" type="int" setter="" getter="" default="-1">
Maximum number of threads to be used by [WorkerThreadPool]. Value of [code]-1[/code] means no limit.
</member>
<member name="threading/worker_pool/use_system_threads_for_low_priority_tasks" type="bool" setter="" getter="" default="true">
</member>
<member name="xr/openxr/default_action_map" type="String" setter="" getter="" default="&quot;res://openxr_action_map.tres&quot;">
Action map configuration to load by default.
</member>

View file

@ -106,7 +106,7 @@
Pauses the thread that calls this method until the task with the given ID is completed.
Returns [constant @GlobalScope.OK] if the task could be successfully awaited.
Returns [constant @GlobalScope.ERR_INVALID_PARAMETER] if a task with the passed ID does not exist (maybe because it was already awaited and disposed of).
Returns [constant @GlobalScope.ERR_BUSY] if the call is made from another running task and, due to task scheduling, the task to await is at a lower level in the call stack and therefore can't progress. This is an advanced situation that should only matter when some tasks depend on others.
Returns [constant @GlobalScope.ERR_BUSY] if the call is made from another running task and, due to task scheduling, there's potential for deadlocking (e.g., the task to await may be at a lower level in the call stack and therefore can't progress). This is an advanced situation that should only matter when some tasks depend on others (in the current implementation, the tricky case is a task trying to wait on an older one).
</description>
</method>
</methods>

View file

@ -1615,16 +1615,12 @@ Error Main::setup(const char *execpath, int argc, char *argv[], bool p_second_ph
}
// Initialize WorkerThreadPool.
{
if (editor || project_manager) {
WorkerThreadPool::get_singleton()->init(-1, 0.75);
} else {
int worker_threads = GLOBAL_GET("threading/worker_pool/max_threads");
bool low_priority_use_system_threads = GLOBAL_GET("threading/worker_pool/use_system_threads_for_low_priority_tasks");
float low_property_ratio = GLOBAL_GET("threading/worker_pool/low_priority_thread_ratio");
if (editor || project_manager) {
WorkerThreadPool::get_singleton()->init();
} else {
WorkerThreadPool::get_singleton()->init(worker_threads, low_priority_use_system_threads, low_property_ratio);
}
float low_priority_ratio = GLOBAL_GET("threading/worker_pool/low_priority_thread_ratio");
WorkerThreadPool::get_singleton()->init(worker_threads, low_priority_ratio);
}
#ifdef TOOLS_ENABLED