aboutsummaryrefslogtreecommitdiff
path: root/shared.c
diff options
context:
space:
mode:
Diffstat (limited to 'shared.c')
-rw-r--r--shared.c156
1 files changed, 81 insertions, 75 deletions
diff --git a/shared.c b/shared.c
index c172ceb..09faaad 100644
--- a/shared.c
+++ b/shared.c
@@ -13,8 +13,13 @@
#include <errno.h>
#include <sys/types.h>
-#include <sys/socket.h>
+#ifdef _WIN32
+# include <winsock2.h>
+# include <ws2tcpip.h> /* for socklen_t */
+#else
+# include <sys/socket.h>
+#endif
#include <pthread.h>
@@ -275,7 +280,6 @@ t_iemnet_chunk* queue_pop_block(
freebytes(head, sizeof(t_node));
head=NULL;
}
- //fprintf(stderr, "popped %d bytes\n", data->size);
return data;
}
@@ -303,7 +307,6 @@ t_iemnet_chunk* queue_pop_noblock(
freebytes(head, sizeof(t_node));
head=NULL;
}
- //fprintf(stderr, "popped %d bytes\n", data->size);
return data;
}
@@ -321,8 +324,6 @@ void queue_finish(t_queue* q) {
void queue_destroy(t_queue* q) {
t_iemnet_chunk*c=NULL;
-
- post("queue_destroy %x", q);
if(NULL==q)
return;
@@ -341,7 +342,6 @@ void queue_destroy(t_queue* q) {
freebytes(q, sizeof(t_queue));
q=NULL;
- post("queue_destroyed %x", q);
}
t_queue* queue_create(void) {
@@ -381,6 +381,24 @@ struct _iemnet_sender {
};
/* the workhorse of the family */
+
+static int iemnet__sender_dosend(int sockfd, t_queue*q) {
+ t_iemnet_chunk*c=queue_pop(q);
+ if(c) {
+ unsigned char*data=c->data;
+ unsigned int size=c->size;
+
+ fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd);
+
+ int result = send(sockfd, data, size, 0);
+ // shouldn't we do something with the result here?
+ iemnet__chunk_destroy(c);
+ } else {
+ return 0;
+ }
+ return 1;
+}
+
static void*iemnet__sender_sendthread(void*arg) {
t_iemnet_sender*sender=(t_iemnet_sender*)arg;
@@ -388,47 +406,28 @@ static void*iemnet__sender_sendthread(void*arg) {
t_queue*q=sender->queue;
while(sender->cont) {
- t_iemnet_chunk*c=NULL;
- c=queue_pop(q);
- if(c) {
- unsigned char*data=c->data;
- unsigned int size=c->size;
-
- int result = send(sockfd, data, size, 0);
-
- // shouldn't we do something with the result here?
-
- iemnet__chunk_destroy(c);
- } else {
- break;
- }
+ if(!iemnet__sender_dosend(sockfd, q))break;
}
- sender->queue=NULL;
- queue_destroy(q);
fprintf(stderr, "write thread terminated\n");
return NULL;
}
int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) {
- //post("send %x %x", s, c);
t_queue*q=s->queue;
int size=0;
- //post("queue=%x", q);
if(q) {
- // post("sending data with %d bytes using %x", c->size, s);
- size = queue_push(q, c);
+ t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c);
+ size = queue_push(q, chunk);
}
return size;
}
void iemnet__sender_destroy(t_iemnet_sender*s) {
s->cont=0;
- if(s->queue)
- queue_finish(s->queue);
-
+ queue_finish(s->queue);
s->sockfd = -1;
-
pthread_join(s->thread, NULL);
+ queue_destroy(s->queue);
freebytes(s, sizeof(t_iemnet_sender));
s=NULL;
@@ -496,6 +495,7 @@ struct _iemnet_receiver {
/* the workhorse of the family */
static void*iemnet__receiver_readthread(void*arg) {
+ int result = 0;
t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg;
int sockfd=receiver->sockfd;
@@ -508,89 +508,95 @@ static void*iemnet__receiver_readthread(void*arg) {
for(i=0; i<size; i++)data[i]=0;
receiver->running=1;
while(1) {
- // fprintf(stderr, "reading %d bytes...\n", size);
- int result = recv(sockfd, data, size, 0);
- //fprintf(stderr, "read %d bytes...\n", result);
+ fprintf(stderr, "reading %d bytes...\n", size);
+ result = recv(sockfd, data, size, 0);
+ fprintf(stderr, "read %d bytes...\n", result);
- if(0==result)break;
+ if(result<=0)break;
t_iemnet_chunk*c = iemnet__chunk_create_data(result, data);
queue_push(q, c);
- clock_delay(receiver->clock, 0);
+
+ if(receiver->clock)clock_delay(receiver->clock, 0);
}
- clock_delay(receiver->clock, 0);
+
+
+ if(result>=0)
+ if(receiver->clock)clock_delay(receiver->clock, 0);
+
receiver->running=0;
+
+
fprintf(stderr, "read thread terminated\n");
return NULL;
}
-
+#define WHERE fprintf(stderr, "%s:%d", __FUNCTION__, __LINE__)
static void iemnet__receiver_tick(t_iemnet_receiver *x)
{
- static int ticks=0;
- static int packets=0;
- static double totaltime=0;
-
- double start=sys_getrealtime();
+ WHERE; fprintf(stderr, "\treceiver=%x", x);
// received data
t_iemnet_chunk*c=queue_pop_noblock(x->queue);
+ WHERE; fprintf(stderr, "\tchunk=%x", c);
while(NULL!=c) {
x->flist = iemnet__chunk2list(c, x->flist);
(x->callback)(x->owner, x->sockfd, x->flist->argc, x->flist->argv);
iemnet__chunk_destroy(c);
c=queue_pop_noblock(x->queue);
-
- packets++;
}
- ticks++;
- totaltime+=(sys_getrealtime()-start);
+ WHERE; fprintf(stderr, "\trunning=%d", x->running);
if(!x->running) {
// read terminated
x->callback(x->owner, x->sockfd, 0, NULL);
}
+ WHERE; fprintf(stderr, "\ttick done\n");
}
t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback) {
- t_iemnet_receiver*result=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver));
+ t_iemnet_receiver*rec=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver));
//fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback);
- if(result) {
+ if(rec) {
t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE);
int res=0;
if(NULL==data) {
- iemnet__receiver_destroy(result);
+ iemnet__receiver_destroy(rec);
return NULL;
}
- result->sockfd=sock;
- result->owner=owner;
- result->data=data;
- result->callback=callback;
- result->flist=iemnet__floatlist_create(1024);
-
- result->queue = queue_create();
- result->clock = clock_new(result, (t_method)iemnet__receiver_tick);
- result->running=1;
- res=pthread_create(&result->thread, 0, iemnet__receiver_readthread, result);
+ rec->sockfd=sock;
+ rec->owner=owner;
+ rec->data=data;
+ rec->callback=callback;
+ rec->flist=iemnet__floatlist_create(1024);
+
+ rec->queue = queue_create();
+ rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick);
+ rec->running=1;
+ res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec);
}
//fprintf(stderr, "new receiver created\n");
- return result;
+ return rec;
}
-void iemnet__receiver_destroy(t_iemnet_receiver*r) {
- if(NULL==r)return;
- if(r->data)iemnet__chunk_destroy(r->data);
- if(r->flist)iemnet__floatlist_destroy(r->flist);
- clock_free(r->clock);
-
- r->sockfd=0;
- r->owner=NULL;
- r->data=NULL;
- r->callback=NULL;
- r->clock=NULL;
- r->flist=NULL;
-
- freebytes(r, sizeof(t_iemnet_receiver));
- r=NULL;
+void iemnet__receiver_destroy(t_iemnet_receiver*rec) {
+ if(NULL==rec)return;
+ if(rec->data)iemnet__chunk_destroy(rec->data);
+ if(rec->flist)iemnet__floatlist_destroy(rec->flist);
+ clock_free(rec->clock);
+ sys_closesocket(rec->sockfd);
+
+ rec->sockfd=0;
+ fprintf(stderr, "receiverdestroy join thread\n");
+ pthread_join(rec->thread, NULL);
+ fprintf(stderr, "receiverdestroy joined thread\n");
+ rec->owner=NULL;
+ rec->data=NULL;
+ rec->callback=NULL;
+ rec->clock=NULL;
+ rec->flist=NULL;
+
+ freebytes(rec, sizeof(t_iemnet_receiver));
+ rec=NULL;
}