Task scheduler: Optimize pushing a bunch of subsequent tasks

The idea is to accumulate all new tasks in a thread-local queue
first, without doing any thread synchronization (i.e. locks and
condition variables), and to move those tasks to the scheduler queue
once they are all ready. This way we avoid taking a lock for every
single task and only take one lock per bunch of tasks.

This is particularly handy when scheduling new dependency graph
node children. It brings the FPS of the cached simulation from the
file linked below from ~30 to ~50.

See the documentation of BLI_task_pool_delayed_push_{begin, end}
and of TaskThreadLocalStorage::do_delayed_push.
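
For reference, a minimal usage sketch of the new API (not part of this commit). It assumes the existing BLI_task_pool_push_from_thread() entry point with its signature from this period; child_task_func() and schedule_batch() are hypothetical names.

#include "BLI_task.h"

/* Hypothetical task callback; real callers would do actual work here. */
static void child_task_func(TaskPool *pool, void *taskdata, int thread_id)
{
	(void)pool;
	(void)taskdata;
	(void)thread_id;
}

/* Push a whole batch of tasks from inside a running task: pushes are
 * accumulated in the thread-local delayed queue and flushed to the
 * scheduler under a single lock by the _end() call. */
static void schedule_batch(TaskPool *pool, void **taskdata, int num_tasks, int thread_id)
{
	BLI_task_pool_delayed_push_begin(pool, thread_id);
	for (int i = 0; i < num_tasks; i++) {
		BLI_task_pool_push_from_thread(pool, child_task_func, taskdata[i],
		                               false, TASK_PRIORITY_HIGH, thread_id);
	}
	BLI_task_pool_delayed_push_end(pool, thread_id);
}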

Fixes T50027: Rigidbody playback and simulation performance regression with new depsgraph

Thanks Bastien for the review!
Author: Sergey Sharybin, 2017-05-31 15:24:09 +02:00
Commit: a481908232 (parent: 2ae6973936)
3 changed files with 115 additions and 8 deletions

@@ -106,6 +106,13 @@ void *BLI_task_pool_userdata(TaskPool *pool);
/* optional mutex to use from run function */
ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool);
/* Delayed push, use it to reduce thread overhead by accumulating
* all new tasks into a local queue first and pushing them to the scheduler
* from within a single mutex lock.
*/
void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id);
void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id);
/* Parallel for routines */
typedef void (*TaskParallelRangeFunc)(void *userdata, const int iter);
typedef void (*TaskParallelRangeFuncEx)(void *userdata, void *userdata_chunk, const int iter, const int thread_id);

@@ -54,6 +54,13 @@
*/
#define LOCAL_QUEUE_SIZE 1
/* Number of tasks which are allowed to be scheduled in a delayed manner.
*
* This allows using fewer locks when scheduling graph node children. More
* details can be found at TaskThreadLocalStorage::do_delayed_push.
*/
#define DELAYED_QUEUE_SIZE 4096
#ifndef NDEBUG
# define ASSERT_THREAD_ID(scheduler, thread_id) \
do { \
@@ -129,9 +136,28 @@ typedef struct TaskMemPoolStats {
#endif
typedef struct TaskThreadLocalStorage {
/* Memory pool for faster task allocation.
* The idea is to re-use the memory of tasks finished or discarded by this thread.
*/
TaskMemPool task_mempool;
/* The local queue keeps the thread busy by keeping a small number of tasks
* ready to be picked up without taking global thread locks for synchronization.
*/
int num_local_queue;
Task *local_queue[LOCAL_QUEUE_SIZE];
/* The thread can be marked for delayed task push. This is helpful when it is
* known that lots of subsequent task pushes will happen from the same thread,
* without "interrupting" for task execution.
*
* We try to accumulate as many tasks as possible in a local queue without
* any locks first, and then push all of them into the scheduler's queue
* from within a single mutex lock.
*/
bool do_delayed_push;
int num_delayed_queue;
Task *delayed_queue[DELAYED_QUEUE_SIZE];
} TaskThreadLocalStorage;
struct TaskPool {
@@ -378,6 +404,7 @@ static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task
BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
const int thread_id)
{
BLI_assert(!tls->do_delayed_push);
while (tls->num_local_queue > 0) {
/* We pop the task from the queue before handling it, so the handler of the
* task can push the next job to the local queue.
@@ -391,6 +418,7 @@ BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls,
local_task->run(local_pool, local_task->taskdata, thread_id);
task_free(local_pool, local_task, thread_id);
}
BLI_assert(!tls->do_delayed_push);
}
static void *task_scheduler_thread_run(void *thread_p)
@@ -408,7 +436,9 @@ static void *task_scheduler_thread_run(void *thread_p)
TaskPool *pool = task->pool;
/* run task */
BLI_assert(!tls->do_delayed_push);
task->run(pool, task->taskdata, thread_id);
BLI_assert(!tls->do_delayed_push);
/* delete task */
task_free(pool, task, thread_id);
@@ -547,6 +577,27 @@ static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriori
BLI_mutex_unlock(&scheduler->queue_mutex);
}
static void task_scheduler_push_all(TaskScheduler *scheduler,
TaskPool *pool,
Task **tasks,
int num_tasks)
{
if (num_tasks == 0) {
return;
}
task_pool_num_increase(pool, num_tasks);
BLI_mutex_lock(&scheduler->queue_mutex);
for (int i = 0; i < num_tasks; i++) {
BLI_addhead(&scheduler->queue, tasks[i]);
}
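/* Multiple tasks became available at once, so wake up all worker threads rather than notifying a single one. */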
BLI_condition_notify_all(&scheduler->queue_cond);
BLI_mutex_unlock(&scheduler->queue_mutex);
}
static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
{
Task *task, *nexttask;
@@ -714,38 +765,59 @@ void BLI_task_pool_free(TaskPool *pool)
BLI_end_threaded_malloc();
}
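/* Check whether this push may use the per-thread (local/delayed) queues:
* requires a valid thread id, and for the pool's own thread the pool must
* already be doing work. */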
BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id)
{
return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work));
}
static void task_pool_push(
TaskPool *pool, TaskRunFunction run, void *taskdata,
bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority,
int thread_id)
{
/* Allocate the task and fill in its properties. */
Task *task = task_alloc(pool, thread_id);
task->run = run;
task->taskdata = taskdata;
task->free_taskdata = free_taskdata;
task->freedata = freedata;
task->pool = pool;
/* For suspended pools we put everything into the pool's suspended queue first
* and exit as soon as possible.
*
* These tasks will be moved to actual execution when the pool is
* activated by work_and_wait().
*/
if (pool->is_suspended) {
BLI_addhead(&pool->suspended_queue, task);
atomic_fetch_and_add_z(&pool->num_suspended, 1);
return;
}
if (thread_id != -1 &&
(thread_id != pool->thread_id || pool->do_work))
{
/* Try the per-thread queues first, this is the cheapest possible push. */
if (task_can_use_local_queues(pool, thread_id)) {
ASSERT_THREAD_ID(pool->scheduler, thread_id);
TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
/* Try to push to a local execution queue.
* These tasks will be picked up next.
*/
if (tls->num_local_queue < LOCAL_QUEUE_SIZE) {
tls->local_queue[tls->num_local_queue] = task;
tls->num_local_queue++;
return;
}
/* If we are in delayed push mode, we push tasks to a temporary
* local queue first without any locks, and then move them
* to the global execution queue under a single lock.
*/
if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) {
tls->delayed_queue[tls->num_delayed_queue] = task;
tls->num_delayed_queue++;
return;
}
}
/* Push to the global execution pool. This is the slowest possible method
* and causes a noticeable amount of threading overhead.
*/
task_scheduler_push(pool->scheduler, task, priority);
}
@@ -816,7 +888,9 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
/* if found task, do it, otherwise wait until other tasks are done */
if (found_task) {
/* run task */
BLI_assert(!tls->do_delayed_push);
work_task->run(pool, work_task->taskdata, pool->thread_id);
BLI_assert(!tls->do_delayed_push);
/* delete task */
task_free(pool, task, pool->thread_id);
@@ -871,6 +945,30 @@ ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
return &pool->user_mutex;
}
void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id)
{
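/* Delayed push only takes effect for threads which can use their local
* queues; otherwise pushes keep going directly to the scheduler and this
* call is a no-op. */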
if (task_can_use_local_queues(pool, thread_id)) {
ASSERT_THREAD_ID(pool->scheduler, thread_id);
TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
tls->do_delayed_push = true;
}
}
void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id)
{
if (task_can_use_local_queues(pool, thread_id)) {
ASSERT_THREAD_ID(pool->scheduler, thread_id);
TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id);
BLI_assert(tls->do_delayed_push);
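/* Flush all locally accumulated tasks to the scheduler queue under
* a single mutex lock. */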
task_scheduler_push_all(pool->scheduler,
pool,
tls->delayed_queue,
tls->num_delayed_queue);
tls->do_delayed_push = false;
tls->num_delayed_queue = 0;
}
}
/* Parallel range routines */
/**

@@ -126,7 +126,9 @@ static void deg_task_run_func(TaskPool *pool,
#endif
}
BLI_task_pool_delayed_push_begin(pool, thread_id);
schedule_children(pool, state->graph, node, state->layers, thread_id);
BLI_task_pool_delayed_push_end(pool, thread_id);
}
typedef struct CalculatePengindData {