From 69eeec4549081fd11a60f6eeb28e2a51421b7122 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Fri, 13 Jul 2012 20:44:20 +0000 Subject: when finishing queue, wait until all users have quit for this to work, all uses must be registered. a better way would be to use semaphores, but afaik the w32 implementation of pthreads does not support those (yet?) svn path=/trunk/externals/iem/iemnet/; revision=16152 --- iemnet_data.c | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/iemnet_data.c b/iemnet_data.c index 3c6cb9c..bbd0f07 100644 --- a/iemnet_data.c +++ b/iemnet_data.c @@ -218,8 +218,23 @@ struct _iemnet_queue { int done; // in cleanup state int size; + + pthread_mutex_t usedmtx; + pthread_cond_t usedcond; + int used; // use counter, so queue_finish can wait for blocking accesses to finish }; +static void queue_use_increment(t_iemnet_queue* _this) { + pthread_mutex_lock(&_this->usedmtx); + _this->used++; + pthread_mutex_unlock(&_this->usedmtx); +} +static void queue_use_decrement(t_iemnet_queue* _this) { + pthread_mutex_lock(&_this->usedmtx); + _this->used--; + pthread_cond_signal(&_this->usedcond); + pthread_mutex_unlock(&_this->usedmtx); +} /* push a chunk into the queue * this will return the current queue size @@ -276,6 +291,7 @@ t_iemnet_chunk* queue_pop_block( t_iemnet_chunk*data=0; if(NULL == _this)return NULL; + queue_use_increment(_this); pthread_mutex_lock(&_this->mtx); /* if the queue is empty, wait */ @@ -286,6 +302,7 @@ t_iemnet_chunk* queue_pop_block( */ if(_this->done) { pthread_mutex_unlock(&_this->mtx); + queue_use_decrement(_this); return NULL; } } @@ -307,6 +324,7 @@ t_iemnet_chunk* queue_pop_block( free(head); head=NULL; } + queue_use_decrement(_this); return data; } /* pop a chunk from the queue @@ -320,10 +338,12 @@ t_iemnet_chunk* queue_pop_noblock( t_iemnet_chunk*data=0; if(NULL == _this)return NULL; + queue_use_increment(_this); pthread_mutex_lock(&_this->mtx); if (! (head = _this->head)) { // empty head pthread_mutex_unlock(&_this->mtx); + queue_use_decrement(_this); return NULL; } @@ -341,6 +361,7 @@ t_iemnet_chunk* queue_pop_noblock( free(head); head=NULL; } + queue_use_decrement(_this); return data; } @@ -367,6 +388,12 @@ void queue_finish(t_iemnet_queue* q) { pthread_cond_signal(&q->cond); DEBUG("queue signaled: %x", q); pthread_mutex_unlock(&q->mtx); + + /* wait until queue is no longer used */ + pthread_mutex_lock(&q->usedmtx); + while(q->used) pthread_cond_wait(&q->usedcond, &q->usedmtx); + pthread_mutex_unlock(&q->usedmtx); + DEBUG("queue_finished: %x", q); } @@ -389,6 +416,9 @@ void queue_destroy(t_iemnet_queue* q) { pthread_mutex_destroy(&q->mtx); pthread_cond_destroy(&q->cond); + pthread_mutex_destroy(&q->usedmtx); + pthread_cond_destroy(&q->usedcond); + free(q); q=NULL; DEBUG("queue destroyed %x", q); @@ -408,8 +438,12 @@ t_iemnet_queue* queue_create(void) { memcpy(&q->cond, &cond, sizeof(pthread_cond_t)); memcpy(&q->mtx , &mtx, sizeof(pthread_mutex_t)); + memcpy(&q->usedcond, &cond, sizeof(pthread_cond_t)); + memcpy(&q->usedmtx , &mtx, sizeof(pthread_mutex_t)); + q->done = 0; q->size = 0; + q->used = 0; DEBUG("queue created %x", q); return q; } -- cgit v1.2.1