aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-09-14 08:39:48 +0000
committerIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-09-14 08:39:48 +0000
commit0382fa293070ba7400fd86bafeabf1d0b61d0fb1 (patch)
tree8acf7371cd07ea1e1d7acd5f9bd0174bcea3926f
parent2be21576e46acf6b22c2c4d4191b2ccc9e6671f6 (diff)
on the way to thread safety...(?)
a number of new mutexes to protect what is there to protect. avoid deadlocks when using the big pd-lock svn path=/trunk/externals/iem/iemnet/; revision=14138
-rw-r--r--iemnet_receiver.c156
1 files changed, 111 insertions, 45 deletions
diff --git a/iemnet_receiver.c b/iemnet_receiver.c
index 98e72c8..0576c4e 100644
--- a/iemnet_receiver.c
+++ b/iemnet_receiver.c
@@ -24,36 +24,49 @@ struct _iemnet_receiver {
t_iemnet_chunk*data;
t_iemnet_receivecallback callback;
t_iemnet_queue*queue;
- int running;
t_clock *clock;
- int keepreceiving;
int newdataflag;
- pthread_mutex_t newdatamtx;
+ int running;
+ int keepreceiving;
+
+ pthread_mutex_t newdata_mtx, running_mtx, keeprec_mtx;
};
/* notifies Pd that there is new data to fetch */
static void iemnet_signalNewData(t_iemnet_receiver*x) {
int already=0;
- pthread_mutex_lock(&x->newdatamtx);
- already=x->newdataflag;
- x->newdataflag=1;
+ int locked=0;
+ pthread_mutex_lock(&x->newdata_mtx);
+ already=x->newdataflag;
+ x->newdataflag=1;
+
+ /* don't schedule ticks at the end of life */
+ if(x->sockfd<0)already=1;
- /* don't schedule ticks at the end of life */
- if(x->sockfd<0)already=1;
+ pthread_mutex_unlock(&x->newdata_mtx);
- pthread_mutex_unlock(&x->newdatamtx);
+ if(already) {
+ return;
+ }
- if(already)return;
- sys_lock();
+ /*
+ * try to lock Pd's main mutex
+ * this is bound to deadlock if this function is called from within Pd's mainthread
+ * (which happens when we destroy the receiver and signalNewData is called on cleanup)
+ *
+ * - shan't we check whether sys_trylock() returns EBUSY ?
+ */
+ locked=(0==sys_trylock());
if(x->clock)clock_delay(x->clock, 0);
- sys_unlock();
+ if(locked)sys_unlock();
}
/* the workhorse of the family */
static void*iemnet__receiver_readthread(void*arg) {
+ unsigned int i=0;
int result = 0;
t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg;
@@ -66,32 +79,60 @@ static void*iemnet__receiver_readthread(void*arg) {
struct sockaddr_in from;
socklen_t fromlen = sizeof(from);
- unsigned int i=0;
+ struct timeval timout;
+ fd_set readset;
+ FD_ZERO(&readset);
+ FD_SET(sockfd, &readset);
+
for(i=0; i<size; i++)data[i]=0;
- receiver->running=1;
+ pthread_mutex_lock(&receiver->running_mtx);
+ receiver->running=1;
+ pthread_mutex_unlock(&receiver->running_mtx);
+
while(1) {
t_iemnet_chunk*c=NULL;
- fromlen = sizeof(from);
- //fprintf(stderr, "reading %d bytes...\n", size);
- //result = recv(sockfd, data, size, 0);
- result = recvfrom(sockfd, data, size, 0, (struct sockaddr *)&from, &fromlen);
- //fprintf(stderr, "read %d bytes...\n", result);
- DEBUG("recfrom %d bytes", result);
- if(result<=0)break;
- c= iemnet__chunk_create_dataaddr(result, data, &from);
-
- queue_push(q, c);
-
- iemnet_signalNewData(receiver);
+ pthread_mutex_lock(&receiver->keeprec_mtx);
+ if(!receiver->keepreceiving) {
+ pthread_mutex_unlock(&receiver->keeprec_mtx);
+ break;
+ }
+ pthread_mutex_unlock(&receiver->keeprec_mtx);
+
+ fromlen = sizeof(from);
+ fd_set rs=readset;
+ timout.tv_sec=0;
+ timout.tv_usec=1000;
+
+ select(sockfd+1, &rs, NULL, NULL,
+ &timout);
+ if (FD_ISSET(sockfd, &rs)) {
+ DEBUG("select can read");
+
+ //fprintf(stderr, "reading %d bytes...\n", size);
+ //result = recv(sockfd, data, size, 0);
+
+ result = recvfrom(sockfd, data, size, 0, (struct sockaddr *)&from, &fromlen);
+ //fprintf(stderr, "read %d bytes...\n", result);
+ DEBUG("recfrom %d bytes", result);
+ if(result<=0)break;
+ c= iemnet__chunk_create_dataaddr(result, data, &from);
+ DEBUG("pushing");
+ queue_push(q, c);
+ DEBUG("signalling");
+ iemnet_signalNewData(receiver);
+ DEBUG("rereceive");
+ }
}
// oha
DEBUG("readthread loop termination: %d", result);
- if(result>=0)iemnet_signalNewData(receiver);
+ //if(result>=0)iemnet_signalNewData(receiver);
- receiver->running=0;
+ pthread_mutex_lock(&receiver->running_mtx);
+ receiver->running=0;
+ pthread_mutex_unlock(&receiver->running_mtx);
- //fprintf(stderr, "read thread terminated\n");
+ DEBUG("read thread terminated");
return NULL;
}
@@ -108,16 +149,21 @@ static void iemnet__receiver_tick(t_iemnet_receiver *x)
c=queue_pop_noblock(x->queue);
}
DEBUG("tick cleanup");
- pthread_mutex_lock(&x->newdatamtx);
- x->newdataflag=0;
- running = x->running;
- keepreceiving=x->keepreceiving;
- pthread_mutex_unlock(&x->newdatamtx);
-
- DEBUG("tick running %d (%d)", running, keepreceiving);
+ pthread_mutex_lock(&x->newdata_mtx);
+ x->newdataflag=0;
+ pthread_mutex_unlock(&x->newdata_mtx);
+
+ pthread_mutex_lock(&x->running_mtx);
+ running = x->running;
+ pthread_mutex_unlock(&x->running_mtx);
+
+ DEBUG("tick running %d", running);
if(!running) {
// read terminated
-
+ pthread_mutex_lock(&x->keeprec_mtx);
+ keepreceiving=x->keepreceiving;
+ pthread_mutex_unlock(&x->keeprec_mtx);
+
/* keepreceiving is set, if receiver is not yet in shutdown mode */
if(keepreceiving)
x->callback(x->userdata, NULL);
@@ -140,25 +186,31 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_rece
DEBUG("create new receiver for 0x%X:%d", userdata, sock);
//fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, userdata, callback);
if(rec) {
- t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE);
+ t_iemnet_chunk*data=NULL;
int res=0;
+
+ data=iemnet__chunk_create_empty(INBUFSIZE);
if(NULL==data) {
iemnet__receiver_destroy(rec);
DEBUG("create receiver failed");
return NULL;
}
+
rec->keepreceiving=1;
rec->sockfd=sock;
rec->userdata=userdata;
rec->data=data;
rec->callback=callback;
- memcpy(&rec->newdatamtx , &mtx, sizeof(pthread_mutex_t));
+ memcpy(&rec->newdata_mtx , &mtx, sizeof(pthread_mutex_t));
+ memcpy(&rec->running_mtx , &mtx, sizeof(pthread_mutex_t));
+ memcpy(&rec->keeprec_mtx , &mtx, sizeof(pthread_mutex_t));
rec->newdataflag=0;
+ rec->running=1;
rec->queue = queue_create();
rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick);
- rec->running=1;
+
res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec);
}
//fprintf(stderr, "new receiver created\n");
@@ -173,21 +225,33 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) {
int sockfd;
DEBUG("[%d] destroy receiver %x", inst, rec);
if(NULL==rec)return;
- if(!rec->keepreceiving)return;
- rec->keepreceiving=0;
+ pthread_mutex_lock(&rec->keeprec_mtx);
+ if(!rec->keepreceiving) {
+ pthread_mutex_unlock(&rec->keeprec_mtx);
+ return;
+ }
+ rec->keepreceiving=0;
+ pthread_mutex_unlock(&rec->keeprec_mtx);
sockfd=rec->sockfd;
- rec->sockfd=-1;
+
+ DEBUG("joining thread");
+ pthread_join(rec->thread, NULL);
DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd);
if(sockfd>=0) {
+ /* this doesn't alway make recvfrom() return!
+ * - try polling
+ * - try sending a signal with pthread_kill() ?
+ */
+
shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */
sys_closesocket(sockfd);
}
DEBUG("[%d] closed socket %d", inst, sockfd);
- pthread_join(rec->thread, NULL);
+ rec->sockfd=-1;
// empty the queue
DEBUG("[%d] tick %d", inst, rec->running);
@@ -197,7 +261,9 @@ void iemnet__receiver_destroy(t_iemnet_receiver*rec) {
if(rec->data)iemnet__chunk_destroy(rec->data);
- pthread_mutex_destroy(&rec->newdatamtx);
+ pthread_mutex_destroy(&rec->newdata_mtx);
+ pthread_mutex_destroy(&rec->running_mtx);
+ pthread_mutex_destroy(&rec->keeprec_mtx);
clock_free(rec->clock);
rec->clock=NULL;