From ef0fe025f5e813a8afb3f6f2d007a66da79b3ce9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Sat, 27 Mar 2010 08:55:17 +0000 Subject: no need to lock the entire Pd-process svn path=/trunk/externals/iem/iemnet/; revision=13291 --- iemnet.c | 53 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/iemnet.c b/iemnet.c index 2a9b4ce..497c036 100644 --- a/iemnet.c +++ b/iemnet.c @@ -375,7 +375,7 @@ struct _iemnet_sender { /* the workhorse of the family */ static int iemnet__sender_dosend(int sockfd, t_queue*q) { - t_iemnet_chunk*c=queue_pop(q); + t_iemnet_chunk*c=queue_pop_block(q); if(c) { unsigned char*data=c->data; unsigned int size=c->size; @@ -488,8 +488,25 @@ struct _iemnet_receiver { int running; t_clock *clock; t_iemnet_floatlist*flist; + + int newdataflag; + pthread_mutex_t newdatamtx; }; +/* notifies Pd that there is new data to fetch */ +static void iemnet_signalNewData(t_iemnet_receiver*x) { + int already=0; + pthread_mutex_lock(&x->newdatamtx); + already=x->newdataflag; + x->newdataflag=1; + pthread_mutex_unlock(&x->newdatamtx); + + if(already)return; + sys_lock(); + if(x->clock)clock_delay(x->clock, 0); + sys_unlock(); +} + /* the workhorse of the family */ static void*iemnet__receiver_readthread(void*arg) { @@ -521,13 +538,10 @@ static void*iemnet__receiver_readthread(void*arg) { queue_push(q, c); - sys_lock(); - if(receiver->clock)clock_delay(receiver->clock, 0); - sys_unlock(); + iemnet_signalNewData(receiver); + } - sys_lock(); - if(result>=0 && receiver->clock)clock_delay(receiver->clock, 0); - sys_unlock(); + if(result>=0)iemnet_signalNewData(receiver); receiver->running=0; @@ -535,6 +549,7 @@ static void*iemnet__receiver_readthread(void*arg) { return NULL; } +/* callback from Pd's main thread to fetch queued data */ static void iemnet__receiver_tick(t_iemnet_receiver *x) { // received data @@ -545,6 +560,10 @@ static void iemnet__receiver_tick(t_iemnet_receiver *x) iemnet__chunk_destroy(c); c=queue_pop_noblock(x->queue); } + pthread_mutex_lock(&x->newdatamtx); + x->newdataflag=0; + pthread_mutex_unlock(&x->newdatamtx); + if(!x->running) { // read terminated x->callback(x->userdata, NULL, 0, NULL); @@ -553,6 +572,7 @@ static void iemnet__receiver_tick(t_iemnet_receiver *x) t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_receivecallback callback) { + static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; t_iemnet_receiver*rec=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); DEBUG("create new receiver for 0x%X:%d", userdata, sock); //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, userdata, callback); @@ -570,6 +590,9 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_rece rec->callback=callback; rec->flist=iemnet__floatlist_create(1024); + memcpy(&rec->newdatamtx , &mtx, sizeof(pthread_mutex_t)); + rec->newdataflag=0; + rec->queue = queue_create(); rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); rec->running=1; @@ -582,20 +605,26 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_rece void iemnet__receiver_destroy(t_iemnet_receiver*rec) { DEBUG("destroy receiver %x", rec); if(NULL==rec)return; - if(rec->data)iemnet__chunk_destroy(rec->data); - if(rec->flist)iemnet__floatlist_destroy(rec->flist); - clock_free(rec->clock); - rec->clock=NULL; shutdown(rec->sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ sys_closesocket(rec->sockfd); rec->sockfd=0; pthread_join(rec->thread, NULL); + iemnet__receiver_tick(rec); + + + if(rec->data)iemnet__chunk_destroy(rec->data); + if(rec->flist)iemnet__floatlist_destroy(rec->flist); + + pthread_mutex_destroy(&rec->newdatamtx); + + clock_free(rec->clock); + rec->clock=NULL; + rec->userdata=NULL; rec->data=NULL; rec->callback=NULL; - rec->clock=NULL; rec->flist=NULL; freebytes(rec, sizeof(t_iemnet_receiver)); -- cgit v1.2.1