aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--iemnet.h4
-rw-r--r--shared.c172
-rw-r--r--tcpserver.c27
3 files changed, 161 insertions, 42 deletions
diff --git a/iemnet.h b/iemnet.h
index 390e1ea..4d35794 100644
--- a/iemnet.h
+++ b/iemnet.h
@@ -13,8 +13,6 @@ t_iemnet_chunk*iemnet__chunk_create_data(int, unsigned char*);
t_iemnet_chunk*iemnet__chunk_create_list(int, t_atom*);
t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*);
-t_atom*iemnet__chunk2list(t_iemnet_chunk*);
-
/* sender */
#define t_iemnet_sender struct _iemnet_sender
EXTERN_STRUCT _iemnet_sender;
@@ -33,7 +31,7 @@ int iemnet__sender_setsockopt(t_iemnet_sender*, int level, int optname, const vo
#define t_iemnet_receiver struct _iemnet_receiver
EXTERN_STRUCT _iemnet_receiver;
-typedef void (*t_iemnet_receivecallback)(void*x, int, t_iemnet_chunk*);
+typedef void (*t_iemnet_receivecallback)(void*x, int sockfd, int argc, t_atom*argv);
/**
* create a receiver object: whenever something is received on the socket,
diff --git a/shared.c b/shared.c
index aeed276..c172ceb 100644
--- a/shared.c
+++ b/shared.c
@@ -19,11 +19,76 @@
#include <pthread.h>
-#define INBUFSIZE 4096L /* was 4096: size of receiving data buffer */
-//#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */
+//#define INBUFSIZE 4096L /* was 4096: size of receiving data buffer */
+#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */
- /* data handling */
+/* data handling */
+
+typedef struct _iemnet_floatlist {
+ t_atom*argv;
+ size_t argc;
+
+ size_t size; // real size (might be bigger than argc)
+} t_iemnet_floatlist;
+
+t_iemnet_floatlist*iemnet__floatlist_init(t_iemnet_floatlist*cl) {
+ unsigned int i;
+ if(NULL==cl)return NULL;
+ for(i=0; i<cl->size; i++)
+ SETFLOAT((cl->argv+i), 0.f);
+
+ return cl;
+}
+
+void iemnet__floatlist_destroy(t_iemnet_floatlist*cl) {
+ if(NULL==cl)return;
+ if(cl->argv) freebytes(cl->argv, sizeof(t_atom)*cl->size);
+ cl->argv=NULL;
+ cl->argc=0;
+ cl->size=0;
+
+ freebytes(cl, sizeof(t_iemnet_floatlist));
+}
+
+t_iemnet_floatlist*iemnet__floatlist_create(unsigned int size) {
+ t_iemnet_floatlist*result=(t_iemnet_floatlist*)getbytes(sizeof(t_iemnet_floatlist));
+ if(NULL==result)return NULL;
+
+ result->argv = (t_atom*)getbytes(size*sizeof(t_atom));
+ if(NULL==result->argv) {
+ iemnet__floatlist_destroy(result);
+ return NULL;
+ }
+
+ result->argc=size;
+ result->size=size;
+
+ result=iemnet__floatlist_init(result);
+
+ return result;
+}
+
+t_iemnet_floatlist*iemnet__floatlist_resize(t_iemnet_floatlist*cl, unsigned int size) {
+ t_atom*tmp;
+ if(size<=cl->size) {
+ cl->argc=size;
+ return cl;
+ }
+
+ tmp=(t_atom*)getbytes(size*sizeof(t_atom));
+ if(NULL==tmp) return NULL;
+
+ freebytes(cl->argv, sizeof(t_atom)*cl->size);
+
+ cl->argv=tmp;
+ cl->argc=cl->size=size;
+
+ cl=iemnet__floatlist_init(cl);
+
+ return cl;
+}
+
void iemnet__chunk_destroy(t_iemnet_chunk*c) {
@@ -115,17 +180,17 @@ t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*c) {
}
-t_atom*iemnet__chunk2list(t_iemnet_chunk*c) {
- t_atom*result=NULL;
+t_iemnet_floatlist*iemnet__chunk2list(t_iemnet_chunk*c, t_iemnet_floatlist*dest) {
unsigned int i;
if(NULL==c)return NULL;
- result=(t_atom*)getbytes(c->size*sizeof(t_atom));
- if(NULL==result)return NULL;
+ dest=iemnet__floatlist_resize(dest, c->size);
+ if(NULL==dest)return NULL;
+
for(i=0; i<c->size; i++) {
- SETFLOAT(result+i, c->data[i]);
+ dest->argv[i].a_w.w_float = c->data[i];
}
- return result;
+ return dest;
}
@@ -181,7 +246,7 @@ int queue_push(
return size;
}
-t_iemnet_chunk* queue_pop(
+t_iemnet_chunk* queue_pop_block(
t_queue* const _this
) {
t_node* head=0;
@@ -192,8 +257,38 @@ t_iemnet_chunk* queue_pop(
pthread_mutex_unlock(&_this->mtx);
return NULL;
}
- else
+ else {
pthread_cond_wait(&_this->cond, &_this->mtx);
+ }
+ }
+
+ if (! (_this->head = head->next)) {
+ _this->tail = 0;
+ }
+ if(head && head->data) {
+ _this->size-=head->data->size;
+ }
+
+ pthread_mutex_unlock(&_this->mtx);
+ if(head) {
+ data=head->data;
+ freebytes(head, sizeof(t_node));
+ head=NULL;
+ }
+ //fprintf(stderr, "popped %d bytes\n", data->size);
+ return data;
+}
+
+t_iemnet_chunk* queue_pop_noblock(
+ t_queue* const _this
+ ) {
+ t_node* head=0;
+ t_iemnet_chunk*data=0;
+ pthread_mutex_lock(&_this->mtx);
+ if (! (head = _this->head)) {
+ // empty head
+ pthread_mutex_unlock(&_this->mtx);
+ return NULL;
}
if (! (_this->head = head->next)) {
_this->tail = 0;
@@ -212,6 +307,11 @@ t_iemnet_chunk* queue_pop(
return data;
}
+t_iemnet_chunk* queue_pop(t_queue* const _this) {
+ return queue_pop_block(_this);
+}
+
+
void queue_finish(t_queue* q) {
if(NULL==q)
return;
@@ -221,13 +321,15 @@ void queue_finish(t_queue* q) {
void queue_destroy(t_queue* q) {
t_iemnet_chunk*c=NULL;
+
+ post("queue_destroy %x", q);
if(NULL==q)
return;
queue_finish(q);
/* remove all the chunks from the queue */
- while(NULL!=(c=queue_pop(q))) {
+ while(NULL!=(c=queue_pop_noblock(q))) {
iemnet__chunk_destroy(c);
}
@@ -239,6 +341,7 @@ void queue_destroy(t_queue* q) {
freebytes(q, sizeof(t_queue));
q=NULL;
+ post("queue_destroyed %x", q);
}
t_queue* queue_create(void) {
@@ -296,6 +399,8 @@ static void*iemnet__sender_sendthread(void*arg) {
// shouldn't we do something with the result here?
iemnet__chunk_destroy(c);
+ } else {
+ break;
}
}
sender->queue=NULL;
@@ -385,6 +490,7 @@ struct _iemnet_receiver {
t_queue*queue;
int running;
t_clock *clock;
+ t_iemnet_floatlist*flist;
};
@@ -402,7 +508,9 @@ static void*iemnet__receiver_readthread(void*arg) {
for(i=0; i<size; i++)data[i]=0;
receiver->running=1;
while(1) {
+ // fprintf(stderr, "reading %d bytes...\n", size);
int result = recv(sockfd, data, size, 0);
+ //fprintf(stderr, "read %d bytes...\n", result);
if(0==result)break;
t_iemnet_chunk*c = iemnet__chunk_create_data(result, data);
@@ -412,19 +520,35 @@ static void*iemnet__receiver_readthread(void*arg) {
}
clock_delay(receiver->clock, 0);
receiver->running=0;
+ fprintf(stderr, "read thread terminated\n");
return NULL;
}
static void iemnet__receiver_tick(t_iemnet_receiver *x)
{
- post("receiver tick");
- if(x->running) {
- // received data
- t_iemnet_chunk*c=queue_pop(x->queue);
- (x->callback)(x->owner, x->sockfd, c);
- } else {
- x->callback(x->owner, x->sockfd, 0);
+ static int ticks=0;
+ static int packets=0;
+ static double totaltime=0;
+
+ double start=sys_getrealtime();
+ // received data
+ t_iemnet_chunk*c=queue_pop_noblock(x->queue);
+ while(NULL!=c) {
+ x->flist = iemnet__chunk2list(c, x->flist);
+ (x->callback)(x->owner, x->sockfd, x->flist->argc, x->flist->argv);
+ iemnet__chunk_destroy(c);
+ c=queue_pop_noblock(x->queue);
+
+ packets++;
+ }
+
+ ticks++;
+ totaltime+=(sys_getrealtime()-start);
+
+ if(!x->running) {
+ // read terminated
+ x->callback(x->owner, x->sockfd, 0, NULL);
}
}
@@ -443,6 +567,7 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receive
result->owner=owner;
result->data=data;
result->callback=callback;
+ result->flist=iemnet__floatlist_create(1024);
result->queue = queue_create();
result->clock = clock_new(result, (t_method)iemnet__receiver_tick);
@@ -455,15 +580,16 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receive
}
void iemnet__receiver_destroy(t_iemnet_receiver*r) {
if(NULL==r)return;
- if(r->data)
- iemnet__chunk_destroy(r->data);
+ if(r->data)iemnet__chunk_destroy(r->data);
+ if(r->flist)iemnet__floatlist_destroy(r->flist);
+ clock_free(r->clock);
r->sockfd=0;
r->owner=NULL;
r->data=NULL;
r->callback=NULL;
-
- clock_free(r->clock);
+ r->clock=NULL;
+ r->flist=NULL;
freebytes(r, sizeof(t_iemnet_receiver));
r=NULL;
diff --git a/tcpserver.c b/tcpserver.c
index ba62ef8..9dcf529 100644
--- a/tcpserver.c
+++ b/tcpserver.c
@@ -100,7 +100,7 @@ static void tcpserver_send_bytes(t_tcpserver *x, int sockfd, t_iemnet_chunk*chun
#ifdef SIOCOUTQ
static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client);
#endif
-static void tcpserver_datacallback(t_tcpserver *x, int sockfd, t_iemnet_chunk*chunk);
+static void tcpserver_datacallback(t_tcpserver *x, int sockfd, int argc, t_atom*argv);
static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient);
static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket);
@@ -330,29 +330,24 @@ static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket)
tcpserver_disconnect_client(x, id+1);
}
-/* ---------------- main tcpserver (receive) stuff --------------------- */
-
-static void tcpserver_datacallback(t_tcpserver *x, int sockfd, t_iemnet_chunk*chunk) {
- post("data callback for %x with data @ %x", x, chunk);
- if(NULL!=chunk) {
- t_atom*argv=NULL;
- const int size=chunk->size;
- post("\t%d elements", size);
- argv=iemnet__chunk2list(chunk);
- post("got list at %x", argv);
- outlet_list(x->x_msgout, &s_list, chunk->size, argv);
-
- post("freeing list");
- freebytes(argv, sizeof(t_atom)*chunk->size);
+/* ---------------- main tcpserver (receive) stuff --------------------- */
+static void tcpserver_datacallback(t_tcpserver *x, int sockfd, int argc, t_atom*argv) {
+ static int packetcount=0;
+ static int bytecount=0;
+ if(argc) {
+ outlet_list(x->x_msgout, &s_list, argc, argv);
+ packetcount++;
+ bytecount+=argc;
} else {
// disconnected
tcpserver_disconnect_socket(x, sockfd);
}
- post("callback done");
+
+ // post("tcpserver: %d bytes in %d packets", bytecount, packetcount);
}
static void tcpserver_connectpoll(t_tcpserver *x)