Cycles: use TBB for task pools and task scheduler

No significant performance improvement is expected, but it means we have a
single thread pool throughout Blender, and it should make adding more
parallelization in the future easier.

After previous refactoring commits this is basically a drop-in replacement.
One difference is that the old task pool had a mechanism for scheduling tasks to
the front of the queue to minimize memory usage. TBB has a smarter algorithm
that balances depth-first and breadth-first scheduling of tasks, which we assume
removes the need to manually provide such hints to the scheduler.

Fixes T77533
Brecht Van Lommel 2020-06-05 16:39:57 +02:00
parent 54e3487c9e
commit e50f1ddc65
6 changed files with 49 additions and 396 deletions
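
For context, the new TaskPool in this commit is essentially a thin wrapper around
tbb::task_group, as the hunks below show. A minimal sketch of that pattern
(illustration only; SimpleTaskPool is a made-up name, and the real class also
tracks statistics such as start_time and num_tasks_handled):

#include <utility>

#include <tbb/task_group.h>

class SimpleTaskPool {
 public:
  /* Queue a task; TBB decides when and on which worker thread it runs. */
  template<typename F> void push(F &&task)
  {
    group.run(std::forward<F>(task));
  }

  /* Block until all queued tasks are done. The calling thread also helps
   * execute pending tasks, much like the old wait_work() did. */
  void wait()
  {
    group.wait();
  }

  /* Cancel tasks that have not started yet, then wait for the running ones. */
  void cancel()
  {
    group.cancel();
    group.wait();
  }

  /* For worker code that wants to test for cancellation. */
  bool canceled()
  {
    return group.is_canceling();
  }

 private:
  tbb::task_group group;
};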

@@ -626,8 +626,8 @@ BVHNode *BVHBuild::build_node(const BVHObjectBinning &range, int level)
/* Threaded build */
inner = new InnerNode(bounds);
task_pool.push([=] { thread_build_node(inner, 0, left, level + 1); }, true);
task_pool.push([=] { thread_build_node(inner, 1, right, level + 1); }, true);
task_pool.push([=] { thread_build_node(inner, 0, left, level + 1); });
task_pool.push([=] { thread_build_node(inner, 1, right, level + 1); });
}
if (do_unalinged_split) {
@@ -742,16 +742,12 @@ BVHNode *BVHBuild::build_node(const BVHRange &range,
/* Create tasks for left and right nodes, using copy for most arguments and
* move for reference to avoid memory copies. */
task_pool.push(
[=, refs = std::move(left_references)]() mutable {
thread_build_spatial_split_node(inner, 0, left, refs, level + 1);
},
true);
task_pool.push(
[=, refs = std::move(right_references)]() mutable {
thread_build_spatial_split_node(inner, 1, right, refs, level + 1);
},
true);
task_pool.push([=, refs = std::move(left_references)]() mutable {
thread_build_spatial_split_node(inner, 0, left, refs, level + 1);
});
task_pool.push([=, refs = std::move(right_references)]() mutable {
thread_build_spatial_split_node(inner, 1, right, refs, level + 1);
});
}
if (do_unalinged_split) {
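
The comment in the hunk above refers to the capture-by-move idiom used for the
task lambdas. A self-contained illustration with made-up data (not the actual
Cycles code):

#include <cstdio>
#include <utility>
#include <vector>

#include <tbb/task_group.h>

int main()
{
  tbb::task_group group;
  int level = 0;                         /* small argument, cheap to copy */
  std::vector<int> references(1000, 42); /* large argument, expensive to copy */

  /* [=] copies the small arguments into the closure, while the init-capture
   * refs = std::move(references) transfers ownership of the vector instead of
   * copying it. "mutable" allows refs to be modified or consumed in the task. */
  group.run([=, refs = std::move(references)]() mutable {
    std::printf("level %d, %zu references\n", level, refs.size());
  });

  group.wait();
  return 0;
}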

@@ -147,7 +147,7 @@ static void bvh_reference_sort_threaded(TaskPool *task_pool,
if (left < end) {
if (start < right) {
task_pool->push(
function_bind(bvh_reference_sort_threaded, task_pool, data, left, end, compare), true);
function_bind(bvh_reference_sort_threaded, task_pool, data, left, end, compare));
}
else {
start = left;

@@ -77,7 +77,7 @@ std::ostream &operator<<(std::ostream &os, const DeviceRequestedFeatures &reques
/* Device */
Device::~Device()
Device::~Device() noexcept(false)
{
if (!background) {
if (vertex_buffer != 0) {

@@ -319,7 +319,8 @@ class Device {
virtual void mem_free_sub_ptr(device_ptr /*ptr*/){};
public:
virtual ~Device();
/* noexcept needed to silence TBB warning. */
virtual ~Device() noexcept(false);
/* info */
DeviceInfo info;
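
The comment in the hunk above only notes that noexcept(false) is needed to
silence a TBB warning. The relevant C++ rule is that a destructor written
without a noexcept-specifier is implicitly noexcept unless a base or member
destructor it invokes may throw, so marking the base class destructor
noexcept(false) is enough to make subclass destructors potentially throwing
too. An illustration with made-up class names (not the actual Device hierarchy):

#include <stdexcept>

class Base {
 public:
  /* Without noexcept(false) this destructor would be implicitly noexcept, and
   * an exception escaping it would terminate the program. */
  virtual ~Base() noexcept(false) {}
};

class Derived : public Base {
 public:
  /* No explicit noexcept-specifier: because Base::~Base() is potentially
   * throwing, this destructor is potentially throwing as well and may let an
   * exception propagate to the caller. */
  ~Derived() override
  {
    if (cleanup_failed) {
      throw std::runtime_error("cleanup failed");
    }
  }

 private:
  bool cleanup_failed = false;
};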

@@ -20,28 +20,12 @@
#include "util/util_system.h"
#include "util/util_time.h"
//#define THREADING_DEBUG_ENABLED
#ifdef THREADING_DEBUG_ENABLED
# include <stdio.h>
# define THREADING_DEBUG(...) \
do { \
printf(__VA_ARGS__); \
fflush(stdout); \
} while (0)
#else
# define THREADING_DEBUG(...)
#endif
CCL_NAMESPACE_BEGIN
/* Task Pool */
TaskPool::TaskPool()
TaskPool::TaskPool() : start_time(time_dt()), num_tasks_handled(0)
{
num_tasks_handled = 0;
num = 0;
do_cancel = false;
}
TaskPool::~TaskPool()
@@ -49,66 +33,15 @@ TaskPool::~TaskPool()
cancel();
}
void TaskPool::push(TaskRunFunction &&task, bool front)
void TaskPool::push(TaskRunFunction &&task)
{
TaskScheduler::Entry entry;
entry.task = new TaskRunFunction(std::move(task));
entry.pool = this;
TaskScheduler::push(entry, front);
tbb_group.run(std::move(task));
num_tasks_handled++;
}
void TaskPool::wait_work(Summary *stats)
{
thread_scoped_lock num_lock(num_mutex);
while (num != 0) {
num_lock.unlock();
thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
/* find task from this pool. if we get a task from another pool,
* we can get into deadlock */
TaskScheduler::Entry work_entry;
bool found_entry = false;
list<TaskScheduler::Entry>::iterator it;
for (it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
TaskScheduler::Entry &entry = *it;
if (entry.pool == this) {
work_entry = entry;
found_entry = true;
TaskScheduler::queue.erase(it);
break;
}
}
queue_lock.unlock();
/* if found task, do it, otherwise wait until other tasks are done */
if (found_entry) {
/* run task */
(*work_entry.task)();
/* delete task */
delete work_entry.task;
/* notify pool task was done */
num_decrease(1);
}
num_lock.lock();
if (num == 0)
break;
if (!found_entry) {
THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
num_cond.wait(num_lock);
THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
}
}
tbb_group.wait();
if (stats != NULL) {
stats->time_total = time_dt() - start_time;
@@ -118,180 +51,21 @@ void TaskPool::wait_work(Summary *stats)
void TaskPool::cancel()
{
do_cancel = true;
TaskScheduler::clear(this);
{
thread_scoped_lock num_lock(num_mutex);
while (num) {
THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
num_cond.wait(num_lock);
THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
}
}
do_cancel = false;
tbb_group.cancel();
tbb_group.wait();
}
bool TaskPool::canceled()
{
return do_cancel;
}
void TaskPool::num_decrease(int done)
{
num_mutex.lock();
num -= done;
assert(num >= 0);
if (num == 0) {
THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
num_cond.notify_all();
}
num_mutex.unlock();
}
void TaskPool::num_increase()
{
thread_scoped_lock num_lock(num_mutex);
if (num_tasks_handled == 0) {
start_time = time_dt();
}
num++;
num_tasks_handled++;
THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
num_cond.notify_all();
return tbb_group.is_canceling();
}
/* Task Scheduler */
thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
vector<thread *> TaskScheduler::threads;
bool TaskScheduler::do_exit = false;
list<TaskScheduler::Entry> TaskScheduler::queue;
thread_mutex TaskScheduler::queue_mutex;
thread_condition_variable TaskScheduler::queue_cond;
namespace {
/* Get number of processors on each of the available nodes. The result is sized
* by the highest node index, and element corresponds to number of processors on
* that node.
* If node is not available, then the corresponding number of processors is
* zero. */
void get_per_node_num_processors(vector<int> *num_per_node_processors)
{
const int num_nodes = system_cpu_num_numa_nodes();
if (num_nodes == 0) {
LOG(ERROR) << "Zero available NUMA nodes, is not supposed to happen.";
return;
}
num_per_node_processors->resize(num_nodes);
for (int node = 0; node < num_nodes; ++node) {
if (!system_cpu_is_numa_node_available(node)) {
(*num_per_node_processors)[node] = 0;
continue;
}
(*num_per_node_processors)[node] = system_cpu_num_numa_node_processors(node);
}
}
/* Calculate total number of processors on all available nodes.
* This is similar to system_cpu_thread_count(), but uses pre-calculated number
* of processors on each of the node, avoiding extra system calls and checks for
* the node availability. */
int get_num_total_processors(const vector<int> &num_per_node_processors)
{
int num_total_processors = 0;
foreach (int num_node_processors, num_per_node_processors) {
num_total_processors += num_node_processors;
}
return num_total_processors;
}
/* Compute NUMA node for every thread to run on, for the best performance. */
vector<int> distribute_threads_on_nodes(const int num_threads)
{
/* Start with all threads unassigned to any specific NUMA node. */
vector<int> thread_nodes(num_threads, -1);
const int num_active_group_processors = system_cpu_num_active_group_processors();
VLOG(1) << "Detected " << num_active_group_processors << " processors "
<< "in active group.";
if (num_active_group_processors >= num_threads) {
/* If the current thread is set up in a way that its affinity allows to
* use at least requested number of threads we do not explicitly set
* affinity to the worker threads.
* This way we allow users to manually edit affinity of the parent
* thread, and here we follow that affinity. This way it's possible to
* have two Cycles/Blender instances running manually set to a different
* dies on a CPU. */
VLOG(1) << "Not setting thread group affinity.";
return thread_nodes;
}
vector<int> num_per_node_processors;
get_per_node_num_processors(&num_per_node_processors);
if (num_per_node_processors.size() == 0) {
/* Error was already reported, here we can't do anything, so we simply
* leave default affinity to all the worker threads. */
return thread_nodes;
}
const int num_nodes = num_per_node_processors.size();
int thread_index = 0;
/* First pass: fill in all the nodes to their maximum.
*
* If there is less threads than the overall nodes capacity, some of the
* nodes or parts of them will idle.
*
* TODO(sergey): Consider picking up fastest nodes if number of threads
* fits on them. For example, on Threadripper2 we might consider using nodes
* 0 and 2 if user requested 32 render threads. */
const int num_total_node_processors = get_num_total_processors(num_per_node_processors);
int current_node_index = 0;
while (thread_index < num_total_node_processors && thread_index < num_threads) {
const int num_node_processors = num_per_node_processors[current_node_index];
for (int processor_index = 0; processor_index < num_node_processors; ++processor_index) {
VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
thread_nodes[thread_index] = current_node_index;
++thread_index;
if (thread_index == num_threads) {
/* All threads are scheduled on their nodes. */
return thread_nodes;
}
}
++current_node_index;
}
/* Second pass: keep scheduling threads to each node one by one,
* uniformly filling them in.
* This is where things becomes tricky to predict for the maximum
* performance: on the one hand this avoids too much threading overhead on
* few nodes, but for the final performance having all the overhead on one
* node might be better idea (since other nodes will have better chance of
* rendering faster).
* But more tricky is that nodes might have difference capacity, so we might
* want to do some weighted scheduling. For example, if node 0 has 16
* processors and node 1 has 32 processors, we'd better schedule 1 extra
* thread on node 0 and 2 extra threads on node 1. */
current_node_index = 0;
while (thread_index < num_threads) {
/* Skip unavailable nodes. */
/* TODO(sergey): Add sanity check against deadlock. */
while (num_per_node_processors[current_node_index] == 0) {
current_node_index = (current_node_index + 1) % num_nodes;
}
VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
++thread_index;
current_node_index = (current_node_index + 1) % num_nodes;
}
return thread_nodes;
}
} // namespace
int TaskScheduler::active_num_threads = 0;
tbb::global_control *TaskScheduler::global_control = nullptr;
void TaskScheduler::init(int num_threads)
{
@@ -302,22 +76,15 @@ void TaskScheduler::init(int num_threads)
if (users != 1) {
return;
}
do_exit = false;
const bool use_auto_threads = (num_threads == 0);
if (use_auto_threads) {
if (num_threads > 0) {
/* Automatic number of threads. */
num_threads = system_cpu_thread_count();
VLOG(1) << "Overriding number of TBB threads to " << num_threads << ".";
global_control = new tbb::global_control(tbb::global_control::max_allowed_parallelism,
num_threads);
active_num_threads = num_threads;
}
VLOG(1) << "Creating pool of " << num_threads << " threads.";
/* Compute distribution on NUMA nodes. */
vector<int> thread_nodes = distribute_threads_on_nodes(num_threads);
/* Launch threads that will be waiting for work. */
threads.resize(num_threads);
for (int thread_index = 0; thread_index < num_threads; ++thread_index) {
threads[thread_index] = new thread(function_bind(&TaskScheduler::thread_run),
thread_nodes[thread_index]);
else {
active_num_threads = system_cpu_thread_count();
}
}
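
The tbb::global_control object created in init() caps TBB's worker thread count
for as long as the object exists, which is why the scheduler stores the pointer
and deletes it again in exit(). A standalone illustration of the same API (not
Cycles code; the function name is made up):

#include <tbb/global_control.h>
#include <tbb/parallel_for.h>

void render_with_thread_limit(int num_threads)
{
  /* The limit is active for the lifetime of this object and counts the calling
   * thread as one of the num_threads. */
  tbb::global_control limit(tbb::global_control::max_allowed_parallelism, num_threads);

  /* Any TBB work started while "limit" is alive uses at most num_threads threads. */
  tbb::parallel_for(0, 1000, [](int /*i*/) {
    /* ... per-item work ... */
  });

  /* The limit is lifted when "limit" goes out of scope. */
}
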
@@ -326,105 +93,20 @@ void TaskScheduler::exit()
thread_scoped_lock lock(mutex);
users--;
if (users == 0) {
VLOG(1) << "De-initializing thread pool of task scheduler.";
/* stop all waiting threads */
TaskScheduler::queue_mutex.lock();
do_exit = true;
TaskScheduler::queue_cond.notify_all();
TaskScheduler::queue_mutex.unlock();
/* delete threads */
foreach (thread *t, threads) {
t->join();
delete t;
}
threads.clear();
delete global_control;
global_control = nullptr;
active_num_threads = 0;
}
}
void TaskScheduler::free_memory()
{
assert(users == 0);
threads.free_memory();
}
bool TaskScheduler::thread_wait_pop(Entry &entry)
int TaskScheduler::num_threads()
{
thread_scoped_lock queue_lock(queue_mutex);
while (queue.empty() && !do_exit)
queue_cond.wait(queue_lock);
if (queue.empty()) {
assert(do_exit);
return false;
}
entry = queue.front();
queue.pop_front();
return true;
}
void TaskScheduler::thread_run()
{
Entry entry;
/* todo: test affinity/denormal mask */
/* keep popping off tasks */
while (thread_wait_pop(entry)) {
/* run task */
(*entry.task)();
/* delete task */
delete entry.task;
/* notify pool task was done */
entry.pool->num_decrease(1);
}
}
void TaskScheduler::push(Entry &entry, bool front)
{
entry.pool->num_increase();
/* add entry to queue */
TaskScheduler::queue_mutex.lock();
if (front)
TaskScheduler::queue.push_front(entry);
else
TaskScheduler::queue.push_back(entry);
TaskScheduler::queue_cond.notify_one();
TaskScheduler::queue_mutex.unlock();
}
void TaskScheduler::clear(TaskPool *pool)
{
thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
/* erase all tasks from this pool from the queue */
list<Entry>::iterator it = queue.begin();
int done = 0;
while (it != queue.end()) {
Entry &entry = *it;
if (entry.pool == pool) {
done++;
delete entry.task;
it = queue.erase(it);
}
else
it++;
}
queue_lock.unlock();
/* notify done */
pool->num_decrease(done);
return active_num_threads;
}
/* Dedicated Task Pool */

@@ -25,6 +25,10 @@
#define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
#include <tbb/tbb.h>
#if TBB_INTERFACE_VERSION_MAJOR >= 10
# define WITH_TBB_GLOBAL_CONTROL
#endif
CCL_NAMESPACE_BEGIN
using tbb::blocked_range;
@@ -62,24 +66,15 @@ class TaskPool {
TaskPool();
~TaskPool();
void push(TaskRunFunction &&task, bool front = false);
void push(TaskRunFunction &&task);
void wait_work(Summary *stats = NULL); /* work and wait until all tasks are done */
void cancel(); /* cancel all tasks, keep worker threads running */
void cancel(); /* cancel all tasks and wait until they are no longer executing */
bool canceled(); /* for worker threads, test if canceled */
protected:
friend class TaskScheduler;
void num_decrease(int done);
void num_increase();
thread_mutex num_mutex;
thread_condition_variable num_cond;
int num;
bool do_cancel;
tbb::task_group tbb_group;
/* ** Statistics ** */
@@ -101,40 +96,19 @@ class TaskScheduler {
static void exit();
static void free_memory();
/* number of threads that can work on task */
static int num_threads()
{
return threads.size();
}
/* test if any session is using the scheduler */
static bool active()
{
return users != 0;
}
/* Approximate number of threads that will work on task, which may be lower
* or higher than the actual number of threads. Use as little as possible and
* leave splitting up tasks to the scheduler.. */
static int num_threads();
protected:
friend class TaskPool;
struct Entry {
TaskRunFunction *task;
TaskPool *pool;
};
static thread_mutex mutex;
static int users;
static vector<thread *> threads;
static bool do_exit;
static int active_num_threads;
static list<Entry> queue;
static thread_mutex queue_mutex;
static thread_condition_variable queue_cond;
static void thread_run();
static bool thread_wait_pop(Entry &entry);
static void push(Entry &entry, bool front);
static void clear(TaskPool *pool);
#ifdef WITH_TBB_GLOBAL_CONTROL
static tbb::global_control *global_control;
#endif
};
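
The comment on num_threads() above advises leaving the splitting of work to the
scheduler. A hypothetical illustration of that guidance, handing a whole range
to tbb::parallel_for instead of dividing it into exactly num_threads() chunks:

#include <cstddef>
#include <vector>

#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>

void scale_pixels(std::vector<float> &pixels)
{
  /* TBB picks the chunk sizes and the number of worker threads itself. */
  tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pixels.size()),
                    [&](const tbb::blocked_range<std::size_t> &r) {
                      for (std::size_t i = r.begin(); i != r.end(); ++i) {
                        pixels[i] *= 0.5f; /* placeholder per-pixel work */
                      }
                    });
}
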
/* Dedicated Task Pool