From e82deafc3a75cd51ff3d93069a60c37c334fef5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Fri, 20 Aug 2010 16:54:42 +0000 Subject: fixed bugs in multithreading code svn path=/trunk/externals/iem/iemnet/; revision=13871 --- iemnet_sender.c | 43 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 6 deletions(-) (limited to 'iemnet_sender.c') diff --git a/iemnet_sender.c b/iemnet_sender.c index 20fb93b..bdbc26d 100644 --- a/iemnet_sender.c +++ b/iemnet_sender.c @@ -28,6 +28,14 @@ #include +#if IEMNET_HAVE_DEBUG +static int debug_lockcount=0; +# define LOCK(x) do {pthread_mutex_lock(x); debug_lockcount++; post(" LOCK %d (@%d)", debug_lockcount, __LINE__); } while(0) +# define UNLOCK(x) do {debug_lockcount--;post("UNLOCK %d (@%d)", debug_lockcount, __LINE__);pthread_mutex_unlock(x);}while(0) +#else +# define LOCK(x) pthread_mutex_lock(x) +# define UNLOCK(x) pthread_mutex_unlock(x) +#endif /* draft: * - there is a sender thread for each open connection @@ -42,6 +50,8 @@ struct _iemnet_sender { t_iemnet_queue*queue; int keepsending; // indicates whether we want to thread to continue or to terminate int isrunning; + + pthread_mutex_t mtx; /* mutex to protect isrunning,.. */ }; /* the workhorse of the family */ @@ -92,15 +102,23 @@ static void*iemnet__sender_sendthread(void*arg) { while(sender->keepsending) { if(!iemnet__sender_dosend(sockfd, q))break; } + LOCK (&sender->mtx); sender->isrunning=0; + UNLOCK (&sender->mtx); DEBUG("send thread terminated"); return NULL; } int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { - t_iemnet_queue*q=s->queue; + t_iemnet_queue*q=0; int size=-1; - if(!s->isrunning)return -1; + LOCK (&s->mtx); + q=s->queue; + if(!s->isrunning) { + UNLOCK (&s->mtx); + return -1; + } + UNLOCK (&s->mtx); if(q) { t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); size = queue_push(q, chunk); @@ -109,27 +127,37 @@ int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { } void iemnet__sender_destroy(t_iemnet_sender*s) { - int sockfd=s->sockfd; + int sockfd=-1; /* simple protection against recursive calls: * s->keepsending is only set to "0" in here, * so if it is false, we know that we are already being called */ DEBUG("destroy sender %x with queue %x (%d)", s, s->queue, s->keepsending); - if(!s->keepsending)return; + LOCK (&s->mtx); + sockfd=s->sockfd; + // check s->isrunning + DEBUG("keepsending %d\tisrunning %d", s->keepsending, s->isrunning); + if(!s->keepsending) { + UNLOCK (&s->mtx); + return; + } s->keepsending=0; + UNLOCK (&s->mtx); queue_finish(s->queue); DEBUG("queue finished"); - s->sockfd = -1; if(sockfd>=0) { - shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ + int err=shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ sys_closesocket(sockfd); } + pthread_join(s->thread, NULL); DEBUG("thread joined"); queue_destroy(s->queue); + pthread_mutex_destroy (&s->mtx); + memset(s, 0, sizeof(t_iemnet_sender)); freebytes(s, sizeof(t_iemnet_sender)); s=NULL; @@ -138,6 +166,7 @@ void iemnet__sender_destroy(t_iemnet_sender*s) { t_iemnet_sender*iemnet__sender_create(int sock) { + static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; t_iemnet_sender*result=(t_iemnet_sender*)getbytes(sizeof(t_iemnet_sender)); int res=0; DEBUG("create sender %x", result); @@ -151,6 +180,8 @@ t_iemnet_sender*iemnet__sender_create(int sock) { result->keepsending =1; result->isrunning=1; DEBUG("create_sender queue=%x", result->queue); + + memcpy(&result->mtx , &mtx, sizeof(pthread_mutex_t)); res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result); if(0==res) { -- cgit v1.2.1