aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-03-23 19:05:53 +0000
committerIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-03-23 19:05:53 +0000
commit5cdf7066e1ceb823afa572fa596c37a935911cde (patch)
tree12b5aa808a8d548be4a76ebc5f63138fc4bfe8fa
parentc171430ffc61d77ace607d17e7daffb58de096e6 (diff)
hmm, less crashes; threads hang
svn path=/trunk/externals/iem/iemnet/; revision=13245
-rw-r--r--iemnet.h1
-rw-r--r--shared.c79
-rw-r--r--tcpserver.c68
3 files changed, 104 insertions, 44 deletions
diff --git a/iemnet.h b/iemnet.h
index 33b2113..390e1ea 100644
--- a/iemnet.h
+++ b/iemnet.h
@@ -9,6 +9,7 @@ typedef struct _iemnet_chunk {
void iemnet__chunk_destroy(t_iemnet_chunk*);
t_iemnet_chunk*iemnet__chunk_create_empty(int);
+t_iemnet_chunk*iemnet__chunk_create_data(int, unsigned char*);
t_iemnet_chunk*iemnet__chunk_create_list(int, t_atom*);
t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*);
diff --git a/shared.c b/shared.c
index 14311d8..819afb3 100644
--- a/shared.c
+++ b/shared.c
@@ -53,6 +53,24 @@ t_iemnet_chunk* iemnet__chunk_create_empty(int size) {
return result;
}
+t_iemnet_chunk* iemnet__chunk_create_data(int size, unsigned char*data) {
+ t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk));
+ if(result) {
+ result->size=size;
+ result->data=(unsigned char*)getbytes(sizeof(unsigned char)*size);
+
+ if(NULL == result->data) {
+ result->size=0;
+ iemnet__chunk_destroy(result);
+ return NULL;
+ }
+
+ memcpy(result->data, data, result->size);
+ }
+ return result;
+}
+
+
t_iemnet_chunk* iemnet__chunk_create_list(int argc, t_atom*argv) {
t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk));
int i;
@@ -136,7 +154,7 @@ int queue_push(
int size=_this->size;
if(NULL == data) return size;
- fprintf(stderr, "pushing %d bytes\n", data->size);
+ //fprintf(stderr, "pushing %d bytes\n", data->size);
n=(t_node*)getbytes(sizeof(t_node));
@@ -154,7 +172,7 @@ int queue_push(
size=_this->size;
- fprintf(stderr, "pushed %d bytes\n", data->size);
+ //fprintf(stderr, "pushed %d bytes\n", data->size);
pthread_mutex_unlock(&_this->mtx);
pthread_cond_signal(&_this->cond);
@@ -189,7 +207,7 @@ t_iemnet_chunk* queue_pop(
freebytes(head, sizeof(t_node));
head=NULL;
}
- fprintf(stderr, "popped %d bytes\n", data->size);
+ //fprintf(stderr, "popped %d bytes\n", data->size);
return data;
}
@@ -281,16 +299,17 @@ static void*iemnet__sender_sendthread(void*arg) {
}
sender->queue=NULL;
queue_destroy(q);
+ fprintf(stderr, "write thread terminated\n");
return NULL;
}
int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) {
- post("send %x %x", s, c);
+ //post("send %x %x", s, c);
t_queue*q=s->queue;
int size=0;
- post("queue=%x", q);
+ //post("queue=%x", q);
if(q) {
- post("sending data with %d bytes using %x", c->size, s);
+ // post("sending data with %d bytes using %x", c->size, s);
size = queue_push(q, c);
}
return size;
@@ -357,18 +376,54 @@ int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const v
struct _iemnet_receiver {
+ pthread_t thread;
int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */
void*owner;
t_iemnet_chunk*data;
t_iemnet_receivecallback callback;
+ t_queue*queue;
+ int cont;
};
+
+/* the workhorse of the family */
+static void*iemnet__receiver_readthread(void*arg) {
+ t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg;
+
+ int sockfd=receiver->sockfd;
+ t_queue*q=receiver->queue;
+
+ unsigned char data[INBUFSIZE];
+ unsigned int size=INBUFSIZE;
+
+ fprintf(stderr, "read thread started\n");
+
+ int i=0;
+ for(i=0; i<size; i++)data[i]=0;
+
+ while(receiver->cont) {
+ int result = recv(sockfd, data, size, 0);
+
+ if(0==result)break;
+ t_iemnet_chunk*c = iemnet__chunk_create_data(result, data);
+
+ queue_push(q, c);
+
+ // shouldn't we do something with the result here?
+ }
+ fprintf(stderr, "read thread terminated\n");
+ return NULL;
+}
+
+
static void iemnet__receiver_pollfn(t_iemnet_receiver*x, int fd) {
int ret = recv(fd, /* socket */
x->data->data, /* buf */
x->data->size, /* len */
0); /* flags */
+ post("pollfn");
+ return;
if(ret<=0) {
sys_rmpollfn(fd);
x->callback(x->owner, fd, 0);
@@ -384,9 +439,10 @@ static void iemnet__receiver_pollfn(t_iemnet_receiver*x, int fd) {
t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback) {
t_iemnet_receiver*result=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver));
- fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback);
+ //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback);
if(result) {
t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE);
+ int res=0;
if(NULL==data) {
iemnet__receiver_destroy(result);
return NULL;
@@ -396,9 +452,12 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receive
result->data=data;
result->callback=callback;
- sys_addpollfn(sock, (t_fdpollfn)iemnet__receiver_pollfn, result);
+ result->queue = queue_create();
+
+ result->cont = 1;
+ res=pthread_create(&result->thread, 0, iemnet__receiver_readthread, result);
}
- fprintf(stderr, "new receiver created\n");
+ //fprintf(stderr, "new receiver created\n");
return result;
}
@@ -414,6 +473,8 @@ void iemnet__receiver_destroy(t_iemnet_receiver*r) {
r->data=NULL;
r->callback=NULL;
+ r->cont = 0;
+
freebytes(r, sizeof(t_iemnet_receiver));
r=NULL;
}
diff --git a/tcpserver.c b/tcpserver.c
index 70f11ae..ba62ef8 100644
--- a/tcpserver.c
+++ b/tcpserver.c
@@ -91,7 +91,7 @@ typedef struct _tcpserver
t_atom x_addrbytes[4];
} t_tcpserver;
-static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd);
+static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd, t_symbol*host);
static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x);
static void tcpserver_send_client(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv);
@@ -105,20 +105,22 @@ static void tcpserver_datacallback(t_tcpserver *x, int sockfd, t_iemnet_chunk*ch
static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient);
static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket);
static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv);
-static void tcpserver_notify(t_tcpserver *x, int socket);
static void tcpserver_connectpoll(t_tcpserver *x);
static void *tcpserver_new(t_floatarg fportno);
static void tcpserver_free(t_tcpserver *x);
void tcpserver_setup(void);
-static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd)
+static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd, t_symbol*host)
{
t_tcpserver_socketreceiver *x = (t_tcpserver_socketreceiver *)getbytes(sizeof(*x));
if(NULL==x) {
error("%s_socketreceiver: unable to allocate %d bytes", objName, sizeof(*x));
return NULL;
} else {
+ x->sr_host=host;
+ x->sr_fd=sockfd;
+
x->sr_sender=iemnet__sender_create(sockfd);
x->sr_receiver=iemnet__receiver_create(sockfd, owner, (t_iemnet_receivecallback)tcpserver_datacallback);
}
@@ -152,7 +154,7 @@ static int tcpserver_socket2index(t_tcpserver*x, int sockfd)
/* ---------------- main tcpserver (send) stuff --------------------- */
-static void tcpserver_send_bytes(t_tcpserver*x,int client, t_iemnet_chunk*chunk)
+static void tcpserver_send_bytes(t_tcpserver*x, int client, t_iemnet_chunk*chunk)
{
if(x && x->x_sr && x->x_sr[client]) {
t_atom output_atom[3];
@@ -273,14 +275,36 @@ static void tcpserver_send_socket(t_tcpserver *x, t_symbol *s, int argc, t_atom
+static void tcpserver_disconnect(t_tcpserver *x, int client)
+{
+ t_tcpserver_socketreceiver *y=NULL;
+ int fd=0;
+ int k;
+
+ y = x->x_sr[client];
+ fd = y->sr_fd;
+ post("closing fd[%d]=%d", client, fd);
+
+ tcpserver_socketreceiver_free(x->x_sr[client]);
+ x->x_sr[client]=NULL;
+ sys_closesocket(fd);
+
+ /* rearrange list now: move entries to close the gap */
+ for(k = client; k < x->x_nconnections; k++)
+ {
+ x->x_sr[k] = x->x_sr[k + 1];
+ }
+ x->x_sr[k + 1]=NULL;
+ x->x_nconnections--;
+
+ outlet_float(x->x_connectout, x->x_nconnections);
+}
/* disconnect a client by number */
static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient)
{
int client = (int)fclient;
- t_tcpserver_socketreceiver *y=NULL;
- int fd=0;
if(x->x_nconnections <= 0)
{
@@ -294,11 +318,7 @@ static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient)
}
--client; /* zero based index*/
- y = x->x_sr[client];
- fd = y->sr_fd;
- tcpserver_notify(x, fd);
- sys_rmpollfn(fd);
- sys_closesocket(fd);
+ tcpserver_disconnect(x, client);
}
@@ -312,29 +332,7 @@ static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket)
/* ---------------- main tcpserver (receive) stuff --------------------- */
-static void tcpserver_notify(t_tcpserver *x, int sockfd)
-{
- int i, k;
- /* remove connection from list */
- for(i = 0; i < x->x_nconnections; i++)
- {
- if(x->x_sr[i]->sr_fd == sockfd)
- {
- x->x_nconnections--;
- post("%s: \"%s\" removed from list of clients", objName, x->x_sr[i]->sr_host->s_name);
- tcpserver_socketreceiver_free(x->x_sr[i]);
- x->x_sr[i] = NULL;
-
- /* rearrange list now: move entries to close the gap */
- for(k = i; k < x->x_nconnections; k++)
- {
- x->x_sr[k] = x->x_sr[k + 1];
- }
- }
- }
- outlet_float(x->x_connectout, x->x_nconnections);
-}
static void tcpserver_datacallback(t_tcpserver *x, int sockfd, t_iemnet_chunk*chunk) {
post("data callback for %x with data @ %x", x, chunk);
@@ -367,7 +365,8 @@ static void tcpserver_connectpoll(t_tcpserver *x)
if (fd < 0) post("%s: accept failed", objName);
else
{
- t_tcpserver_socketreceiver *y = tcpserver_socketreceiver_new((void *)x, fd);
+ t_symbol*host=gensym(inet_ntoa(incomer_address.sin_addr));
+ t_tcpserver_socketreceiver *y = tcpserver_socketreceiver_new((void *)x, fd, host);
if (!y)
{
sys_closesocket(fd);
@@ -456,7 +455,6 @@ static void tcpserver_free(t_tcpserver *x)
tcpserver_socketreceiver_free(x->x_sr[i]);
if (x->x_sr[i]->sr_fd >= 0)
{
- sys_rmpollfn(x->x_sr[i]->sr_fd);
sys_closesocket(x->x_sr[i]->sr_fd);
}
}