From 6bec37a0361aaa6e9deab125050f382fdbb03580 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Fri, 13 Jul 2012 20:46:26 +0000 Subject: use separate thread for signalling new data to Pd this hopefully fixes the "hang after a few hours" bug (Closes SF#3541431) svn path=/trunk/externals/iem/iemnet/; revision=16154 --- iemnet_receiver.c | 88 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 32 deletions(-) diff --git a/iemnet_receiver.c b/iemnet_receiver.c index ff1f123..f1d2277 100644 --- a/iemnet_receiver.c +++ b/iemnet_receiver.c @@ -39,7 +39,7 @@ struct _iemnet_receiver { - pthread_t thread; + pthread_t sigthread, recthread; int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */ void*userdata; t_iemnet_chunk*data; @@ -53,41 +53,45 @@ struct _iemnet_receiver { int keepreceiving; pthread_mutex_t newdata_mtx, running_mtx, keeprec_mtx; + pthread_cond_t running_cond, newdata_cond; }; /* notifies Pd that there is new data to fetch */ static void iemnet_signalNewData(t_iemnet_receiver*x) { + pthread_cond_signal(&x->newdata_cond); +} + +static void*iemnet__receiver_newdatathread(void*z) { int already=0; - int trylock=0; - pthread_mutex_lock(&x->newdata_mtx); - already=x->newdataflag; - x->newdataflag=1; - /* don't schedule ticks at the end of life */ - if(x->sockfd<0)already=1; + t_iemnet_receiver*rec= (t_iemnet_receiver*)z; + pthread_mutex_lock (&rec->newdata_mtx); + pthread_cond_signal(&rec->newdata_cond); - pthread_mutex_unlock(&x->newdata_mtx); + while(1) { + pthread_cond_wait(&rec->newdata_cond, &rec->newdata_mtx); + already=rec->newdataflag; + rec->newdataflag=1; + pthread_mutex_unlock(&rec->newdata_mtx); + + pthread_mutex_lock(&rec->running_mtx); + if(!rec->running) { + pthread_mutex_unlock(&rec->running_mtx); + break; + } + pthread_mutex_unlock(&rec->running_mtx); - if(already) { - return; - } + if(!already) { + /* signal Pd that we have new data */ + sys_lock(); + if(rec->clock)clock_delay(rec->clock, 0); + sys_unlock(); + } - /* - * try to lock Pd's main mutex - * this is bound to deadlock if this function is called from within Pd's mainthread - * (which happens when we destroy the receiver and signalNewData is called on cleanup) - * - * - shan't we check whether sys_trylock() returns EBUSY ? - */ - trylock=sys_trylock(); - switch(trylock) { - case 0: - case EBUSY: - if(x->clock)clock_delay(x->clock, 0); - if(0==trylock)sys_unlock(); - default: - break; + pthread_mutex_lock(&rec->newdata_mtx); } + + return 0; } @@ -114,8 +118,10 @@ static void*iemnet__receiver_readthread(void*arg) { FD_SET(sockfd, &readset); for(i=0; irunning_mtx); receiver->running=1; + + pthread_mutex_lock (&receiver->running_mtx); + pthread_cond_signal (&receiver->running_cond); pthread_mutex_unlock(&receiver->running_mtx); while(1) { @@ -238,16 +244,30 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_rece rec->data=data; rec->callback=callback; - memcpy(&rec->newdata_mtx , &mtx, sizeof(pthread_mutex_t)); - memcpy(&rec->running_mtx , &mtx, sizeof(pthread_mutex_t)); - memcpy(&rec->keeprec_mtx , &mtx, sizeof(pthread_mutex_t)); + pthread_mutex_init(&rec->newdata_mtx , 0); + pthread_mutex_init(&rec->running_mtx , 0); + pthread_mutex_init(&rec->keeprec_mtx , 0); + + pthread_cond_init(&rec->running_cond, 0); + pthread_cond_init(&rec->newdata_cond, 0); + rec->newdataflag=0; rec->running=1; rec->queue = queue_create(); rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); - res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec); + /* start the newdata-signalling thread */ + pthread_mutex_lock(&rec->newdata_mtx); + res=pthread_create(&rec->sigthread, 0, iemnet__receiver_newdatathread, rec); + if(!res)pthread_cond_wait(&rec->newdata_cond, &rec->newdata_mtx); + pthread_mutex_unlock(&rec->newdata_mtx); + + /* start the recv thread */ + pthread_mutex_lock(&rec->running_mtx); + res=pthread_create(&rec->recthread, 0, iemnet__receiver_readthread, rec); + if(!res)pthread_cond_wait(&rec->running_cond, &rec->running_mtx); + pthread_mutex_unlock(&rec->running_mtx); } //fprintf(stderr, "new receiver created\n"); @@ -272,8 +292,10 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) { sockfd=rec->sockfd; DEBUG("joining thread"); - pthread_join(rec->thread, NULL); + pthread_join(rec->recthread, NULL); + pthread_cond_signal(&rec->newdata_cond); + pthread_join(rec->sigthread, NULL); DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd); if(sockfd>=0) { @@ -300,6 +322,8 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) { pthread_mutex_destroy(&rec->newdata_mtx); pthread_mutex_destroy(&rec->running_mtx); pthread_mutex_destroy(&rec->keeprec_mtx); + pthread_cond_destroy(&rec->newdata_cond); + pthread_cond_destroy(&rec->running_cond); clock_free(rec->clock); rec->clock=NULL; -- cgit v1.2.1