Cycles: reviewed the task scheduler code and fixed (hopefully all) windows threading problems.

This commit is contained in:
Brecht Van Lommel 2012-05-10 22:31:16 +00:00
parent d35d0e38ce
commit 8148d7b1df
2 changed files with 51 additions and 35 deletions

@ -28,8 +28,6 @@ CCL_NAMESPACE_BEGIN
TaskPool::TaskPool()
{
num = 0;
num_done = 0;
do_cancel = false;
}
@ -55,9 +53,11 @@ void TaskPool::push(const TaskRunFunction& run, bool front)
void TaskPool::wait_work()
{
thread_scoped_lock done_lock(done_mutex);
thread_scoped_lock num_lock(num_mutex);
while(num != 0) {
num_lock.unlock();
while(num_done != num) {
thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
/* find task from this pool. if we get a task from another pool,
@ -81,8 +81,6 @@ void TaskPool::wait_work()
/* if found task, do it, otherwise wait until other tasks are done */
if(found_entry) {
done_lock.unlock();
/* run task */
work_entry.task->run();
@ -90,26 +88,31 @@ void TaskPool::wait_work()
delete work_entry.task;
/* notify pool task was done */
done_increase(1);
done_lock.lock();
num_decrease(1);
}
else
done_cond.wait(done_lock);
num_lock.lock();
if(num == 0)
break;
if(!found_entry)
num_cond.wait(num_lock);
}
}
void TaskPool::cancel()
{
TaskScheduler::clear(this);
do_cancel = true;
{
thread_scoped_lock lock(done_mutex);
while(num_done != num)
done_cond.wait(lock);
TaskScheduler::clear(this);
{
thread_scoped_lock num_lock(num_mutex);
while(num)
num_cond.wait(num_lock);
}
do_cancel = false;
}
@ -117,7 +120,7 @@ void TaskPool::stop()
{
TaskScheduler::clear(this);
assert(num_done == num);
assert(num == 0);
}
bool TaskPool::cancelled()
@ -125,14 +128,23 @@ bool TaskPool::cancelled()
return do_cancel;
}
void TaskPool::done_increase(int done)
void TaskPool::num_decrease(int done)
{
done_mutex.lock();
num_done += done;
done_mutex.unlock();
num_mutex.lock();
num -= done;
assert(num_done <= num);
done_cond.notify_all();
assert(num >= 0);
if(num == 0)
num_cond.notify_all();
num_mutex.unlock();
}
void TaskPool::num_increase()
{
thread_scoped_lock num_lock(num_mutex);
num++;
num_cond.notify_all();
}
/* Task Scheduler */
@ -196,10 +208,10 @@ void TaskScheduler::exit()
bool TaskScheduler::thread_wait_pop(Entry& entry)
{
thread_scoped_lock lock(queue_mutex);
thread_scoped_lock queue_lock(queue_mutex);
while(queue.empty() && !do_exit)
queue_cond.wait(lock);
queue_cond.wait(queue_lock);
if(queue.empty()) {
assert(do_exit);
@ -227,27 +239,28 @@ void TaskScheduler::thread_run(int thread_id)
delete entry.task;
/* notify pool task was done */
entry.pool->done_increase(1);
entry.pool->num_decrease(1);
}
}
void TaskScheduler::push(Entry& entry, bool front)
{
entry.pool->num_increase();
/* add entry to queue */
TaskScheduler::queue_mutex.lock();
if(front)
TaskScheduler::queue.push_front(entry);
else
TaskScheduler::queue.push_back(entry);
entry.pool->num++;
TaskScheduler::queue_mutex.unlock();
TaskScheduler::queue_cond.notify_one();
TaskScheduler::queue_mutex.unlock();
}
void TaskScheduler::clear(TaskPool *pool)
{
thread_scoped_lock lock(queue_mutex);
thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
/* erase all tasks from this pool from the queue */
list<Entry>::iterator it = queue.begin();
@ -266,8 +279,10 @@ void TaskScheduler::clear(TaskPool *pool)
it++;
}
queue_lock.unlock();
/* notify done */
pool->done_increase(done);
pool->num_decrease(done);
}
CCL_NAMESPACE_END

@ -73,12 +73,13 @@ public:
protected:
friend class TaskScheduler;
void done_increase(int done);
void num_decrease(int done);
void num_increase();
thread_mutex done_mutex;
thread_condition_variable done_cond;
thread_mutex num_mutex;
thread_condition_variable num_cond;
volatile int num, num_done;
volatile int num;
volatile bool do_cancel;
};