From 5cdf7066e1ceb823afa572fa596c37a935911cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Tue, 23 Mar 2010 19:05:53 +0000 Subject: hmm, less crashes; threads hang svn path=/trunk/externals/iem/iemnet/; revision=13245 --- shared.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 70 insertions(+), 9 deletions(-) (limited to 'shared.c') diff --git a/shared.c b/shared.c index 14311d8..819afb3 100644 --- a/shared.c +++ b/shared.c @@ -53,6 +53,24 @@ t_iemnet_chunk* iemnet__chunk_create_empty(int size) { return result; } +t_iemnet_chunk* iemnet__chunk_create_data(int size, unsigned char*data) { + t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); + if(result) { + result->size=size; + result->data=(unsigned char*)getbytes(sizeof(unsigned char)*size); + + if(NULL == result->data) { + result->size=0; + iemnet__chunk_destroy(result); + return NULL; + } + + memcpy(result->data, data, result->size); + } + return result; +} + + t_iemnet_chunk* iemnet__chunk_create_list(int argc, t_atom*argv) { t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); int i; @@ -136,7 +154,7 @@ int queue_push( int size=_this->size; if(NULL == data) return size; - fprintf(stderr, "pushing %d bytes\n", data->size); + //fprintf(stderr, "pushing %d bytes\n", data->size); n=(t_node*)getbytes(sizeof(t_node)); @@ -154,7 +172,7 @@ int queue_push( size=_this->size; - fprintf(stderr, "pushed %d bytes\n", data->size); + //fprintf(stderr, "pushed %d bytes\n", data->size); pthread_mutex_unlock(&_this->mtx); pthread_cond_signal(&_this->cond); @@ -189,7 +207,7 @@ t_iemnet_chunk* queue_pop( freebytes(head, sizeof(t_node)); head=NULL; } - fprintf(stderr, "popped %d bytes\n", data->size); + //fprintf(stderr, "popped %d bytes\n", data->size); return data; } @@ -281,16 +299,17 @@ static void*iemnet__sender_sendthread(void*arg) { } sender->queue=NULL; queue_destroy(q); + fprintf(stderr, "write thread terminated\n"); return NULL; } int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { - post("send %x %x", s, c); + //post("send %x %x", s, c); t_queue*q=s->queue; int size=0; - post("queue=%x", q); + //post("queue=%x", q); if(q) { - post("sending data with %d bytes using %x", c->size, s); + // post("sending data with %d bytes using %x", c->size, s); size = queue_push(q, c); } return size; @@ -357,18 +376,54 @@ int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const v struct _iemnet_receiver { + pthread_t thread; int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */ void*owner; t_iemnet_chunk*data; t_iemnet_receivecallback callback; + t_queue*queue; + int cont; }; + +/* the workhorse of the family */ +static void*iemnet__receiver_readthread(void*arg) { + t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; + + int sockfd=receiver->sockfd; + t_queue*q=receiver->queue; + + unsigned char data[INBUFSIZE]; + unsigned int size=INBUFSIZE; + + fprintf(stderr, "read thread started\n"); + + int i=0; + for(i=0; icont) { + int result = recv(sockfd, data, size, 0); + + if(0==result)break; + t_iemnet_chunk*c = iemnet__chunk_create_data(result, data); + + queue_push(q, c); + + // shouldn't we do something with the result here? + } + fprintf(stderr, "read thread terminated\n"); + return NULL; +} + + static void iemnet__receiver_pollfn(t_iemnet_receiver*x, int fd) { int ret = recv(fd, /* socket */ x->data->data, /* buf */ x->data->size, /* len */ 0); /* flags */ + post("pollfn"); + return; if(ret<=0) { sys_rmpollfn(fd); x->callback(x->owner, fd, 0); @@ -384,9 +439,10 @@ static void iemnet__receiver_pollfn(t_iemnet_receiver*x, int fd) { t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback) { t_iemnet_receiver*result=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); - fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback); + //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback); if(result) { t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE); + int res=0; if(NULL==data) { iemnet__receiver_destroy(result); return NULL; @@ -396,9 +452,12 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receive result->data=data; result->callback=callback; - sys_addpollfn(sock, (t_fdpollfn)iemnet__receiver_pollfn, result); + result->queue = queue_create(); + + result->cont = 1; + res=pthread_create(&result->thread, 0, iemnet__receiver_readthread, result); } - fprintf(stderr, "new receiver created\n"); + //fprintf(stderr, "new receiver created\n"); return result; } @@ -414,6 +473,8 @@ void iemnet__receiver_destroy(t_iemnet_receiver*r) { r->data=NULL; r->callback=NULL; + r->cont = 0; + freebytes(r, sizeof(t_iemnet_receiver)); r=NULL; } -- cgit v1.2.1