From 1af905631bc3970cc8cee3ec795da3ea40f97144 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Wed, 24 Mar 2010 12:09:57 +0000 Subject: kind of works now: we can dump about 500MB within 5secs into tcpserver on lo svn path=/trunk/externals/iem/iemnet/; revision=13251 --- iemnet.h | 4 +- shared.c | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------- tcpserver.c | 27 ++++------ 3 files changed, 161 insertions(+), 42 deletions(-) diff --git a/iemnet.h b/iemnet.h index 390e1ea..4d35794 100644 --- a/iemnet.h +++ b/iemnet.h @@ -13,8 +13,6 @@ t_iemnet_chunk*iemnet__chunk_create_data(int, unsigned char*); t_iemnet_chunk*iemnet__chunk_create_list(int, t_atom*); t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*); -t_atom*iemnet__chunk2list(t_iemnet_chunk*); - /* sender */ #define t_iemnet_sender struct _iemnet_sender EXTERN_STRUCT _iemnet_sender; @@ -33,7 +31,7 @@ int iemnet__sender_setsockopt(t_iemnet_sender*, int level, int optname, const vo #define t_iemnet_receiver struct _iemnet_receiver EXTERN_STRUCT _iemnet_receiver; -typedef void (*t_iemnet_receivecallback)(void*x, int, t_iemnet_chunk*); +typedef void (*t_iemnet_receivecallback)(void*x, int sockfd, int argc, t_atom*argv); /** * create a receiver object: whenever something is received on the socket, diff --git a/shared.c b/shared.c index aeed276..c172ceb 100644 --- a/shared.c +++ b/shared.c @@ -19,11 +19,76 @@ #include -#define INBUFSIZE 4096L /* was 4096: size of receiving data buffer */ -//#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */ +//#define INBUFSIZE 4096L /* was 4096: size of receiving data buffer */ +#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */ - /* data handling */ +/* data handling */ + +typedef struct _iemnet_floatlist { + t_atom*argv; + size_t argc; + + size_t size; // real size (might be bigger than argc) +} t_iemnet_floatlist; + +t_iemnet_floatlist*iemnet__floatlist_init(t_iemnet_floatlist*cl) { + unsigned int i; + if(NULL==cl)return NULL; + for(i=0; isize; i++) + SETFLOAT((cl->argv+i), 0.f); + + return cl; +} + +void iemnet__floatlist_destroy(t_iemnet_floatlist*cl) { + if(NULL==cl)return; + if(cl->argv) freebytes(cl->argv, sizeof(t_atom)*cl->size); + cl->argv=NULL; + cl->argc=0; + cl->size=0; + + freebytes(cl, sizeof(t_iemnet_floatlist)); +} + +t_iemnet_floatlist*iemnet__floatlist_create(unsigned int size) { + t_iemnet_floatlist*result=(t_iemnet_floatlist*)getbytes(sizeof(t_iemnet_floatlist)); + if(NULL==result)return NULL; + + result->argv = (t_atom*)getbytes(size*sizeof(t_atom)); + if(NULL==result->argv) { + iemnet__floatlist_destroy(result); + return NULL; + } + + result->argc=size; + result->size=size; + + result=iemnet__floatlist_init(result); + + return result; +} + +t_iemnet_floatlist*iemnet__floatlist_resize(t_iemnet_floatlist*cl, unsigned int size) { + t_atom*tmp; + if(size<=cl->size) { + cl->argc=size; + return cl; + } + + tmp=(t_atom*)getbytes(size*sizeof(t_atom)); + if(NULL==tmp) return NULL; + + freebytes(cl->argv, sizeof(t_atom)*cl->size); + + cl->argv=tmp; + cl->argc=cl->size=size; + + cl=iemnet__floatlist_init(cl); + + return cl; +} + void iemnet__chunk_destroy(t_iemnet_chunk*c) { @@ -115,17 +180,17 @@ t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*c) { } -t_atom*iemnet__chunk2list(t_iemnet_chunk*c) { - t_atom*result=NULL; +t_iemnet_floatlist*iemnet__chunk2list(t_iemnet_chunk*c, t_iemnet_floatlist*dest) { unsigned int i; if(NULL==c)return NULL; - result=(t_atom*)getbytes(c->size*sizeof(t_atom)); - if(NULL==result)return NULL; + dest=iemnet__floatlist_resize(dest, c->size); + if(NULL==dest)return NULL; + for(i=0; isize; i++) { - SETFLOAT(result+i, c->data[i]); + dest->argv[i].a_w.w_float = c->data[i]; } - return result; + return dest; } @@ -181,7 +246,7 @@ int queue_push( return size; } -t_iemnet_chunk* queue_pop( +t_iemnet_chunk* queue_pop_block( t_queue* const _this ) { t_node* head=0; @@ -192,8 +257,38 @@ t_iemnet_chunk* queue_pop( pthread_mutex_unlock(&_this->mtx); return NULL; } - else + else { pthread_cond_wait(&_this->cond, &_this->mtx); + } + } + + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } + + pthread_mutex_unlock(&_this->mtx); + if(head) { + data=head->data; + freebytes(head, sizeof(t_node)); + head=NULL; + } + //fprintf(stderr, "popped %d bytes\n", data->size); + return data; +} + +t_iemnet_chunk* queue_pop_noblock( + t_queue* const _this + ) { + t_node* head=0; + t_iemnet_chunk*data=0; + pthread_mutex_lock(&_this->mtx); + if (! (head = _this->head)) { + // empty head + pthread_mutex_unlock(&_this->mtx); + return NULL; } if (! (_this->head = head->next)) { _this->tail = 0; @@ -212,6 +307,11 @@ t_iemnet_chunk* queue_pop( return data; } +t_iemnet_chunk* queue_pop(t_queue* const _this) { + return queue_pop_block(_this); +} + + void queue_finish(t_queue* q) { if(NULL==q) return; @@ -221,13 +321,15 @@ void queue_finish(t_queue* q) { void queue_destroy(t_queue* q) { t_iemnet_chunk*c=NULL; + + post("queue_destroy %x", q); if(NULL==q) return; queue_finish(q); /* remove all the chunks from the queue */ - while(NULL!=(c=queue_pop(q))) { + while(NULL!=(c=queue_pop_noblock(q))) { iemnet__chunk_destroy(c); } @@ -239,6 +341,7 @@ void queue_destroy(t_queue* q) { freebytes(q, sizeof(t_queue)); q=NULL; + post("queue_destroyed %x", q); } t_queue* queue_create(void) { @@ -296,6 +399,8 @@ static void*iemnet__sender_sendthread(void*arg) { // shouldn't we do something with the result here? iemnet__chunk_destroy(c); + } else { + break; } } sender->queue=NULL; @@ -385,6 +490,7 @@ struct _iemnet_receiver { t_queue*queue; int running; t_clock *clock; + t_iemnet_floatlist*flist; }; @@ -402,7 +508,9 @@ static void*iemnet__receiver_readthread(void*arg) { for(i=0; irunning=1; while(1) { + // fprintf(stderr, "reading %d bytes...\n", size); int result = recv(sockfd, data, size, 0); + //fprintf(stderr, "read %d bytes...\n", result); if(0==result)break; t_iemnet_chunk*c = iemnet__chunk_create_data(result, data); @@ -412,19 +520,35 @@ static void*iemnet__receiver_readthread(void*arg) { } clock_delay(receiver->clock, 0); receiver->running=0; + fprintf(stderr, "read thread terminated\n"); return NULL; } static void iemnet__receiver_tick(t_iemnet_receiver *x) { - post("receiver tick"); - if(x->running) { - // received data - t_iemnet_chunk*c=queue_pop(x->queue); - (x->callback)(x->owner, x->sockfd, c); - } else { - x->callback(x->owner, x->sockfd, 0); + static int ticks=0; + static int packets=0; + static double totaltime=0; + + double start=sys_getrealtime(); + // received data + t_iemnet_chunk*c=queue_pop_noblock(x->queue); + while(NULL!=c) { + x->flist = iemnet__chunk2list(c, x->flist); + (x->callback)(x->owner, x->sockfd, x->flist->argc, x->flist->argv); + iemnet__chunk_destroy(c); + c=queue_pop_noblock(x->queue); + + packets++; + } + + ticks++; + totaltime+=(sys_getrealtime()-start); + + if(!x->running) { + // read terminated + x->callback(x->owner, x->sockfd, 0, NULL); } } @@ -443,6 +567,7 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receive result->owner=owner; result->data=data; result->callback=callback; + result->flist=iemnet__floatlist_create(1024); result->queue = queue_create(); result->clock = clock_new(result, (t_method)iemnet__receiver_tick); @@ -455,15 +580,16 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receive } void iemnet__receiver_destroy(t_iemnet_receiver*r) { if(NULL==r)return; - if(r->data) - iemnet__chunk_destroy(r->data); + if(r->data)iemnet__chunk_destroy(r->data); + if(r->flist)iemnet__floatlist_destroy(r->flist); + clock_free(r->clock); r->sockfd=0; r->owner=NULL; r->data=NULL; r->callback=NULL; - - clock_free(r->clock); + r->clock=NULL; + r->flist=NULL; freebytes(r, sizeof(t_iemnet_receiver)); r=NULL; diff --git a/tcpserver.c b/tcpserver.c index ba62ef8..9dcf529 100644 --- a/tcpserver.c +++ b/tcpserver.c @@ -100,7 +100,7 @@ static void tcpserver_send_bytes(t_tcpserver *x, int sockfd, t_iemnet_chunk*chun #ifdef SIOCOUTQ static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client); #endif -static void tcpserver_datacallback(t_tcpserver *x, int sockfd, t_iemnet_chunk*chunk); +static void tcpserver_datacallback(t_tcpserver *x, int sockfd, int argc, t_atom*argv); static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient); static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket); @@ -330,29 +330,24 @@ static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket) tcpserver_disconnect_client(x, id+1); } -/* ---------------- main tcpserver (receive) stuff --------------------- */ - -static void tcpserver_datacallback(t_tcpserver *x, int sockfd, t_iemnet_chunk*chunk) { - post("data callback for %x with data @ %x", x, chunk); - if(NULL!=chunk) { - t_atom*argv=NULL; - const int size=chunk->size; - post("\t%d elements", size); - argv=iemnet__chunk2list(chunk); - post("got list at %x", argv); - outlet_list(x->x_msgout, &s_list, chunk->size, argv); - - post("freeing list"); - freebytes(argv, sizeof(t_atom)*chunk->size); +/* ---------------- main tcpserver (receive) stuff --------------------- */ +static void tcpserver_datacallback(t_tcpserver *x, int sockfd, int argc, t_atom*argv) { + static int packetcount=0; + static int bytecount=0; + if(argc) { + outlet_list(x->x_msgout, &s_list, argc, argv); + packetcount++; + bytecount+=argc; } else { // disconnected tcpserver_disconnect_socket(x, sockfd); } - post("callback done"); + + // post("tcpserver: %d bytes in %d packets", bytecount, packetcount); } static void tcpserver_connectpoll(t_tcpserver *x) -- cgit v1.2.1