aboutsummaryrefslogtreecommitdiff
path: root/iemnet_receiver.c
diff options
context:
space:
mode:
authorIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-03-30 07:36:07 +0000
committerIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-03-30 07:36:07 +0000
commit806df1c7645fdcfc945fb9e1467ea7aaada2b903 (patch)
tree0293221367f241b09da41ffaaedc3060ab52cdfb /iemnet_receiver.c
parent3b0519565763342e77515975173b681fa7171ee5 (diff)
split core library into separate files
svn path=/trunk/externals/iem/iemnet/; revision=13310
Diffstat (limited to 'iemnet_receiver.c')
-rw-r--r--iemnet_receiver.c208
1 files changed, 208 insertions, 0 deletions
diff --git a/iemnet_receiver.c b/iemnet_receiver.c
new file mode 100644
index 0000000..5359f65
--- /dev/null
+++ b/iemnet_receiver.c
@@ -0,0 +1,208 @@
+/* iemnet
+ * copyright (c) 2010 IOhannes m zmölnig, IEM
+ */
+
+//#define DEBUG
+
+#include "iemnet.h"
+#include "iemnet_data.h"
+
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+
+#include <sys/types.h>
+
+#ifdef _WIN32
+# include <winsock2.h>
+# include <ws2tcpip.h> /* for socklen_t */
+#else
+# include <sys/socket.h>
+#endif
+
+#include <pthread.h>
+
+#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */
+
+
+struct _iemnet_receiver {
+ pthread_t thread;
+ int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */
+ void*userdata;
+ t_iemnet_chunk*data;
+ t_iemnet_receivecallback callback;
+ t_iemnet_queue*queue;
+ int running;
+ t_clock *clock;
+ t_iemnet_floatlist*flist;
+
+ int keepreceiving;
+
+ int newdataflag;
+ pthread_mutex_t newdatamtx;
+};
+
+/* notifies Pd that there is new data to fetch */
+static void iemnet_signalNewData(t_iemnet_receiver*x) {
+ int already=0;
+ pthread_mutex_lock(&x->newdatamtx);
+ already=x->newdataflag;
+ x->newdataflag=1;
+
+ /* don't schedule ticks at the end of life */
+ if(x->sockfd<0)already=1;
+
+ pthread_mutex_unlock(&x->newdatamtx);
+
+ if(already)return;
+ sys_lock();
+ if(x->clock)clock_delay(x->clock, 0);
+ sys_unlock();
+}
+
+
+/* the workhorse of the family */
+static void*iemnet__receiver_readthread(void*arg) {
+ int result = 0;
+ t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg;
+
+ int sockfd=receiver->sockfd;
+ t_iemnet_queue*q=receiver->queue;
+
+ unsigned char data[INBUFSIZE];
+ unsigned int size=INBUFSIZE;
+
+ struct sockaddr_in from;
+ socklen_t fromlen = sizeof(from);
+
+ unsigned int i=0;
+ for(i=0; i<size; i++)data[i]=0;
+ receiver->running=1;
+ while(1) {
+ t_iemnet_chunk*c=NULL;
+ fromlen = sizeof(from);
+ //fprintf(stderr, "reading %d bytes...\n", size);
+ //result = recv(sockfd, data, size, 0);
+ result = recvfrom(sockfd, data, size, 0, (struct sockaddr *)&from, &fromlen);
+ //fprintf(stderr, "read %d bytes...\n", result);
+
+ if(result<=0)break;
+ c= iemnet__chunk_create_dataaddr(result, data, &from);
+
+ queue_push(q, c);
+
+ iemnet_signalNewData(receiver);
+
+ }
+ if(result>=0)iemnet_signalNewData(receiver);
+
+ receiver->running=0;
+
+ //fprintf(stderr, "read thread terminated\n");
+ return NULL;
+}
+
+/* callback from Pd's main thread to fetch queued data */
+static void iemnet__receiver_tick(t_iemnet_receiver *x)
+{
+ // received data
+ t_iemnet_chunk*c=queue_pop_noblock(x->queue);
+ while(NULL!=c) {
+ x->flist = iemnet__chunk2list(c, x->flist);
+ (x->callback)(x->userdata, c, x->flist->argc, x->flist->argv);
+ iemnet__chunk_destroy(c);
+ c=queue_pop_noblock(x->queue);
+ }
+ pthread_mutex_lock(&x->newdatamtx);
+ x->newdataflag=0;
+ pthread_mutex_unlock(&x->newdatamtx);
+
+ if(!x->running) {
+ // read terminated
+
+ /* keepreceiving is set, if receiver is not yet in shutdown mode */
+ if(x->keepreceiving)
+ x->callback(x->userdata, NULL, 0, NULL);
+ }
+}
+
+t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_receivecallback callback) {
+ static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
+ t_iemnet_receiver*rec=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver));
+ DEBUG("create new receiver for 0x%X:%d", userdata, sock);
+ //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, userdata, callback);
+ if(rec) {
+ t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE);
+ int res=0;
+ if(NULL==data) {
+ iemnet__receiver_destroy(rec);
+ DEBUG("create receiver failed");
+ return NULL;
+ }
+ rec->keepreceiving=1;
+ rec->sockfd=sock;
+ rec->userdata=userdata;
+ rec->data=data;
+ rec->callback=callback;
+ rec->flist=iemnet__floatlist_create(1024);
+
+ memcpy(&rec->newdatamtx , &mtx, sizeof(pthread_mutex_t));
+ rec->newdataflag=0;
+
+ rec->queue = queue_create();
+ rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick);
+ rec->running=1;
+ res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec);
+ }
+ //fprintf(stderr, "new receiver created\n");
+
+ return rec;
+}
+
+void iemnet__receiver_destroy(t_iemnet_receiver*rec) {
+ static int instance=0;
+ int inst=instance++;
+
+ int sockfd;
+ DEBUG("[%d] destroy receiver %x", inst, rec);
+ if(NULL==rec)return;
+ if(!rec->keepreceiving)return;
+ rec->keepreceiving=0;
+
+
+ sockfd=rec->sockfd;
+ rec->sockfd=-1;
+
+ DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd);
+
+ if(sockfd>=0) {
+ shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */
+ sys_closesocket(sockfd);
+ }
+ DEBUG("[%d] closed socket %d", inst, sockfd);
+
+ pthread_join(rec->thread, NULL);
+
+
+ // empty the queue
+ DEBUG("[%d] tick %d", inst, rec->running);
+ iemnet__receiver_tick(rec);
+ DEBUG("[%d] tack", inst);
+
+ if(rec->data)iemnet__chunk_destroy(rec->data);
+ if(rec->flist)iemnet__floatlist_destroy(rec->flist);
+
+ pthread_mutex_destroy(&rec->newdatamtx);
+
+ clock_free(rec->clock);
+ rec->clock=NULL;
+
+ rec->userdata=NULL;
+ rec->data=NULL;
+ rec->callback=NULL;
+ rec->flist=NULL;
+
+ freebytes(rec, sizeof(t_iemnet_receiver));
+ rec=NULL;
+ DEBUG("[%d] destroyed receiver", inst);
+}