From 0382fa293070ba7400fd86bafeabf1d0b61d0fb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Tue, 14 Sep 2010 08:39:48 +0000 Subject: on the way to thread safety...(?) a number of new mutexes to protect what is there to protect. avoid deadlocks when using the big pd-lock svn path=/trunk/externals/iem/iemnet/; revision=14138 --- iemnet_receiver.c | 156 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 111 insertions(+), 45 deletions(-) diff --git a/iemnet_receiver.c b/iemnet_receiver.c index 98e72c8..0576c4e 100644 --- a/iemnet_receiver.c +++ b/iemnet_receiver.c @@ -24,36 +24,49 @@ struct _iemnet_receiver { t_iemnet_chunk*data; t_iemnet_receivecallback callback; t_iemnet_queue*queue; - int running; t_clock *clock; - int keepreceiving; int newdataflag; - pthread_mutex_t newdatamtx; + int running; + int keepreceiving; + + pthread_mutex_t newdata_mtx, running_mtx, keeprec_mtx; }; /* notifies Pd that there is new data to fetch */ static void iemnet_signalNewData(t_iemnet_receiver*x) { int already=0; - pthread_mutex_lock(&x->newdatamtx); - already=x->newdataflag; - x->newdataflag=1; + int locked=0; + pthread_mutex_lock(&x->newdata_mtx); + already=x->newdataflag; + x->newdataflag=1; + + /* don't schedule ticks at the end of life */ + if(x->sockfd<0)already=1; - /* don't schedule ticks at the end of life */ - if(x->sockfd<0)already=1; + pthread_mutex_unlock(&x->newdata_mtx); - pthread_mutex_unlock(&x->newdatamtx); + if(already) { + return; + } - if(already)return; - sys_lock(); + /* + * try to lock Pd's main mutex + * this is bound to deadlock if this function is called from within Pd's mainthread + * (which happens when we destroy the receiver and signalNewData is called on cleanup) + * + * - shan't we check whether sys_trylock() returns EBUSY ? + */ + locked=(0==sys_trylock()); if(x->clock)clock_delay(x->clock, 0); - sys_unlock(); + if(locked)sys_unlock(); } /* the workhorse of the family */ static void*iemnet__receiver_readthread(void*arg) { + unsigned int i=0; int result = 0; t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; @@ -66,32 +79,60 @@ static void*iemnet__receiver_readthread(void*arg) { struct sockaddr_in from; socklen_t fromlen = sizeof(from); - unsigned int i=0; + struct timeval timout; + fd_set readset; + FD_ZERO(&readset); + FD_SET(sockfd, &readset); + for(i=0; irunning=1; + pthread_mutex_lock(&receiver->running_mtx); + receiver->running=1; + pthread_mutex_unlock(&receiver->running_mtx); + while(1) { t_iemnet_chunk*c=NULL; - fromlen = sizeof(from); - //fprintf(stderr, "reading %d bytes...\n", size); - //result = recv(sockfd, data, size, 0); - result = recvfrom(sockfd, data, size, 0, (struct sockaddr *)&from, &fromlen); - //fprintf(stderr, "read %d bytes...\n", result); - DEBUG("recfrom %d bytes", result); - if(result<=0)break; - c= iemnet__chunk_create_dataaddr(result, data, &from); - - queue_push(q, c); - - iemnet_signalNewData(receiver); + pthread_mutex_lock(&receiver->keeprec_mtx); + if(!receiver->keepreceiving) { + pthread_mutex_unlock(&receiver->keeprec_mtx); + break; + } + pthread_mutex_unlock(&receiver->keeprec_mtx); + + fromlen = sizeof(from); + fd_set rs=readset; + timout.tv_sec=0; + timout.tv_usec=1000; + + select(sockfd+1, &rs, NULL, NULL, + &timout); + if (FD_ISSET(sockfd, &rs)) { + DEBUG("select can read"); + + //fprintf(stderr, "reading %d bytes...\n", size); + //result = recv(sockfd, data, size, 0); + + result = recvfrom(sockfd, data, size, 0, (struct sockaddr *)&from, &fromlen); + //fprintf(stderr, "read %d bytes...\n", result); + DEBUG("recfrom %d bytes", result); + if(result<=0)break; + c= iemnet__chunk_create_dataaddr(result, data, &from); + DEBUG("pushing"); + queue_push(q, c); + DEBUG("signalling"); + iemnet_signalNewData(receiver); + DEBUG("rereceive"); + } } // oha DEBUG("readthread loop termination: %d", result); - if(result>=0)iemnet_signalNewData(receiver); + //if(result>=0)iemnet_signalNewData(receiver); - receiver->running=0; + pthread_mutex_lock(&receiver->running_mtx); + receiver->running=0; + pthread_mutex_unlock(&receiver->running_mtx); - //fprintf(stderr, "read thread terminated\n"); + DEBUG("read thread terminated"); return NULL; } @@ -108,16 +149,21 @@ static void iemnet__receiver_tick(t_iemnet_receiver *x) c=queue_pop_noblock(x->queue); } DEBUG("tick cleanup"); - pthread_mutex_lock(&x->newdatamtx); - x->newdataflag=0; - running = x->running; - keepreceiving=x->keepreceiving; - pthread_mutex_unlock(&x->newdatamtx); - - DEBUG("tick running %d (%d)", running, keepreceiving); + pthread_mutex_lock(&x->newdata_mtx); + x->newdataflag=0; + pthread_mutex_unlock(&x->newdata_mtx); + + pthread_mutex_lock(&x->running_mtx); + running = x->running; + pthread_mutex_unlock(&x->running_mtx); + + DEBUG("tick running %d", running); if(!running) { // read terminated - + pthread_mutex_lock(&x->keeprec_mtx); + keepreceiving=x->keepreceiving; + pthread_mutex_unlock(&x->keeprec_mtx); + /* keepreceiving is set, if receiver is not yet in shutdown mode */ if(keepreceiving) x->callback(x->userdata, NULL); @@ -140,25 +186,31 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_rece DEBUG("create new receiver for 0x%X:%d", userdata, sock); //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, userdata, callback); if(rec) { - t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE); + t_iemnet_chunk*data=NULL; int res=0; + + data=iemnet__chunk_create_empty(INBUFSIZE); if(NULL==data) { iemnet__receiver_destroy(rec); DEBUG("create receiver failed"); return NULL; } + rec->keepreceiving=1; rec->sockfd=sock; rec->userdata=userdata; rec->data=data; rec->callback=callback; - memcpy(&rec->newdatamtx , &mtx, sizeof(pthread_mutex_t)); + memcpy(&rec->newdata_mtx , &mtx, sizeof(pthread_mutex_t)); + memcpy(&rec->running_mtx , &mtx, sizeof(pthread_mutex_t)); + memcpy(&rec->keeprec_mtx , &mtx, sizeof(pthread_mutex_t)); rec->newdataflag=0; + rec->running=1; rec->queue = queue_create(); rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); - rec->running=1; + res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec); } //fprintf(stderr, "new receiver created\n"); @@ -173,21 +225,33 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) { int sockfd; DEBUG("[%d] destroy receiver %x", inst, rec); if(NULL==rec)return; - if(!rec->keepreceiving)return; - rec->keepreceiving=0; + pthread_mutex_lock(&rec->keeprec_mtx); + if(!rec->keepreceiving) { + pthread_mutex_unlock(&rec->keeprec_mtx); + return; + } + rec->keepreceiving=0; + pthread_mutex_unlock(&rec->keeprec_mtx); sockfd=rec->sockfd; - rec->sockfd=-1; + + DEBUG("joining thread"); + pthread_join(rec->thread, NULL); DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd); if(sockfd>=0) { + /* this doesn't alway make recvfrom() return! + * - try polling + * - try sending a signal with pthread_kill() ? + */ + shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ sys_closesocket(sockfd); } DEBUG("[%d] closed socket %d", inst, sockfd); - pthread_join(rec->thread, NULL); + rec->sockfd=-1; // empty the queue DEBUG("[%d] tick %d", inst, rec->running); @@ -197,7 +261,9 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) { if(rec->data)iemnet__chunk_destroy(rec->data); - pthread_mutex_destroy(&rec->newdatamtx); + pthread_mutex_destroy(&rec->newdata_mtx); + pthread_mutex_destroy(&rec->running_mtx); + pthread_mutex_destroy(&rec->keeprec_mtx); clock_free(rec->clock); rec->clock=NULL; -- cgit v1.2.1