From e82deafc3a75cd51ff3d93069a60c37c334fef5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Fri, 20 Aug 2010 16:54:42 +0000 Subject: fixed bugs in multithreading code svn path=/trunk/externals/iem/iemnet/; revision=13871 --- iemnet_data.c | 166 ++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 102 insertions(+), 64 deletions(-) (limited to 'iemnet_data.c') diff --git a/iemnet_data.c b/iemnet_data.c index 9969e9a..e671d7d 100644 --- a/iemnet_data.c +++ b/iemnet_data.c @@ -203,6 +203,7 @@ struct _iemnet_queue { /* push a chunk into the queue + * this will return the current queue size */ int queue_push( t_iemnet_queue* const _this, @@ -211,37 +212,39 @@ int queue_push( t_node* tail; t_node* n=NULL; int size=-1; - if(_this) { - size=_this->size; + if(NULL == _this)return size; - if(NULL == data) return size; - //fprintf(stderr, "pushing %d bytes\n", data->size); + pthread_mutex_lock(&_this->mtx); + size=_this->size; + pthread_mutex_unlock(&_this->mtx); - n=(t_node*)getbytes(sizeof(t_node)); + if(NULL == data) return size; - n->next = 0; - n->data = data; - pthread_mutex_lock(&_this->mtx); - if (! (tail = _this->tail)) { - _this->head = n; - } else { - tail->next = n; - } - _this->tail = n; - - _this->size+=data->size; - size=_this->size; + n=(t_node*)getbytes(sizeof(t_node)); - //fprintf(stderr, "pushed %d bytes\n", data->size); + n->next = 0; + n->data = data; - pthread_mutex_unlock(&_this->mtx); - pthread_cond_signal(&_this->cond); + pthread_mutex_lock(&_this->mtx); + if (! (tail = _this->tail)) { + _this->head = n; + } else { + tail->next = n; } + _this->tail = n; + + _this->size+=data->size; + size=_this->size; + + // added new chunk, so tell waiting threads that they can pop the data + pthread_cond_signal(&_this->cond); + pthread_mutex_unlock(&_this->mtx); + return size; } -/* push a chunk from the queue +/* pop a chunk from the queue * if the queue is empty, this will block until * something has been pushed * OR the queue is "done" (in which case NULL is returned) @@ -252,61 +255,93 @@ t_iemnet_chunk* queue_pop_block( t_node* head=0; t_iemnet_chunk*data=0; - if(_this) { - pthread_mutex_lock(&_this->mtx); - - /* wait until there is something in the queue or the "done" flag is set */ - while (! (head = _this->head)) { - if(_this->done) { - pthread_mutex_unlock(&_this->mtx); - return NULL; - } else { - pthread_cond_wait(&_this->cond, &_this->mtx); - } + if(NULL == _this)return NULL; + + pthread_mutex_lock(&_this->mtx); +#if 1 + + /* if the queue is empty, wait */ + if(NULL == _this->head) { + pthread_cond_wait(&_this->cond, &_this->mtx); + /* somebody signaled us, that we should do some work + * either the queue has been filled, or we are done... + */ + if(_this->done) { + pthread_mutex_unlock(&_this->mtx); + return NULL; } + } + /* save the head, below we gonna work on this */ + head = _this->head; - if (! (_this->head = head->next)) { - _this->tail = 0; - } - if(head && head->data) { - _this->size-=head->data->size; - } + /* update _this */ + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } - pthread_mutex_unlock(&_this->mtx); - if(head) { - data=head->data; - freebytes(head, sizeof(t_node)); - head=NULL; + pthread_mutex_unlock(&_this->mtx); +#else + /* wait until there is something in the queue or the "done" flag is set */ + while (! (head = _this->head)) { + if(_this->done) { + pthread_mutex_unlock(&_this->mtx); + return NULL; + } else { + pthread_cond_wait(&_this->cond, &_this->mtx); } } + + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } + + pthread_mutex_unlock(&_this->mtx); +#endif + + if(head) { + data=head->data; + freebytes(head, sizeof(t_node)); + head=NULL; + } return data; } - +/* pop a chunk from the queue + * if the queue is empty, this will immediately return NULL + * (note that despite of the name this does block for synchronization) + */ t_iemnet_chunk* queue_pop_noblock( t_iemnet_queue* const _this ) { t_node* head=0; t_iemnet_chunk*data=0; - if(_this) { - pthread_mutex_lock(&_this->mtx); - if (! (head = _this->head)) { - // empty head - pthread_mutex_unlock(&_this->mtx); - return NULL; - } - if (! (_this->head = head->next)) { - _this->tail = 0; - } - if(head && head->data) { - _this->size-=head->data->size; - } + if(NULL == _this)return NULL; + pthread_mutex_lock(&_this->mtx); + if (! (head = _this->head)) { + // empty head pthread_mutex_unlock(&_this->mtx); - if(head) { - data=head->data; - freebytes(head, sizeof(t_node)); - head=NULL; - } + return NULL; + } + + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } + + pthread_mutex_unlock(&_this->mtx); + + if(head) { + data=head->data; + freebytes(head, sizeof(t_node)); + head=NULL; } return data; } @@ -326,12 +361,15 @@ int queue_getsize(t_iemnet_queue* const _this) { } void queue_finish(t_iemnet_queue* q) { DEBUG("queue_finish: %x", q); - if(NULL==q) - return; + if(NULL==q) return; + + pthread_mutex_lock(&q->mtx); q->done=1; DEBUG("queue signaling: %x", q); pthread_cond_signal(&q->cond); DEBUG("queue signaled: %x", q); + pthread_mutex_unlock(&q->mtx); + DEBUG("queue_finished: %x", q); } void queue_destroy(t_iemnet_queue* q) { -- cgit v1.2.1