Applied and completed a compositor patch by Brecht to use signalling and waiting in scheduling and worker threads instead of continuous loops with sleep times. This should help reduce unnecessary wait times in Tile.

Lukas Toenne 2012-06-10 12:26:33 +00:00
parent 5e29381825
commit 7496a58cfb
5 changed files with 60 additions and 73 deletions
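
The core idea: instead of worker threads polling the queue every 10ms with PIL_sleep_ms(), they block in pthread_cond_wait() until a producer signals that work was pushed. A minimal standalone sketch of this pattern (illustrative names, not Blender code; a plain array stands in for GSQueue):

/* Workers block in pthread_cond_wait() until a producer signals,
 * instead of polling the queue with a sleep. Build: cc sketch.c -lpthread */
#include <pthread.h>
#include <stdio.h>

#define NUM_WORKERS 4
#define NUM_JOBS    16

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t push_cond = PTHREAD_COND_INITIALIZER; /* "work available" */
static int jobs[NUM_JOBS];
static int head = 0, tail = 0; /* array-backed queue, illustration only */
static int nowait = 0;         /* set when no more work will be pushed */

static void queue_push(int job)
{
    pthread_mutex_lock(&mutex);
    jobs[tail++] = job;
    pthread_cond_signal(&push_cond); /* wake one waiting worker */
    pthread_mutex_unlock(&mutex);
}

/* Returns 0 on success; -1 once the queue is empty and nowait is set. */
static int queue_pop(int *job)
{
    int ok = -1;
    pthread_mutex_lock(&mutex);
    /* a loop, not an if: cond_wait can wake spuriously, and another
     * worker may have emptied the queue before we re-acquired the mutex */
    while (head == tail && !nowait)
        pthread_cond_wait(&push_cond, &mutex);
    if (head != tail) {
        *job = jobs[head++];
        ok = 0;
    }
    pthread_mutex_unlock(&mutex);
    return ok;
}

static void queue_nowait(void)
{
    pthread_mutex_lock(&mutex);
    nowait = 1;
    pthread_cond_broadcast(&push_cond); /* wake all workers so they can exit */
    pthread_mutex_unlock(&mutex);
}

static void *worker(void *arg)
{
    int job;
    while (queue_pop(&job) == 0) /* blocks instead of sleep-polling */
        printf("worker %d: job %d\n", (int)(long)arg, job);
    return NULL;
}

int main(void)
{
    pthread_t threads[NUM_WORKERS];
    int i;
    for (i = 0; i < NUM_WORKERS; i++)
        pthread_create(&threads[i], NULL, worker, (void *)(long)i);
    for (i = 0; i < NUM_JOBS; i++)
        queue_push(i);
    queue_nowait(); /* no more work: workers drain the queue and return */
    for (i = 0; i < NUM_WORKERS; i++)
        pthread_join(threads[i], NULL);
    return 0;
}

Note that push wakes a single worker with pthread_cond_signal(), while nowait uses pthread_cond_broadcast() because every blocked worker must observe the flag and exit — the same distinction the patch makes below.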

source/blender/blenlib/BLI_threads.h

@@ -136,6 +136,7 @@ void *BLI_thread_queue_pop(ThreadQueue *queue);
 void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
 int BLI_thread_queue_size(ThreadQueue *queue);
+void BLI_thread_queue_wait_finish(ThreadQueue *queue);
 void BLI_thread_queue_nowait(ThreadQueue *queue);
 
 #endif

source/blender/blenlib/intern/threads.c

@@ -520,8 +520,10 @@ void BLI_insert_work(ThreadedWorker *worker, void *param)
 struct ThreadQueue {
     GSQueue *queue;
     pthread_mutex_t mutex;
-    pthread_cond_t cond;
-    int nowait;
+    pthread_cond_t push_cond;
+    pthread_cond_t finish_cond;
+    volatile int nowait;
+    volatile int cancelled;
 };
 
 ThreadQueue *BLI_thread_queue_init(void)
@@ -532,14 +534,17 @@ ThreadQueue *BLI_thread_queue_init(void)
     queue->queue = BLI_gsqueue_new(sizeof(void *));
 
     pthread_mutex_init(&queue->mutex, NULL);
-    pthread_cond_init(&queue->cond, NULL);
+    pthread_cond_init(&queue->push_cond, NULL);
+    pthread_cond_init(&queue->finish_cond, NULL);
 
     return queue;
 }
 
 void BLI_thread_queue_free(ThreadQueue *queue)
 {
-    pthread_cond_destroy(&queue->cond);
+    /* destroy everything, assumes no one is using queue anymore */
+    pthread_cond_destroy(&queue->finish_cond);
+    pthread_cond_destroy(&queue->push_cond);
     pthread_mutex_destroy(&queue->mutex);
 
     BLI_gsqueue_free(queue->queue);
@@ -554,7 +559,7 @@ void BLI_thread_queue_push(ThreadQueue *queue, void *work)
     BLI_gsqueue_push(queue->queue, &work);
 
     /* signal threads waiting to pop */
-    pthread_cond_signal(&queue->cond);
+    pthread_cond_signal(&queue->push_cond);
     pthread_mutex_unlock(&queue->mutex);
 }
 
@@ -565,11 +570,15 @@ void *BLI_thread_queue_pop(ThreadQueue *queue)
     /* wait until there is work */
     pthread_mutex_lock(&queue->mutex);
     while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
-        pthread_cond_wait(&queue->cond, &queue->mutex);
+        pthread_cond_wait(&queue->push_cond, &queue->mutex);
 
     /* if we have something, pop it */
-    if (!BLI_gsqueue_is_empty(queue->queue))
+    if (!BLI_gsqueue_is_empty(queue->queue)) {
         BLI_gsqueue_pop(queue->queue, &work);
+
+        if(BLI_gsqueue_is_empty(queue->queue))
+            pthread_cond_broadcast(&queue->finish_cond);
+    }
 
     pthread_mutex_unlock(&queue->mutex);
@@ -623,16 +632,20 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
     /* wait until there is work */
     pthread_mutex_lock(&queue->mutex);
     while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
-        if (pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
+        if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT)
             break;
         else if (PIL_check_seconds_timer() - t >= ms * 0.001)
             break;
     }
 
     /* if we have something, pop it */
-    if (!BLI_gsqueue_is_empty(queue->queue))
+    if (!BLI_gsqueue_is_empty(queue->queue)) {
         BLI_gsqueue_pop(queue->queue, &work);
+
+        if(BLI_gsqueue_is_empty(queue->queue))
+            pthread_cond_broadcast(&queue->finish_cond);
+    }
 
     pthread_mutex_unlock(&queue->mutex);
 
     return work;
@@ -656,10 +669,23 @@ void BLI_thread_queue_nowait(ThreadQueue *queue)
     queue->nowait = 1;
 
     /* signal threads waiting to pop */
-    pthread_cond_signal(&queue->cond);
+    pthread_cond_broadcast(&queue->push_cond);
     pthread_mutex_unlock(&queue->mutex);
 }
 
+void BLI_thread_queue_wait_finish(ThreadQueue *queue)
+{
+    /* wait for finish condition */
+    pthread_mutex_lock(&queue->mutex);
+    while(!BLI_gsqueue_is_empty(queue->queue))
+        pthread_cond_wait(&queue->finish_cond, &queue->mutex);
+    pthread_mutex_unlock(&queue->mutex);
+}
+
 /* ************************************************ */
 
 void BLI_begin_threaded_malloc(void)
 {
     if (thread_levels == 0) {
@@ -674,3 +700,4 @@ void BLI_end_threaded_malloc(void)
     if (thread_levels == 0)
         MEM_set_lock_callback(NULL, NULL);
 }
+
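
Taken together, the queue now supports a complete lifecycle with no polling. A condensed caller-side sketch of the protocol, mirroring what WorkScheduler::start/finish/stop do below (worker_func, num_threads and work are placeholders; worker_func loops over BLI_thread_queue_pop like thread_execute_cpu):

ListBase threads;
ThreadQueue *queue = BLI_thread_queue_init();
BLI_init_threads(&threads, worker_func, num_threads); /* workers block in pop() */

BLI_thread_queue_push(queue, work);  /* signals push_cond, wakes one worker */

/* blocks on finish_cond until the last item has been popped */
BLI_thread_queue_wait_finish(queue);

/* shutdown: with nowait set, pop() returns NULL on an empty queue,
 * so each worker loop terminates and the threads can be joined */
BLI_thread_queue_nowait(queue);
BLI_end_threads(&threads);
BLI_thread_queue_free(queue);

Note that finish_cond is broadcast when the last item is popped, so BLI_thread_queue_wait_finish() returns once the queue is drained, not necessarily after the final work item has finished executing.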

source/blender/compositor/intern/COM_ExecutionGroup.cpp

@@ -351,7 +351,8 @@ void ExecutionGroup::execute(ExecutionSystem *graph)
                 startIndex = index+1;
             }
         }
-        PIL_sleep_ms(10);
+
+        WorkScheduler::finish();
 
         if (bTree->test_break && bTree->test_break(bTree->tbh)) {
             breaked = true;

source/blender/compositor/intern/COM_WorkScheduler.cpp

@@ -39,8 +39,6 @@
 #endif
 
-/// @brief global state of the WorkScheduler.
-static WorkSchedulerState state;
 /// @brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is created
 static vector<CPUDevice*> cpudevices;
@@ -68,43 +66,29 @@ static bool openclActive = false;
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
 void *WorkScheduler::thread_execute_cpu(void *data)
 {
-    bool continueLoop = true;
     Device *device = (Device*)data;
-    while (continueLoop) {
-        WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue);
-        if (work) {
-            device->execute(work);
-            delete work;
-        }
-        PIL_sleep_ms(10);
-        if (WorkScheduler::isStopping()) {
-            continueLoop = false;
-        }
+    WorkPackage *work;
+    while ((work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue))) {
+        device->execute(work);
+        delete work;
     }
 
     return NULL;
 }
 
 void *WorkScheduler::thread_execute_gpu(void *data)
 {
-    bool continueLoop = true;
     Device *device = (Device*)data;
-    while (continueLoop) {
-        WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue);
-        if (work) {
-            device->execute(work);
-            delete work;
-        }
-        PIL_sleep_ms(10);
-        if (WorkScheduler::isStopping()) {
-            continueLoop = false;
-        }
+    WorkPackage *work;
+    while ((work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue))) {
+        device->execute(work);
+        delete work;
     }
 
     return NULL;
 }
 
-bool WorkScheduler::isStopping() {return state == COM_WSS_STOPPING;}
-
 #endif
@@ -135,7 +119,6 @@ void WorkScheduler::start(CompositorContext &context)
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
     unsigned int index;
     cpuqueue = BLI_thread_queue_init();
-    BLI_thread_queue_nowait(cpuqueue);
     BLI_init_threads(&cputhreads, thread_execute_cpu, cpudevices.size());
     for (index = 0 ; index < cpudevices.size() ; index ++) {
         Device *device = cpudevices[index];
@@ -144,7 +127,6 @@ void WorkScheduler::start(CompositorContext &context)
 #ifdef COM_OPENCL_ENABLED
     if (context.getHasActiveOpenCLDevices()) {
         gpuqueue = BLI_thread_queue_init();
-        BLI_thread_queue_nowait(gpuqueue);
         BLI_init_threads(&gputhreads, thread_execute_gpu, gpudevices.size());
         for (index = 0 ; index < gpudevices.size() ; index ++) {
             Device *device = gpudevices[index];
@@ -157,45 +139,39 @@ void WorkScheduler::start(CompositorContext &context)
     }
 #endif
 #endif
-
-    state = COM_WSS_STARTED;
 }
 
 void WorkScheduler::finish()
 {
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
 #ifdef COM_OPENCL_ENABLED
     if (openclActive) {
-        while (BLI_thread_queue_size(gpuqueue) + BLI_thread_queue_size(cpuqueue) > 0) {
-            PIL_sleep_ms(10);
-        }
+        BLI_thread_queue_wait_finish(gpuqueue);
+        BLI_thread_queue_wait_finish(cpuqueue);
     }
     else {
-        while (BLI_thread_queue_size(cpuqueue) > 0) {
-            PIL_sleep_ms(10);
-        }
+        BLI_thread_queue_wait_finish(cpuqueue);
     }
 #else
-    while (BLI_thread_queue_size(cpuqueue) > 0) {
-        PIL_sleep_ms(10);
-    }
+    BLI_thread_queue_wait_finish(cpuqueue);
 #endif
 #endif
 }
 
 void WorkScheduler::stop()
 {
-    state = COM_WSS_STOPPING;
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
+    BLI_thread_queue_nowait(cpuqueue);
     BLI_end_threads(&cputhreads);
     BLI_thread_queue_free(cpuqueue);
     cpuqueue = NULL;
 #ifdef COM_OPENCL_ENABLED
     if (openclActive) {
+        BLI_thread_queue_nowait(gpuqueue);
         BLI_end_threads(&gputhreads);
         BLI_thread_queue_free(gpuqueue);
         gpuqueue = NULL;
     }
 #endif
 #endif
-
-    state = COM_WSS_STOPPED;
 }
 
 bool WorkScheduler::hasGPUDevices()
@@ -218,8 +194,6 @@ extern void clContextError(const char *errinfo, const void *private_info, size_t
 void WorkScheduler::initialize()
 {
-    state = COM_WSS_UNKNOWN;
-
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
     int numberOfCPUThreads = BLI_system_thread_count();
@@ -298,8 +272,6 @@ void WorkScheduler::initialize()
     }
 #endif
 #endif
-
-    state = COM_WSS_INITIALIZED;
 }
 
 void WorkScheduler::deinitialize()
@@ -329,5 +301,4 @@ void WorkScheduler::deinitialize()
     }
 #endif
 #endif
-    state = COM_WSS_DEINITIALIZED;
 }

source/blender/compositor/intern/COM_WorkScheduler.h

@@ -31,19 +31,6 @@ extern "C" {
 #include "COM_defines.h"
 #include "COM_Device.h"
 
-// STATES
-/** @brief states of the WorkScheduler
- * @ingroup execution
- */
-typedef enum WorkSchedulerState {
-    COM_WSS_UNKNOWN = -1,
-    COM_WSS_INITIALIZED = 0,
-    COM_WSS_STARTED = 1,
-    COM_WSS_STOPPING = 2,
-    COM_WSS_STOPPED = 3,
-    COM_WSS_DEINITIALIZED = 4
-} WorkSchedulerState;
-
 /** @brief the workscheduler
  * @ingroup execution
  */