From 55f063b243f282a9c7f8dceffd898d14fdb7a39f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Wed, 24 Mar 2010 15:24:32 +0000 Subject: a first client svn path=/trunk/externals/iem/iemnet/; revision=13252 --- shared.c | 156 +++++++++++++++++++++++++++++++++------------------------------ 1 file changed, 81 insertions(+), 75 deletions(-) (limited to 'shared.c') diff --git a/shared.c b/shared.c index c172ceb..09faaad 100644 --- a/shared.c +++ b/shared.c @@ -13,8 +13,13 @@ #include #include -#include +#ifdef _WIN32 +# include +# include /* for socklen_t */ +#else +# include +#endif #include @@ -275,7 +280,6 @@ t_iemnet_chunk* queue_pop_block( freebytes(head, sizeof(t_node)); head=NULL; } - //fprintf(stderr, "popped %d bytes\n", data->size); return data; } @@ -303,7 +307,6 @@ t_iemnet_chunk* queue_pop_noblock( freebytes(head, sizeof(t_node)); head=NULL; } - //fprintf(stderr, "popped %d bytes\n", data->size); return data; } @@ -321,8 +324,6 @@ void queue_finish(t_queue* q) { void queue_destroy(t_queue* q) { t_iemnet_chunk*c=NULL; - - post("queue_destroy %x", q); if(NULL==q) return; @@ -341,7 +342,6 @@ void queue_destroy(t_queue* q) { freebytes(q, sizeof(t_queue)); q=NULL; - post("queue_destroyed %x", q); } t_queue* queue_create(void) { @@ -381,6 +381,24 @@ struct _iemnet_sender { }; /* the workhorse of the family */ + +static int iemnet__sender_dosend(int sockfd, t_queue*q) { + t_iemnet_chunk*c=queue_pop(q); + if(c) { + unsigned char*data=c->data; + unsigned int size=c->size; + + fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); + + int result = send(sockfd, data, size, 0); + // shouldn't we do something with the result here? + iemnet__chunk_destroy(c); + } else { + return 0; + } + return 1; +} + static void*iemnet__sender_sendthread(void*arg) { t_iemnet_sender*sender=(t_iemnet_sender*)arg; @@ -388,47 +406,28 @@ static void*iemnet__sender_sendthread(void*arg) { t_queue*q=sender->queue; while(sender->cont) { - t_iemnet_chunk*c=NULL; - c=queue_pop(q); - if(c) { - unsigned char*data=c->data; - unsigned int size=c->size; - - int result = send(sockfd, data, size, 0); - - // shouldn't we do something with the result here? - - iemnet__chunk_destroy(c); - } else { - break; - } + if(!iemnet__sender_dosend(sockfd, q))break; } - sender->queue=NULL; - queue_destroy(q); fprintf(stderr, "write thread terminated\n"); return NULL; } int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { - //post("send %x %x", s, c); t_queue*q=s->queue; int size=0; - //post("queue=%x", q); if(q) { - // post("sending data with %d bytes using %x", c->size, s); - size = queue_push(q, c); + t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); + size = queue_push(q, chunk); } return size; } void iemnet__sender_destroy(t_iemnet_sender*s) { s->cont=0; - if(s->queue) - queue_finish(s->queue); - + queue_finish(s->queue); s->sockfd = -1; - pthread_join(s->thread, NULL); + queue_destroy(s->queue); freebytes(s, sizeof(t_iemnet_sender)); s=NULL; @@ -496,6 +495,7 @@ struct _iemnet_receiver { /* the workhorse of the family */ static void*iemnet__receiver_readthread(void*arg) { + int result = 0; t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; int sockfd=receiver->sockfd; @@ -508,89 +508,95 @@ static void*iemnet__receiver_readthread(void*arg) { for(i=0; irunning=1; while(1) { - // fprintf(stderr, "reading %d bytes...\n", size); - int result = recv(sockfd, data, size, 0); - //fprintf(stderr, "read %d bytes...\n", result); + fprintf(stderr, "reading %d bytes...\n", size); + result = recv(sockfd, data, size, 0); + fprintf(stderr, "read %d bytes...\n", result); - if(0==result)break; + if(result<=0)break; t_iemnet_chunk*c = iemnet__chunk_create_data(result, data); queue_push(q, c); - clock_delay(receiver->clock, 0); + + if(receiver->clock)clock_delay(receiver->clock, 0); } - clock_delay(receiver->clock, 0); + + + if(result>=0) + if(receiver->clock)clock_delay(receiver->clock, 0); + receiver->running=0; + + fprintf(stderr, "read thread terminated\n"); return NULL; } - +#define WHERE fprintf(stderr, "%s:%d", __FUNCTION__, __LINE__) static void iemnet__receiver_tick(t_iemnet_receiver *x) { - static int ticks=0; - static int packets=0; - static double totaltime=0; - - double start=sys_getrealtime(); + WHERE; fprintf(stderr, "\treceiver=%x", x); // received data t_iemnet_chunk*c=queue_pop_noblock(x->queue); + WHERE; fprintf(stderr, "\tchunk=%x", c); while(NULL!=c) { x->flist = iemnet__chunk2list(c, x->flist); (x->callback)(x->owner, x->sockfd, x->flist->argc, x->flist->argv); iemnet__chunk_destroy(c); c=queue_pop_noblock(x->queue); - - packets++; } - ticks++; - totaltime+=(sys_getrealtime()-start); + WHERE; fprintf(stderr, "\trunning=%d", x->running); if(!x->running) { // read terminated x->callback(x->owner, x->sockfd, 0, NULL); } + WHERE; fprintf(stderr, "\ttick done\n"); } t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback) { - t_iemnet_receiver*result=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); + t_iemnet_receiver*rec=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback); - if(result) { + if(rec) { t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE); int res=0; if(NULL==data) { - iemnet__receiver_destroy(result); + iemnet__receiver_destroy(rec); return NULL; } - result->sockfd=sock; - result->owner=owner; - result->data=data; - result->callback=callback; - result->flist=iemnet__floatlist_create(1024); - - result->queue = queue_create(); - result->clock = clock_new(result, (t_method)iemnet__receiver_tick); - result->running=1; - res=pthread_create(&result->thread, 0, iemnet__receiver_readthread, result); + rec->sockfd=sock; + rec->owner=owner; + rec->data=data; + rec->callback=callback; + rec->flist=iemnet__floatlist_create(1024); + + rec->queue = queue_create(); + rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); + rec->running=1; + res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec); } //fprintf(stderr, "new receiver created\n"); - return result; + return rec; } -void iemnet__receiver_destroy(t_iemnet_receiver*r) { - if(NULL==r)return; - if(r->data)iemnet__chunk_destroy(r->data); - if(r->flist)iemnet__floatlist_destroy(r->flist); - clock_free(r->clock); - - r->sockfd=0; - r->owner=NULL; - r->data=NULL; - r->callback=NULL; - r->clock=NULL; - r->flist=NULL; - - freebytes(r, sizeof(t_iemnet_receiver)); - r=NULL; +void iemnet__receiver_destroy(t_iemnet_receiver*rec) { + if(NULL==rec)return; + if(rec->data)iemnet__chunk_destroy(rec->data); + if(rec->flist)iemnet__floatlist_destroy(rec->flist); + clock_free(rec->clock); + sys_closesocket(rec->sockfd); + + rec->sockfd=0; + fprintf(stderr, "receiverdestroy join thread\n"); + pthread_join(rec->thread, NULL); + fprintf(stderr, "receiverdestroy joined thread\n"); + rec->owner=NULL; + rec->data=NULL; + rec->callback=NULL; + rec->clock=NULL; + rec->flist=NULL; + + freebytes(rec, sizeof(t_iemnet_receiver)); + rec=NULL; } -- cgit v1.2.1