aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--iemnet_data.c34
1 files changed, 34 insertions, 0 deletions
diff --git a/iemnet_data.c b/iemnet_data.c
index 3c6cb9c..bbd0f07 100644
--- a/iemnet_data.c
+++ b/iemnet_data.c
@@ -218,8 +218,23 @@ struct _iemnet_queue {
int done; // in cleanup state
int size;
+
+ pthread_mutex_t usedmtx;
+ pthread_cond_t usedcond;
+ int used; // use counter, so queue_finish can wait for blocking accesses to finish
};
+static void queue_use_increment(t_iemnet_queue* _this) {
+ pthread_mutex_lock(&_this->usedmtx);
+ _this->used++;
+ pthread_mutex_unlock(&_this->usedmtx);
+}
+static void queue_use_decrement(t_iemnet_queue* _this) {
+ pthread_mutex_lock(&_this->usedmtx);
+ _this->used--;
+ pthread_cond_signal(&_this->usedcond);
+ pthread_mutex_unlock(&_this->usedmtx);
+}
/* push a chunk into the queue
* this will return the current queue size
@@ -276,6 +291,7 @@ t_iemnet_chunk* queue_pop_block(
t_iemnet_chunk*data=0;
if(NULL == _this)return NULL;
+ queue_use_increment(_this);
pthread_mutex_lock(&_this->mtx);
/* if the queue is empty, wait */
@@ -286,6 +302,7 @@ t_iemnet_chunk* queue_pop_block(
*/
if(_this->done) {
pthread_mutex_unlock(&_this->mtx);
+ queue_use_decrement(_this);
return NULL;
}
}
@@ -307,6 +324,7 @@ t_iemnet_chunk* queue_pop_block(
free(head);
head=NULL;
}
+ queue_use_decrement(_this);
return data;
}
/* pop a chunk from the queue
@@ -320,10 +338,12 @@ t_iemnet_chunk* queue_pop_noblock(
t_iemnet_chunk*data=0;
if(NULL == _this)return NULL;
+ queue_use_increment(_this);
pthread_mutex_lock(&_this->mtx);
if (! (head = _this->head)) {
// empty head
pthread_mutex_unlock(&_this->mtx);
+ queue_use_decrement(_this);
return NULL;
}
@@ -341,6 +361,7 @@ t_iemnet_chunk* queue_pop_noblock(
free(head);
head=NULL;
}
+ queue_use_decrement(_this);
return data;
}
@@ -367,6 +388,12 @@ void queue_finish(t_iemnet_queue* q) {
pthread_cond_signal(&q->cond);
DEBUG("queue signaled: %x", q);
pthread_mutex_unlock(&q->mtx);
+
+ /* wait until queue is no longer used */
+ pthread_mutex_lock(&q->usedmtx);
+ while(q->used) pthread_cond_wait(&q->usedcond, &q->usedmtx);
+ pthread_mutex_unlock(&q->usedmtx);
+
DEBUG("queue_finished: %x", q);
}
@@ -389,6 +416,9 @@ void queue_destroy(t_iemnet_queue* q) {
pthread_mutex_destroy(&q->mtx);
pthread_cond_destroy(&q->cond);
+ pthread_mutex_destroy(&q->usedmtx);
+ pthread_cond_destroy(&q->usedcond);
+
free(q);
q=NULL;
DEBUG("queue destroyed %x", q);
@@ -408,8 +438,12 @@ t_iemnet_queue* queue_create(void) {
memcpy(&q->cond, &cond, sizeof(pthread_cond_t));
memcpy(&q->mtx , &mtx, sizeof(pthread_mutex_t));
+ memcpy(&q->usedcond, &cond, sizeof(pthread_cond_t));
+ memcpy(&q->usedmtx , &mtx, sizeof(pthread_mutex_t));
+
q->done = 0;
q->size = 0;
+ q->used = 0;
DEBUG("queue created %x", q);
return q;
}