aboutsummaryrefslogtreecommitdiff
path: root/iemnet_receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'iemnet_receiver.c')
-rw-r--r--iemnet_receiver.c88
1 files changed, 56 insertions, 32 deletions
diff --git a/iemnet_receiver.c b/iemnet_receiver.c
index ff1f123..f1d2277 100644
--- a/iemnet_receiver.c
+++ b/iemnet_receiver.c
@@ -39,7 +39,7 @@
struct _iemnet_receiver {
- pthread_t thread;
+ pthread_t sigthread, recthread;
int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */
void*userdata;
t_iemnet_chunk*data;
@@ -53,41 +53,45 @@ struct _iemnet_receiver {
int keepreceiving;
pthread_mutex_t newdata_mtx, running_mtx, keeprec_mtx;
+ pthread_cond_t running_cond, newdata_cond;
};
/* notifies Pd that there is new data to fetch */
static void iemnet_signalNewData(t_iemnet_receiver*x) {
+ pthread_cond_signal(&x->newdata_cond);
+}
+
+static void*iemnet__receiver_newdatathread(void*z) {
int already=0;
- int trylock=0;
- pthread_mutex_lock(&x->newdata_mtx);
- already=x->newdataflag;
- x->newdataflag=1;
- /* don't schedule ticks at the end of life */
- if(x->sockfd<0)already=1;
+ t_iemnet_receiver*rec= (t_iemnet_receiver*)z;
+ pthread_mutex_lock (&rec->newdata_mtx);
+ pthread_cond_signal(&rec->newdata_cond);
- pthread_mutex_unlock(&x->newdata_mtx);
+ while(1) {
+ pthread_cond_wait(&rec->newdata_cond, &rec->newdata_mtx);
+ already=rec->newdataflag;
+ rec->newdataflag=1;
+ pthread_mutex_unlock(&rec->newdata_mtx);
+
+ pthread_mutex_lock(&rec->running_mtx);
+ if(!rec->running) {
+ pthread_mutex_unlock(&rec->running_mtx);
+ break;
+ }
+ pthread_mutex_unlock(&rec->running_mtx);
- if(already) {
- return;
- }
+ if(!already) {
+ /* signal Pd that we have new data */
+ sys_lock();
+ if(rec->clock)clock_delay(rec->clock, 0);
+ sys_unlock();
+ }
- /*
- * try to lock Pd's main mutex
- * this is bound to deadlock if this function is called from within Pd's mainthread
- * (which happens when we destroy the receiver and signalNewData is called on cleanup)
- *
- * - shan't we check whether sys_trylock() returns EBUSY ?
- */
- trylock=sys_trylock();
- switch(trylock) {
- case 0:
- case EBUSY:
- if(x->clock)clock_delay(x->clock, 0);
- if(0==trylock)sys_unlock();
- default:
- break;
+ pthread_mutex_lock(&rec->newdata_mtx);
}
+
+ return 0;
}
@@ -114,8 +118,10 @@ static void*iemnet__receiver_readthread(void*arg) {
FD_SET(sockfd, &readset);
for(i=0; i<size; i++)data[i]=0;
- pthread_mutex_lock(&receiver->running_mtx);
receiver->running=1;
+
+ pthread_mutex_lock (&receiver->running_mtx);
+ pthread_cond_signal (&receiver->running_cond);
pthread_mutex_unlock(&receiver->running_mtx);
while(1) {
@@ -238,16 +244,30 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_rece
rec->data=data;
rec->callback=callback;
- memcpy(&rec->newdata_mtx , &mtx, sizeof(pthread_mutex_t));
- memcpy(&rec->running_mtx , &mtx, sizeof(pthread_mutex_t));
- memcpy(&rec->keeprec_mtx , &mtx, sizeof(pthread_mutex_t));
+ pthread_mutex_init(&rec->newdata_mtx , 0);
+ pthread_mutex_init(&rec->running_mtx , 0);
+ pthread_mutex_init(&rec->keeprec_mtx , 0);
+
+ pthread_cond_init(&rec->running_cond, 0);
+ pthread_cond_init(&rec->newdata_cond, 0);
+
rec->newdataflag=0;
rec->running=1;
rec->queue = queue_create();
rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick);
- res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec);
+ /* start the newdata-signalling thread */
+ pthread_mutex_lock(&rec->newdata_mtx);
+ res=pthread_create(&rec->sigthread, 0, iemnet__receiver_newdatathread, rec);
+ if(!res)pthread_cond_wait(&rec->newdata_cond, &rec->newdata_mtx);
+ pthread_mutex_unlock(&rec->newdata_mtx);
+
+ /* start the recv thread */
+ pthread_mutex_lock(&rec->running_mtx);
+ res=pthread_create(&rec->recthread, 0, iemnet__receiver_readthread, rec);
+ if(!res)pthread_cond_wait(&rec->running_cond, &rec->running_mtx);
+ pthread_mutex_unlock(&rec->running_mtx);
}
//fprintf(stderr, "new receiver created\n");
@@ -272,8 +292,10 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) {
sockfd=rec->sockfd;
DEBUG("joining thread");
- pthread_join(rec->thread, NULL);
+ pthread_join(rec->recthread, NULL);
+ pthread_cond_signal(&rec->newdata_cond);
+ pthread_join(rec->sigthread, NULL);
DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd);
if(sockfd>=0) {
@@ -300,6 +322,8 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) {
pthread_mutex_destroy(&rec->newdata_mtx);
pthread_mutex_destroy(&rec->running_mtx);
pthread_mutex_destroy(&rec->keeprec_mtx);
+ pthread_cond_destroy(&rec->newdata_cond);
+ pthread_cond_destroy(&rec->running_cond);
clock_free(rec->clock);
rec->clock=NULL;