blender/intern/cycles/util/util_task.cpp
Jeroen Bakker 2f6257fd7f Cycles/OpenCL: Compile Kernels During Scene Update
The main goal of this change is faster startup when using foreground
rendering.

This patch builds kernels in parallel with the scene update process.
While the optimized kernels are not yet available, an AO kernel is
used instead.

These AO kernels are fast to compile (3-7 seconds) and can be
reused by all scenes. When the final kernels become available we
switch over to them.

In background mode the AO kernels are not used.
Some kernels are needed during scene update itself (displace,
background light); when one of those is required, the process can
stall until it becomes available.

Reviewed By: brecht, #cycles

Maniphest Tasks: T61752

Differential Revision: https://developer.blender.org/D4428
2019-03-15 16:18:21 +01:00


/*
* Copyright 2011-2013 Blender Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "util/util_foreach.h"
#include "util/util_logging.h"
#include "util/util_system.h"
#include "util/util_task.h"
#include "util/util_time.h"
//#define THREADING_DEBUG_ENABLED
#ifdef THREADING_DEBUG_ENABLED
#include <stdio.h>
#define THREADING_DEBUG(...) do { printf(__VA_ARGS__); fflush(stdout); } while(0)
#else
#define THREADING_DEBUG(...)
#endif
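/* With THREADING_DEBUG_ENABLED defined above, the pools below print their
* counter transitions (e.g. "num==3, notifying all in TaskPool::num_increase")
* to stdout, which helps when tracking down a stuck wait_work() or cancel(). */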
CCL_NAMESPACE_BEGIN
/* Task Pool */
TaskPool::TaskPool()
{
num_tasks_handled = 0;
num = 0;
do_cancel = false;
}
TaskPool::~TaskPool()
{
stop();
}
void TaskPool::push(Task *task, bool front)
{
TaskScheduler::Entry entry;
entry.task = task;
entry.pool = this;
TaskScheduler::push(entry, front);
}
void TaskPool::push(const TaskRunFunction& run, bool front)
{
push(new Task(run), front);
}
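/* Typical usage (an illustrative sketch only; render_tile is a hypothetical
* function taking the item index and the worker thread id, with _1 standing
* for TaskRunFunction's thread_id argument):
*
*   TaskPool pool;
*   for(int tile = 0; tile < num_tiles; tile++)
*     pool.push(function_bind(&render_tile, tile, _1));
*   pool.wait_work();  // helps execute this pool's tasks, returns when done
*/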
void TaskPool::wait_work(Summary *stats)
{
thread_scoped_lock num_lock(num_mutex);
while(num != 0) {
num_lock.unlock();
thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
/* Find a task from this pool. If we picked up a task from another pool,
* we could deadlock, for example when that task in turn waits on this
* pool. */
TaskScheduler::Entry work_entry;
bool found_entry = false;
list<TaskScheduler::Entry>::iterator it;
for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
TaskScheduler::Entry& entry = *it;
if(entry.pool == this) {
work_entry = entry;
found_entry = true;
TaskScheduler::queue.erase(it);
break;
}
}
queue_lock.unlock();
/* if found task, do it, otherwise wait until other tasks are done */
if(found_entry) {
/* run task */
work_entry.task->run(0);
/* delete task */
delete work_entry.task;
/* notify pool task was done */
num_decrease(1);
}
num_lock.lock();
if(num == 0)
break;
if(!found_entry) {
THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
num_cond.wait(num_lock);
THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
}
}
if(stats != NULL) {
stats->time_total = time_dt() - start_time;
stats->num_tasks_handled = num_tasks_handled;
}
}
void TaskPool::cancel()
{
do_cancel = true;
TaskScheduler::clear(this);
{
thread_scoped_lock num_lock(num_mutex);
while(num) {
THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
num_cond.wait(num_lock);
THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
}
}
do_cancel = false;
}
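/* Note that cancel() only removes tasks that are still queued; tasks already
* running are expected to poll canceled() and return early. */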
void TaskPool::stop()
{
TaskScheduler::clear(this);
assert(num == 0);
}
bool TaskPool::canceled()
{
return do_cancel;
}
bool TaskPool::finished()
{
thread_scoped_lock num_lock(num_mutex);
return num == 0;
}
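/* num counts tasks that were pushed but have not finished yet; the helpers
* below keep it in sync under num_mutex, and wait_work() and cancel() sleep
* on num_cond until it drops back to zero. */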
void TaskPool::num_decrease(int done)
{
num_mutex.lock();
num -= done;
assert(num >= 0);
if(num == 0) {
THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
num_cond.notify_all();
}
num_mutex.unlock();
}
void TaskPool::num_increase()
{
thread_scoped_lock num_lock(num_mutex);
if(num_tasks_handled == 0) {
start_time = time_dt();
}
num++;
num_tasks_handled++;
THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
num_cond.notify_all();
}
/* Task Scheduler */
thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
vector<thread*> TaskScheduler::threads;
bool TaskScheduler::do_exit = false;
list<TaskScheduler::Entry> TaskScheduler::queue;
thread_mutex TaskScheduler::queue_mutex;
thread_condition_variable TaskScheduler::queue_cond;
namespace {
/* Get the number of processors on each of the available nodes. The result is
* sized by the highest node index, and each element corresponds to the number
* of processors on that node.
* If a node is not available, its number of processors is zero. */
void get_per_node_num_processors(vector<int>* num_per_node_processors)
{
const int num_nodes = system_cpu_num_numa_nodes();
if(num_nodes == 0) {
LOG(ERROR) << "Zero available NUMA nodes, this is not supposed to happen.";
return;
}
num_per_node_processors->resize(num_nodes);
for(int node = 0; node < num_nodes; ++node) {
if(!system_cpu_is_numa_node_available(node)) {
(*num_per_node_processors)[node] = 0;
continue;
}
(*num_per_node_processors)[node] =
system_cpu_num_numa_node_processors(node);
}
}
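/* For example (hypothetical topology): with node 0 exposing 8 processors,
* node 1 unavailable and node 2 exposing 8 processors, the result would be
* {8, 0, 8}. */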
/* Calculate the total number of processors on all available nodes.
* This is similar to system_cpu_thread_count(), but uses the pre-calculated
* number of processors on each node, avoiding extra system calls and
* node-availability checks. */
int get_num_total_processors(const vector<int>& num_per_node_processors)
{
int num_total_processors = 0;
foreach(int num_node_processors, num_per_node_processors) {
num_total_processors += num_node_processors;
}
return num_total_processors;
}
/* Compute the NUMA node for every thread to run on, for best performance. */
vector<int> distribute_threads_on_nodes(const int num_threads)
{
/* Start with all threads unassigned to any specific NUMA node. */
vector<int> thread_nodes(num_threads, -1);
const int num_active_group_processors =
system_cpu_num_active_group_processors();
VLOG(1) << "Detected " << num_active_group_processors << " processors "
<< "in active group.";
if(num_active_group_processors >= num_threads) {
/* If the current thread's affinity already allows using at least the
* requested number of threads, we do not explicitly set affinity on the
* worker threads.
* This lets users manually edit the affinity of the parent thread, which
* the workers then follow, making it possible to pin two Cycles/Blender
* instances to different dies of a CPU. */
VLOG(1) << "Not setting thread group affinity.";
return thread_nodes;
}
vector<int> num_per_node_processors;
get_per_node_num_processors(&num_per_node_processors);
if(num_per_node_processors.size() == 0) {
/* The error was already reported; there is nothing we can do here, so we
* simply leave the default affinity for all worker threads. */
return thread_nodes;
}
const int num_nodes = num_per_node_processors.size();
int thread_index = 0;
/* First pass: fill in all the nodes to their maximum.
*
* If there are fewer threads than the overall node capacity, some of the
* nodes or parts of them will idle.
*
* TODO(sergey): Consider picking up fastest nodes if number of threads
* fits on them. For example, on Threadripper2 we might consider using nodes
* 0 and 2 if user requested 32 render threads. */
const int num_total_node_processors =
get_num_total_processors(num_per_node_processors);
int current_node_index = 0;
while(thread_index < num_total_node_processors &&
thread_index < num_threads) {
const int num_node_processors =
num_per_node_processors[current_node_index];
for(int processor_index = 0;
processor_index < num_node_processors;
++processor_index)
{
VLOG(1) << "Scheduling thread " << thread_index << " to node "
<< current_node_index << ".";
thread_nodes[thread_index] = current_node_index;
++thread_index;
if(thread_index == num_threads) {
/* All threads are scheduled on their nodes. */
return thread_nodes;
}
}
++current_node_index;
}
/* Second pass: keep scheduling threads to each node one by one, uniformly
* filling them in.
* This is where things become tricky to predict for maximum performance:
* on the one hand this avoids too much threading overhead on a few nodes,
* but for the final performance having all the overhead on one node might
* be the better idea (since the other nodes will then have a better chance
* of rendering faster).
* Even trickier, nodes might have different capacities, so we might want to
* do some weighted scheduling. For example, if node 0 has 16 processors and
* node 1 has 32 processors, we'd better schedule 1 extra thread on node 0
* and 2 extra threads on node 1. */
current_node_index = 0;
while(thread_index < num_threads) {
/* Skip unavailable nodes. */
/* TODO(sergey): Add sanity check against deadlock. */
while(num_per_node_processors[current_node_index] == 0) {
current_node_index = (current_node_index + 1) % num_nodes;
}
VLOG(1) << "Scheduling thread " << thread_index << " to node "
<< current_node_index << ".";
thread_nodes[thread_index] = current_node_index;
++thread_index;
current_node_index = (current_node_index + 1) % num_nodes;
}
return thread_nodes;
}
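/* Worked example (hypothetical numbers): on a machine with two nodes of 4
* processors each, a request for 10 threads pins threads 0-3 to node 0 and
* threads 4-7 to node 1 in the first pass, then alternates the remaining two
* in the second pass: thread 8 to node 0 and thread 9 to node 1. */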
} // namespace
void TaskScheduler::init(int num_threads)
{
thread_scoped_lock lock(mutex);
/* Multiple Cycles instances can use this task scheduler, sharing the same
* threads, so we keep track of the number of users. */
++users;
if(users != 1) {
return;
}
do_exit = false;
const bool use_auto_threads = (num_threads == 0);
if(use_auto_threads) {
/* Automatic number of threads. */
num_threads = system_cpu_thread_count();
}
VLOG(1) << "Creating pool of " << num_threads << " threads.";
/* Compute distribution on NUMA nodes. */
vector<int> thread_nodes = distribute_threads_on_nodes(num_threads);
/* Launch threads that will be waiting for work. */
threads.resize(num_threads);
for(int thread_index = 0; thread_index < num_threads; ++thread_index) {
threads[thread_index] = new thread(
function_bind(&TaskScheduler::thread_run, thread_index + 1),
thread_nodes[thread_index]);
}
}
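/* Example of the user counting (sketch): nested init()/exit() pairs are
* cheap, only the outermost pair creates and destroys the thread pool.
*
*   TaskScheduler::init(0);  // first user spawns the threads (0 = autodetect)
*   TaskScheduler::init(8);  // second user only bumps the count, the thread
*                            // count argument is ignored here
*   TaskScheduler::exit();   // one user left, threads keep running
*   TaskScheduler::exit();   // last user joins and deletes all threads
*/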
void TaskScheduler::exit()
{
thread_scoped_lock lock(mutex);
users--;
if(users == 0) {
VLOG(1) << "De-initializing thread pool of task scheduler.";
/* stop all waiting threads */
TaskScheduler::queue_mutex.lock();
do_exit = true;
TaskScheduler::queue_cond.notify_all();
TaskScheduler::queue_mutex.unlock();
/* delete threads */
foreach(thread *t, threads) {
t->join();
delete t;
}
threads.clear();
}
}
void TaskScheduler::free_memory()
{
assert(users == 0);
threads.free_memory();
}
bool TaskScheduler::thread_wait_pop(Entry& entry)
{
thread_scoped_lock queue_lock(queue_mutex);
while(queue.empty() && !do_exit)
queue_cond.wait(queue_lock);
if(queue.empty()) {
assert(do_exit);
return false;
}
entry = queue.front();
queue.pop_front();
return true;
}
void TaskScheduler::thread_run(int thread_id)
{
Entry entry;
/* todo: test affinity/denormal mask */
/* keep popping off tasks */
while(thread_wait_pop(entry)) {
/* run task */
entry.task->run(thread_id);
/* delete task */
delete entry.task;
/* notify pool task was done */
entry.pool->num_decrease(1);
}
}
void TaskScheduler::push(Entry& entry, bool front)
{
entry.pool->num_increase();
/* add entry to queue */
TaskScheduler::queue_mutex.lock();
if(front)
TaskScheduler::queue.push_front(entry);
else
TaskScheduler::queue.push_back(entry);
TaskScheduler::queue_cond.notify_one();
TaskScheduler::queue_mutex.unlock();
}
void TaskScheduler::clear(TaskPool *pool)
{
thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
/* erase all tasks from this pool from the queue */
list<Entry>::iterator it = queue.begin();
int done = 0;
while(it != queue.end()) {
Entry& entry = *it;
if(entry.pool == pool) {
done++;
delete entry.task;
it = queue.erase(it);
}
else
it++;
}
queue_lock.unlock();
/* notify done */
pool->num_decrease(done);
}
/* Dedicated Task Pool */
DedicatedTaskPool::DedicatedTaskPool()
{
do_cancel = false;
do_exit = false;
num = 0;
worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
}
DedicatedTaskPool::~DedicatedTaskPool()
{
stop();
worker_thread->join();
delete worker_thread;
}
void DedicatedTaskPool::push(Task *task, bool front)
{
num_increase();
/* add task to queue */
queue_mutex.lock();
if(front)
queue.push_front(task);
else
queue.push_back(task);
queue_cond.notify_one();
queue_mutex.unlock();
}
void DedicatedTaskPool::push(const TaskRunFunction& run, bool front)
{
push(new Task(run), front);
}
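/* Typical usage (an illustrative sketch; compile_kernel is a hypothetical
* function, not part of this file): a DedicatedTaskPool owns a single worker
* thread, so pushed tasks run one at a time, independently of the shared
* TaskScheduler threads.
*
*   DedicatedTaskPool pool;
*   pool.push(function_bind(&compile_kernel, program, _1));
*   pool.wait();  // block until the worker has drained the queue
*/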
void DedicatedTaskPool::wait()
{
thread_scoped_lock num_lock(num_mutex);
while(num)
num_cond.wait(num_lock);
}
void DedicatedTaskPool::cancel()
{
do_cancel = true;
clear();
wait();
do_cancel = false;
}
void DedicatedTaskPool::stop()
{
clear();
do_exit = true;
queue_cond.notify_all();
wait();
assert(num == 0);
}
bool DedicatedTaskPool::canceled()
{
return do_cancel;
}
void DedicatedTaskPool::num_decrease(int done)
{
thread_scoped_lock num_lock(num_mutex);
num -= done;
assert(num >= 0);
if(num == 0)
num_cond.notify_all();
}
void DedicatedTaskPool::num_increase()
{
thread_scoped_lock num_lock(num_mutex);
num++;
num_cond.notify_all();
}
bool DedicatedTaskPool::thread_wait_pop(Task*& task)
{
thread_scoped_lock queue_lock(queue_mutex);
while(queue.empty() && !do_exit)
queue_cond.wait(queue_lock);
if(queue.empty()) {
assert(do_exit);
return false;
}
task = queue.front();
queue.pop_front();
return true;
}
void DedicatedTaskPool::thread_run()
{
Task *task;
/* keep popping off tasks */
while(thread_wait_pop(task)) {
/* run task */
task->run(0);
/* delete task */
delete task;
/* notify task was done */
num_decrease(1);
}
}
void DedicatedTaskPool::clear()
{
thread_scoped_lock queue_lock(queue_mutex);
/* erase all tasks from the queue */
list<Task*>::iterator it = queue.begin();
int done = 0;
while(it != queue.end()) {
done++;
delete *it;
it = queue.erase(it);
}
queue_lock.unlock();
/* notify done */
num_decrease(done);
}
string TaskPool::Summary::full_report() const
{
string report = "";
report += string_printf("Total time: %f\n", time_total);
report += string_printf("Tasks handled: %d\n", num_tasks_handled);
return report;
}
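/* Example report (values are illustrative):
*   Total time: 1.234567
*   Tasks handled: 42
*/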
CCL_NAMESPACE_END