From e82deafc3a75cd51ff3d93069a60c37c334fef5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Fri, 20 Aug 2010 16:54:42 +0000 Subject: fixed bugs in multithreading code svn path=/trunk/externals/iem/iemnet/; revision=13871 --- iemnet_data.c | 166 ++++++++++++++++++++++++++++++++++---------------------- iemnet_sender.c | 43 +++++++++++++-- 2 files changed, 139 insertions(+), 70 deletions(-) diff --git a/iemnet_data.c b/iemnet_data.c index 9969e9a..e671d7d 100644 --- a/iemnet_data.c +++ b/iemnet_data.c @@ -203,6 +203,7 @@ struct _iemnet_queue { /* push a chunk into the queue + * this will return the current queue size */ int queue_push( t_iemnet_queue* const _this, @@ -211,37 +212,39 @@ int queue_push( t_node* tail; t_node* n=NULL; int size=-1; - if(_this) { - size=_this->size; + if(NULL == _this)return size; - if(NULL == data) return size; - //fprintf(stderr, "pushing %d bytes\n", data->size); + pthread_mutex_lock(&_this->mtx); + size=_this->size; + pthread_mutex_unlock(&_this->mtx); - n=(t_node*)getbytes(sizeof(t_node)); + if(NULL == data) return size; - n->next = 0; - n->data = data; - pthread_mutex_lock(&_this->mtx); - if (! (tail = _this->tail)) { - _this->head = n; - } else { - tail->next = n; - } - _this->tail = n; - - _this->size+=data->size; - size=_this->size; + n=(t_node*)getbytes(sizeof(t_node)); - //fprintf(stderr, "pushed %d bytes\n", data->size); + n->next = 0; + n->data = data; - pthread_mutex_unlock(&_this->mtx); - pthread_cond_signal(&_this->cond); + pthread_mutex_lock(&_this->mtx); + if (! (tail = _this->tail)) { + _this->head = n; + } else { + tail->next = n; } + _this->tail = n; + + _this->size+=data->size; + size=_this->size; + + // added new chunk, so tell waiting threads that they can pop the data + pthread_cond_signal(&_this->cond); + pthread_mutex_unlock(&_this->mtx); + return size; } -/* push a chunk from the queue +/* pop a chunk from the queue * if the queue is empty, this will block until * something has been pushed * OR the queue is "done" (in which case NULL is returned) @@ -252,61 +255,93 @@ t_iemnet_chunk* queue_pop_block( t_node* head=0; t_iemnet_chunk*data=0; - if(_this) { - pthread_mutex_lock(&_this->mtx); - - /* wait until there is something in the queue or the "done" flag is set */ - while (! (head = _this->head)) { - if(_this->done) { - pthread_mutex_unlock(&_this->mtx); - return NULL; - } else { - pthread_cond_wait(&_this->cond, &_this->mtx); - } + if(NULL == _this)return NULL; + + pthread_mutex_lock(&_this->mtx); +#if 1 + + /* if the queue is empty, wait */ + if(NULL == _this->head) { + pthread_cond_wait(&_this->cond, &_this->mtx); + /* somebody signaled us, that we should do some work + * either the queue has been filled, or we are done... + */ + if(_this->done) { + pthread_mutex_unlock(&_this->mtx); + return NULL; } + } + /* save the head, below we gonna work on this */ + head = _this->head; - if (! (_this->head = head->next)) { - _this->tail = 0; - } - if(head && head->data) { - _this->size-=head->data->size; - } + /* update _this */ + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } - pthread_mutex_unlock(&_this->mtx); - if(head) { - data=head->data; - freebytes(head, sizeof(t_node)); - head=NULL; + pthread_mutex_unlock(&_this->mtx); +#else + /* wait until there is something in the queue or the "done" flag is set */ + while (! (head = _this->head)) { + if(_this->done) { + pthread_mutex_unlock(&_this->mtx); + return NULL; + } else { + pthread_cond_wait(&_this->cond, &_this->mtx); } } + + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } + + pthread_mutex_unlock(&_this->mtx); +#endif + + if(head) { + data=head->data; + freebytes(head, sizeof(t_node)); + head=NULL; + } return data; } - +/* pop a chunk from the queue + * if the queue is empty, this will immediately return NULL + * (note that despite of the name this does block for synchronization) + */ t_iemnet_chunk* queue_pop_noblock( t_iemnet_queue* const _this ) { t_node* head=0; t_iemnet_chunk*data=0; - if(_this) { - pthread_mutex_lock(&_this->mtx); - if (! (head = _this->head)) { - // empty head - pthread_mutex_unlock(&_this->mtx); - return NULL; - } - if (! (_this->head = head->next)) { - _this->tail = 0; - } - if(head && head->data) { - _this->size-=head->data->size; - } + if(NULL == _this)return NULL; + pthread_mutex_lock(&_this->mtx); + if (! (head = _this->head)) { + // empty head pthread_mutex_unlock(&_this->mtx); - if(head) { - data=head->data; - freebytes(head, sizeof(t_node)); - head=NULL; - } + return NULL; + } + + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } + + pthread_mutex_unlock(&_this->mtx); + + if(head) { + data=head->data; + freebytes(head, sizeof(t_node)); + head=NULL; } return data; } @@ -326,12 +361,15 @@ int queue_getsize(t_iemnet_queue* const _this) { } void queue_finish(t_iemnet_queue* q) { DEBUG("queue_finish: %x", q); - if(NULL==q) - return; + if(NULL==q) return; + + pthread_mutex_lock(&q->mtx); q->done=1; DEBUG("queue signaling: %x", q); pthread_cond_signal(&q->cond); DEBUG("queue signaled: %x", q); + pthread_mutex_unlock(&q->mtx); + DEBUG("queue_finished: %x", q); } void queue_destroy(t_iemnet_queue* q) { diff --git a/iemnet_sender.c b/iemnet_sender.c index 20fb93b..bdbc26d 100644 --- a/iemnet_sender.c +++ b/iemnet_sender.c @@ -28,6 +28,14 @@ #include +#if IEMNET_HAVE_DEBUG +static int debug_lockcount=0; +# define LOCK(x) do {pthread_mutex_lock(x); debug_lockcount++; post(" LOCK %d (@%d)", debug_lockcount, __LINE__); } while(0) +# define UNLOCK(x) do {debug_lockcount--;post("UNLOCK %d (@%d)", debug_lockcount, __LINE__);pthread_mutex_unlock(x);}while(0) +#else +# define LOCK(x) pthread_mutex_lock(x) +# define UNLOCK(x) pthread_mutex_unlock(x) +#endif /* draft: * - there is a sender thread for each open connection @@ -42,6 +50,8 @@ struct _iemnet_sender { t_iemnet_queue*queue; int keepsending; // indicates whether we want to thread to continue or to terminate int isrunning; + + pthread_mutex_t mtx; /* mutex to protect isrunning,.. */ }; /* the workhorse of the family */ @@ -92,15 +102,23 @@ static void*iemnet__sender_sendthread(void*arg) { while(sender->keepsending) { if(!iemnet__sender_dosend(sockfd, q))break; } + LOCK (&sender->mtx); sender->isrunning=0; + UNLOCK (&sender->mtx); DEBUG("send thread terminated"); return NULL; } int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { - t_iemnet_queue*q=s->queue; + t_iemnet_queue*q=0; int size=-1; - if(!s->isrunning)return -1; + LOCK (&s->mtx); + q=s->queue; + if(!s->isrunning) { + UNLOCK (&s->mtx); + return -1; + } + UNLOCK (&s->mtx); if(q) { t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); size = queue_push(q, chunk); @@ -109,27 +127,37 @@ int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { } void iemnet__sender_destroy(t_iemnet_sender*s) { - int sockfd=s->sockfd; + int sockfd=-1; /* simple protection against recursive calls: * s->keepsending is only set to "0" in here, * so if it is false, we know that we are already being called */ DEBUG("destroy sender %x with queue %x (%d)", s, s->queue, s->keepsending); - if(!s->keepsending)return; + LOCK (&s->mtx); + sockfd=s->sockfd; + // check s->isrunning + DEBUG("keepsending %d\tisrunning %d", s->keepsending, s->isrunning); + if(!s->keepsending) { + UNLOCK (&s->mtx); + return; + } s->keepsending=0; + UNLOCK (&s->mtx); queue_finish(s->queue); DEBUG("queue finished"); - s->sockfd = -1; if(sockfd>=0) { - shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ + int err=shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ sys_closesocket(sockfd); } + pthread_join(s->thread, NULL); DEBUG("thread joined"); queue_destroy(s->queue); + pthread_mutex_destroy (&s->mtx); + memset(s, 0, sizeof(t_iemnet_sender)); freebytes(s, sizeof(t_iemnet_sender)); s=NULL; @@ -138,6 +166,7 @@ void iemnet__sender_destroy(t_iemnet_sender*s) { t_iemnet_sender*iemnet__sender_create(int sock) { + static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; t_iemnet_sender*result=(t_iemnet_sender*)getbytes(sizeof(t_iemnet_sender)); int res=0; DEBUG("create sender %x", result); @@ -151,6 +180,8 @@ t_iemnet_sender*iemnet__sender_create(int sock) { result->keepsending =1; result->isrunning=1; DEBUG("create_sender queue=%x", result->queue); + + memcpy(&result->mtx , &mtx, sizeof(pthread_mutex_t)); res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result); if(0==res) { -- cgit v1.2.1