aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--iemnet_receiver.c77
1 files changed, 13 insertions, 64 deletions
diff --git a/iemnet_receiver.c b/iemnet_receiver.c
index 9b4a330..f9f3fa8 100644
--- a/iemnet_receiver.c
+++ b/iemnet_receiver.c
@@ -38,60 +38,22 @@
#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */
struct _iemnet_receiver {
- pthread_t sigthread, recthread;
+ pthread_t recthread;
int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */
void*userdata;
t_iemnet_chunk*data;
t_iemnet_receivecallback callback;
t_iemnet_queue*queue;
- t_clock *clock;
- int newdataflag;
int running;
int keepreceiving;
- pthread_mutex_t newdata_mtx, running_mtx, keeprec_mtx;
- pthread_cond_t running_cond, newdata_cond;
+ pthread_mutex_t running_mtx, keeprec_mtx;
+ pthread_cond_t running_cond;
+ t_iemnet_notify*notifier;
};
-/* notifies Pd that there is new data to fetch */
-static void iemnet_signalNewData(t_iemnet_receiver*x) {
- pthread_cond_signal(&x->newdata_cond);
-}
-
-static void*iemnet__receiver_newdatathread(void*z) {
- int already=0;
-
- t_iemnet_receiver*rec= (t_iemnet_receiver*)z;
- pthread_mutex_lock (&rec->newdata_mtx);
- pthread_cond_signal(&rec->newdata_cond);
-
- while(1) {
- pthread_cond_wait(&rec->newdata_cond, &rec->newdata_mtx);
- already=rec->newdataflag;
- rec->newdataflag=1;
- pthread_mutex_unlock(&rec->newdata_mtx);
-
- pthread_mutex_lock(&rec->running_mtx);
- if(!rec->running) {
- pthread_mutex_unlock(&rec->running_mtx);
- break;
- }
- pthread_mutex_unlock(&rec->running_mtx);
-
- if(!already) {
- /* signal Pd that we have new data */
- sys_lock();
- if(rec->clock)clock_delay(rec->clock, 0);
- sys_unlock();
- }
-
- pthread_mutex_lock(&rec->newdata_mtx);
- }
-
- return 0;
-}
-
+static t_iemnet_notifier*receivenotifier=NULL;
/* the workhorse of the family */
static void*iemnet__receiver_readthread(void*arg) {
@@ -157,7 +119,8 @@ static void*iemnet__receiver_readthread(void*arg) {
DEBUG("pushing");
queue_push(q, c);
DEBUG("signalling");
- iemnet_signalNewData(receiver);
+ iemnet__notify(receiver->notifier);
+
if(result<=0) break;
@@ -189,9 +152,6 @@ static void iemnet__receiver_tick(t_iemnet_receiver *x)
c=queue_pop_noblock(x->queue);
}
DEBUG("tick cleanup");
- pthread_mutex_lock(&x->newdata_mtx);
- x->newdataflag=0;
- pthread_mutex_unlock(&x->newdata_mtx);
pthread_mutex_lock(&x->running_mtx);
running = x->running;
@@ -223,6 +183,10 @@ int iemnet__receiver_getsize(t_iemnet_receiver*x) {
t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_receivecallback callback) {
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
t_iemnet_receiver*rec=(t_iemnet_receiver*)malloc(sizeof(t_iemnet_receiver));
+
+ if(NULL==receivenotifier)
+ receivenotifier=iemnet__notify_create();
+
DEBUG("create new receiver for 0x%X:%d", userdata, sock);
//fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, userdata, callback);
if(rec) {
@@ -242,24 +206,15 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_rece
rec->data=data;
rec->callback=callback;
- pthread_mutex_init(&rec->newdata_mtx , 0);
pthread_mutex_init(&rec->running_mtx , 0);
pthread_mutex_init(&rec->keeprec_mtx , 0);
pthread_cond_init(&rec->running_cond, 0);
- pthread_cond_init(&rec->newdata_cond, 0);
- rec->newdataflag=0;
rec->running=1;
rec->queue = queue_create();
- rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick);
-
- /* start the newdata-signalling thread */
- pthread_mutex_lock(&rec->newdata_mtx);
- res=pthread_create(&rec->sigthread, 0, iemnet__receiver_newdatathread, rec);
- if(!res)pthread_cond_wait(&rec->newdata_cond, &rec->newdata_mtx);
- pthread_mutex_unlock(&rec->newdata_mtx);
+ rec->notifier = iemnet__notify_add(receivenotifier, (t_iemnet_notifun)iemnet__receiver_tick, rec);
/* start the recv thread */
pthread_mutex_lock(&rec->running_mtx);
@@ -290,9 +245,8 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) {
sockfd=rec->sockfd;
pthread_join(rec->recthread, NULL);
+ iemnet__notify_remove(rec->notifier);
- pthread_cond_signal(&rec->newdata_cond);
- pthread_join(rec->sigthread, NULL);
DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd);
if(sockfd>=0) {
@@ -316,15 +270,10 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) {
if(rec->data)iemnet__chunk_destroy(rec->data);
- pthread_mutex_destroy(&rec->newdata_mtx);
pthread_mutex_destroy(&rec->running_mtx);
pthread_mutex_destroy(&rec->keeprec_mtx);
- pthread_cond_destroy(&rec->newdata_cond);
pthread_cond_destroy(&rec->running_cond);
- clock_free(rec->clock);
- rec->clock=NULL;
-
rec->userdata=NULL;
rec->data=NULL;
rec->callback=NULL;