diff options
Diffstat (limited to 'iemnet_data.c')
-rw-r--r-- | iemnet_data.c | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/iemnet_data.c b/iemnet_data.c index 3c6cb9c..bbd0f07 100644 --- a/iemnet_data.c +++ b/iemnet_data.c @@ -218,8 +218,23 @@ struct _iemnet_queue { int done; // in cleanup state int size; + + pthread_mutex_t usedmtx; + pthread_cond_t usedcond; + int used; // use counter, so queue_finish can wait for blocking accesses to finish }; +static void queue_use_increment(t_iemnet_queue* _this) { + pthread_mutex_lock(&_this->usedmtx); + _this->used++; + pthread_mutex_unlock(&_this->usedmtx); +} +static void queue_use_decrement(t_iemnet_queue* _this) { + pthread_mutex_lock(&_this->usedmtx); + _this->used--; + pthread_cond_signal(&_this->usedcond); + pthread_mutex_unlock(&_this->usedmtx); +} /* push a chunk into the queue * this will return the current queue size @@ -276,6 +291,7 @@ t_iemnet_chunk* queue_pop_block( t_iemnet_chunk*data=0; if(NULL == _this)return NULL; + queue_use_increment(_this); pthread_mutex_lock(&_this->mtx); /* if the queue is empty, wait */ @@ -286,6 +302,7 @@ t_iemnet_chunk* queue_pop_block( */ if(_this->done) { pthread_mutex_unlock(&_this->mtx); + queue_use_decrement(_this); return NULL; } } @@ -307,6 +324,7 @@ t_iemnet_chunk* queue_pop_block( free(head); head=NULL; } + queue_use_decrement(_this); return data; } /* pop a chunk from the queue @@ -320,10 +338,12 @@ t_iemnet_chunk* queue_pop_noblock( t_iemnet_chunk*data=0; if(NULL == _this)return NULL; + queue_use_increment(_this); pthread_mutex_lock(&_this->mtx); if (! (head = _this->head)) { // empty head pthread_mutex_unlock(&_this->mtx); + queue_use_decrement(_this); return NULL; } @@ -341,6 +361,7 @@ t_iemnet_chunk* queue_pop_noblock( free(head); head=NULL; } + queue_use_decrement(_this); return data; } @@ -367,6 +388,12 @@ void queue_finish(t_iemnet_queue* q) { pthread_cond_signal(&q->cond); DEBUG("queue signaled: %x", q); pthread_mutex_unlock(&q->mtx); + + /* wait until queue is no longer used */ + pthread_mutex_lock(&q->usedmtx); + while(q->used) pthread_cond_wait(&q->usedcond, &q->usedmtx); + pthread_mutex_unlock(&q->usedmtx); + DEBUG("queue_finished: %x", q); } @@ -389,6 +416,9 @@ void queue_destroy(t_iemnet_queue* q) { pthread_mutex_destroy(&q->mtx); pthread_cond_destroy(&q->cond); + pthread_mutex_destroy(&q->usedmtx); + pthread_cond_destroy(&q->usedcond); + free(q); q=NULL; DEBUG("queue destroyed %x", q); @@ -408,8 +438,12 @@ t_iemnet_queue* queue_create(void) { memcpy(&q->cond, &cond, sizeof(pthread_cond_t)); memcpy(&q->mtx , &mtx, sizeof(pthread_mutex_t)); + memcpy(&q->usedcond, &cond, sizeof(pthread_cond_t)); + memcpy(&q->usedmtx , &mtx, sizeof(pthread_mutex_t)); + q->done = 0; q->size = 0; + q->used = 0; DEBUG("queue created %x", q); return q; } |