From 20365b87579947d7b099ad7eb65fcd7b33ce2397 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Sun, 14 Apr 2013 21:04:28 +0000 Subject: switch to new notify system test how this behaves under heavy load... svn path=/trunk/externals/iem/iemnet/; revision=17092 --- iemnet_receiver.c | 77 ++++++++++--------------------------------------------- 1 file changed, 13 insertions(+), 64 deletions(-) diff --git a/iemnet_receiver.c b/iemnet_receiver.c index 9b4a330..f9f3fa8 100644 --- a/iemnet_receiver.c +++ b/iemnet_receiver.c @@ -38,60 +38,22 @@ #define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */ struct _iemnet_receiver { - pthread_t sigthread, recthread; + pthread_t recthread; int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */ void*userdata; t_iemnet_chunk*data; t_iemnet_receivecallback callback; t_iemnet_queue*queue; - t_clock *clock; - int newdataflag; int running; int keepreceiving; - pthread_mutex_t newdata_mtx, running_mtx, keeprec_mtx; - pthread_cond_t running_cond, newdata_cond; + pthread_mutex_t running_mtx, keeprec_mtx; + pthread_cond_t running_cond; + t_iemnet_notify*notifier; }; -/* notifies Pd that there is new data to fetch */ -static void iemnet_signalNewData(t_iemnet_receiver*x) { - pthread_cond_signal(&x->newdata_cond); -} - -static void*iemnet__receiver_newdatathread(void*z) { - int already=0; - - t_iemnet_receiver*rec= (t_iemnet_receiver*)z; - pthread_mutex_lock (&rec->newdata_mtx); - pthread_cond_signal(&rec->newdata_cond); - - while(1) { - pthread_cond_wait(&rec->newdata_cond, &rec->newdata_mtx); - already=rec->newdataflag; - rec->newdataflag=1; - pthread_mutex_unlock(&rec->newdata_mtx); - - pthread_mutex_lock(&rec->running_mtx); - if(!rec->running) { - pthread_mutex_unlock(&rec->running_mtx); - break; - } - pthread_mutex_unlock(&rec->running_mtx); - - if(!already) { - /* signal Pd that we have new data */ - sys_lock(); - if(rec->clock)clock_delay(rec->clock, 0); - sys_unlock(); - } - - pthread_mutex_lock(&rec->newdata_mtx); - } - - return 0; -} - +static t_iemnet_notifier*receivenotifier=NULL; /* the workhorse of the family */ static void*iemnet__receiver_readthread(void*arg) { @@ -157,7 +119,8 @@ static void*iemnet__receiver_readthread(void*arg) { DEBUG("pushing"); queue_push(q, c); DEBUG("signalling"); - iemnet_signalNewData(receiver); + iemnet__notify(receiver->notifier); + if(result<=0) break; @@ -189,9 +152,6 @@ static void iemnet__receiver_tick(t_iemnet_receiver *x) c=queue_pop_noblock(x->queue); } DEBUG("tick cleanup"); - pthread_mutex_lock(&x->newdata_mtx); - x->newdataflag=0; - pthread_mutex_unlock(&x->newdata_mtx); pthread_mutex_lock(&x->running_mtx); running = x->running; @@ -223,6 +183,10 @@ int iemnet__receiver_getsize(t_iemnet_receiver*x) { t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_receivecallback callback) { static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; t_iemnet_receiver*rec=(t_iemnet_receiver*)malloc(sizeof(t_iemnet_receiver)); + + if(NULL==receivenotifier) + receivenotifier=iemnet__notify_create(); + DEBUG("create new receiver for 0x%X:%d", userdata, sock); //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, userdata, callback); if(rec) { @@ -242,24 +206,15 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_rece rec->data=data; rec->callback=callback; - pthread_mutex_init(&rec->newdata_mtx , 0); pthread_mutex_init(&rec->running_mtx , 0); pthread_mutex_init(&rec->keeprec_mtx , 0); pthread_cond_init(&rec->running_cond, 0); - pthread_cond_init(&rec->newdata_cond, 0); - rec->newdataflag=0; rec->running=1; rec->queue = queue_create(); - rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); - - /* start the newdata-signalling thread */ - pthread_mutex_lock(&rec->newdata_mtx); - res=pthread_create(&rec->sigthread, 0, iemnet__receiver_newdatathread, rec); - if(!res)pthread_cond_wait(&rec->newdata_cond, &rec->newdata_mtx); - pthread_mutex_unlock(&rec->newdata_mtx); + rec->notifier = iemnet__notify_add(receivenotifier, (t_iemnet_notifun)iemnet__receiver_tick, rec); /* start the recv thread */ pthread_mutex_lock(&rec->running_mtx); @@ -290,9 +245,8 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) { sockfd=rec->sockfd; pthread_join(rec->recthread, NULL); + iemnet__notify_remove(rec->notifier); - pthread_cond_signal(&rec->newdata_cond); - pthread_join(rec->sigthread, NULL); DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd); if(sockfd>=0) { @@ -316,15 +270,10 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) { if(rec->data)iemnet__chunk_destroy(rec->data); - pthread_mutex_destroy(&rec->newdata_mtx); pthread_mutex_destroy(&rec->running_mtx); pthread_mutex_destroy(&rec->keeprec_mtx); - pthread_cond_destroy(&rec->newdata_cond); pthread_cond_destroy(&rec->running_cond); - clock_free(rec->clock); - rec->clock=NULL; - rec->userdata=NULL; rec->data=NULL; rec->callback=NULL; -- cgit v1.2.1