forked from bartvdbraak/blender
Threads: added queue for passing data between threads. Includes a function
to wait for an item to be put in the queue and then pop immediately without, this makes it possible to avoid sleep() while waiting for the results of a thread.
This commit is contained in:
parent
cbc4aae06a
commit
2d2339a709
@ -48,6 +48,11 @@ GSQueue* BLI_gsqueue_new (int elem_size);
|
|||||||
*/
|
*/
|
||||||
int BLI_gsqueue_is_empty(GSQueue *gq);
|
int BLI_gsqueue_is_empty(GSQueue *gq);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Query number elements in the queue
|
||||||
|
*/
|
||||||
|
int BLI_gsqueue_size(GSQueue *gq);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Access the item at the head of the queue
|
* Access the item at the head of the queue
|
||||||
* without removing it.
|
* without removing it.
|
||||||
|
@ -107,6 +107,21 @@ void BLI_destroy_worker(struct ThreadedWorker *worker);
|
|||||||
* NOTE: inserting work is NOT thread safe, so make sure it is only done from one thread */
|
* NOTE: inserting work is NOT thread safe, so make sure it is only done from one thread */
|
||||||
void BLI_insert_work(struct ThreadedWorker *worker, void *param);
|
void BLI_insert_work(struct ThreadedWorker *worker, void *param);
|
||||||
|
|
||||||
|
/* ThreadWorkQueue
|
||||||
|
*
|
||||||
|
* Thread-safe work queue to push work/pointers between threads. */
|
||||||
|
|
||||||
|
typedef struct ThreadQueue ThreadQueue;
|
||||||
|
|
||||||
|
ThreadQueue *BLI_thread_queue_init();
|
||||||
|
void BLI_thread_queue_free(ThreadQueue *queue);
|
||||||
|
|
||||||
|
void BLI_thread_queue_push(ThreadQueue *queue, void *work);
|
||||||
|
void *BLI_thread_queue_pop(ThreadQueue *queue);
|
||||||
|
void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
|
||||||
|
int BLI_thread_queue_size(ThreadQueue *queue);
|
||||||
|
|
||||||
|
void BLI_thread_queue_nowait(ThreadQueue *queue);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -32,12 +32,14 @@
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
#include "MEM_guardedalloc.h"
|
#include "MEM_guardedalloc.h"
|
||||||
|
|
||||||
#include "DNA_listBase.h"
|
#include "DNA_listBase.h"
|
||||||
|
|
||||||
#include "BLI_blenlib.h"
|
#include "BLI_blenlib.h"
|
||||||
|
#include "BLI_gsqueue.h"
|
||||||
#include "BLI_threads.h"
|
#include "BLI_threads.h"
|
||||||
|
|
||||||
#include "PIL_time.h"
|
#include "PIL_time.h"
|
||||||
@ -458,4 +460,132 @@ void BLI_insert_work(ThreadedWorker *worker, void *param)
|
|||||||
BLI_insert_thread(&worker->threadbase, p);
|
BLI_insert_thread(&worker->threadbase, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* eof */
|
/* ************************************************ */
|
||||||
|
|
||||||
|
struct ThreadQueue {
|
||||||
|
GSQueue *queue;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
pthread_cond_t cond;
|
||||||
|
int nowait;
|
||||||
|
};
|
||||||
|
|
||||||
|
ThreadQueue *BLI_thread_queue_init()
|
||||||
|
{
|
||||||
|
ThreadQueue *queue;
|
||||||
|
|
||||||
|
queue= MEM_callocN(sizeof(ThreadQueue), "ThreadQueue");
|
||||||
|
queue->queue= BLI_gsqueue_new(sizeof(void*));
|
||||||
|
|
||||||
|
pthread_mutex_init(&queue->mutex, NULL);
|
||||||
|
pthread_cond_init(&queue->cond, NULL);
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void BLI_thread_queue_free(ThreadQueue *queue)
|
||||||
|
{
|
||||||
|
pthread_cond_destroy(&queue->cond);
|
||||||
|
pthread_mutex_destroy(&queue->mutex);
|
||||||
|
|
||||||
|
BLI_gsqueue_free(queue->queue);
|
||||||
|
|
||||||
|
MEM_freeN(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
void BLI_thread_queue_push(ThreadQueue *queue, void *work)
|
||||||
|
{
|
||||||
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
|
||||||
|
BLI_gsqueue_push(queue->queue, &work);
|
||||||
|
|
||||||
|
/* signal threads waiting to pop */
|
||||||
|
pthread_cond_signal(&queue->cond);
|
||||||
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *BLI_thread_queue_pop(ThreadQueue *queue)
|
||||||
|
{
|
||||||
|
void *work= NULL;
|
||||||
|
|
||||||
|
/* wait until there is work */
|
||||||
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
|
||||||
|
pthread_cond_wait(&queue->cond, &queue->mutex);
|
||||||
|
|
||||||
|
/* if we have something, pop it */
|
||||||
|
if(!BLI_gsqueue_is_empty(queue->queue))
|
||||||
|
BLI_gsqueue_pop(queue->queue, &work);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
|
||||||
|
return work;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void wait_timeout(struct timespec *timeout, int ms)
|
||||||
|
{
|
||||||
|
struct timeval now;
|
||||||
|
ldiv_t div_result;
|
||||||
|
long x;
|
||||||
|
|
||||||
|
gettimeofday(&now, NULL);
|
||||||
|
div_result = ldiv(ms, 1000);
|
||||||
|
timeout->tv_sec = now.tv_sec + div_result.quot;
|
||||||
|
x = now.tv_usec + (div_result.rem*1000);
|
||||||
|
|
||||||
|
if (x >= 1000000) {
|
||||||
|
timeout->tv_sec++;
|
||||||
|
x -= 1000000;
|
||||||
|
}
|
||||||
|
|
||||||
|
timeout->tv_nsec = x*1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
|
||||||
|
{
|
||||||
|
double t;
|
||||||
|
void *work= NULL;
|
||||||
|
struct timespec timeout;
|
||||||
|
|
||||||
|
t= PIL_check_seconds_timer();
|
||||||
|
wait_timeout(&timeout, ms);
|
||||||
|
|
||||||
|
/* wait until there is work */
|
||||||
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
while(BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
|
||||||
|
if(pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
|
||||||
|
break;
|
||||||
|
else if(PIL_check_seconds_timer() - t >= ms*0.001)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* if we have something, pop it */
|
||||||
|
if(!BLI_gsqueue_is_empty(queue->queue))
|
||||||
|
BLI_gsqueue_pop(queue->queue, &work);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
|
||||||
|
return work;
|
||||||
|
}
|
||||||
|
|
||||||
|
int BLI_thread_queue_size(ThreadQueue *queue)
|
||||||
|
{
|
||||||
|
int size;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
size= BLI_gsqueue_size(queue->queue);
|
||||||
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
void BLI_thread_queue_nowait(ThreadQueue *queue)
|
||||||
|
{
|
||||||
|
pthread_mutex_lock(&queue->mutex);
|
||||||
|
|
||||||
|
queue->nowait= 1;
|
||||||
|
|
||||||
|
/* signal threads waiting to pop */
|
||||||
|
pthread_cond_signal(&queue->cond);
|
||||||
|
pthread_mutex_unlock(&queue->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user