From 5cdf7066e1ceb823afa572fa596c37a935911cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Tue, 23 Mar 2010 19:05:53 +0000 Subject: hmm, less crashes; threads hang svn path=/trunk/externals/iem/iemnet/; revision=13245 --- iemnet.h | 1 + shared.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++++------- tcpserver.c | 68 ++++++++++++++++++++++++++-------------------------- 3 files changed, 104 insertions(+), 44 deletions(-) diff --git a/iemnet.h b/iemnet.h index 33b2113..390e1ea 100644 --- a/iemnet.h +++ b/iemnet.h @@ -9,6 +9,7 @@ typedef struct _iemnet_chunk { void iemnet__chunk_destroy(t_iemnet_chunk*); t_iemnet_chunk*iemnet__chunk_create_empty(int); +t_iemnet_chunk*iemnet__chunk_create_data(int, unsigned char*); t_iemnet_chunk*iemnet__chunk_create_list(int, t_atom*); t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*); diff --git a/shared.c b/shared.c index 14311d8..819afb3 100644 --- a/shared.c +++ b/shared.c @@ -53,6 +53,24 @@ t_iemnet_chunk* iemnet__chunk_create_empty(int size) { return result; } +t_iemnet_chunk* iemnet__chunk_create_data(int size, unsigned char*data) { + t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); + if(result) { + result->size=size; + result->data=(unsigned char*)getbytes(sizeof(unsigned char)*size); + + if(NULL == result->data) { + result->size=0; + iemnet__chunk_destroy(result); + return NULL; + } + + memcpy(result->data, data, result->size); + } + return result; +} + + t_iemnet_chunk* iemnet__chunk_create_list(int argc, t_atom*argv) { t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); int i; @@ -136,7 +154,7 @@ int queue_push( int size=_this->size; if(NULL == data) return size; - fprintf(stderr, "pushing %d bytes\n", data->size); + //fprintf(stderr, "pushing %d bytes\n", data->size); n=(t_node*)getbytes(sizeof(t_node)); @@ -154,7 +172,7 @@ int queue_push( size=_this->size; - fprintf(stderr, "pushed %d bytes\n", data->size); + //fprintf(stderr, "pushed %d bytes\n", data->size); pthread_mutex_unlock(&_this->mtx); pthread_cond_signal(&_this->cond); @@ -189,7 +207,7 @@ t_iemnet_chunk* queue_pop( freebytes(head, sizeof(t_node)); head=NULL; } - fprintf(stderr, "popped %d bytes\n", data->size); + //fprintf(stderr, "popped %d bytes\n", data->size); return data; } @@ -281,16 +299,17 @@ static void*iemnet__sender_sendthread(void*arg) { } sender->queue=NULL; queue_destroy(q); + fprintf(stderr, "write thread terminated\n"); return NULL; } int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { - post("send %x %x", s, c); + //post("send %x %x", s, c); t_queue*q=s->queue; int size=0; - post("queue=%x", q); + //post("queue=%x", q); if(q) { - post("sending data with %d bytes using %x", c->size, s); + // post("sending data with %d bytes using %x", c->size, s); size = queue_push(q, c); } return size; @@ -357,18 +376,54 @@ int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const v struct _iemnet_receiver { + pthread_t thread; int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */ void*owner; t_iemnet_chunk*data; t_iemnet_receivecallback callback; + t_queue*queue; + int cont; }; + +/* the workhorse of the family */ +static void*iemnet__receiver_readthread(void*arg) { + t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; + + int sockfd=receiver->sockfd; + t_queue*q=receiver->queue; + + unsigned char data[INBUFSIZE]; + unsigned int size=INBUFSIZE; + + fprintf(stderr, "read thread started\n"); + + int i=0; + for(i=0; icont) { + int result = recv(sockfd, data, size, 0); + + if(0==result)break; + t_iemnet_chunk*c = iemnet__chunk_create_data(result, data); + + queue_push(q, c); + + // shouldn't we do something with the result here? + } + fprintf(stderr, "read thread terminated\n"); + return NULL; +} + + static void iemnet__receiver_pollfn(t_iemnet_receiver*x, int fd) { int ret = recv(fd, /* socket */ x->data->data, /* buf */ x->data->size, /* len */ 0); /* flags */ + post("pollfn"); + return; if(ret<=0) { sys_rmpollfn(fd); x->callback(x->owner, fd, 0); @@ -384,9 +439,10 @@ static void iemnet__receiver_pollfn(t_iemnet_receiver*x, int fd) { t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback) { t_iemnet_receiver*result=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); - fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback); + //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback); if(result) { t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE); + int res=0; if(NULL==data) { iemnet__receiver_destroy(result); return NULL; @@ -396,9 +452,12 @@ t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receive result->data=data; result->callback=callback; - sys_addpollfn(sock, (t_fdpollfn)iemnet__receiver_pollfn, result); + result->queue = queue_create(); + + result->cont = 1; + res=pthread_create(&result->thread, 0, iemnet__receiver_readthread, result); } - fprintf(stderr, "new receiver created\n"); + //fprintf(stderr, "new receiver created\n"); return result; } @@ -414,6 +473,8 @@ void iemnet__receiver_destroy(t_iemnet_receiver*r) { r->data=NULL; r->callback=NULL; + r->cont = 0; + freebytes(r, sizeof(t_iemnet_receiver)); r=NULL; } diff --git a/tcpserver.c b/tcpserver.c index 70f11ae..ba62ef8 100644 --- a/tcpserver.c +++ b/tcpserver.c @@ -91,7 +91,7 @@ typedef struct _tcpserver t_atom x_addrbytes[4]; } t_tcpserver; -static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd); +static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd, t_symbol*host); static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x); static void tcpserver_send_client(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv); @@ -105,20 +105,22 @@ static void tcpserver_datacallback(t_tcpserver *x, int sockfd, t_iemnet_chunk*ch static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient); static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket); static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv); -static void tcpserver_notify(t_tcpserver *x, int socket); static void tcpserver_connectpoll(t_tcpserver *x); static void *tcpserver_new(t_floatarg fportno); static void tcpserver_free(t_tcpserver *x); void tcpserver_setup(void); -static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd) +static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd, t_symbol*host) { t_tcpserver_socketreceiver *x = (t_tcpserver_socketreceiver *)getbytes(sizeof(*x)); if(NULL==x) { error("%s_socketreceiver: unable to allocate %d bytes", objName, sizeof(*x)); return NULL; } else { + x->sr_host=host; + x->sr_fd=sockfd; + x->sr_sender=iemnet__sender_create(sockfd); x->sr_receiver=iemnet__receiver_create(sockfd, owner, (t_iemnet_receivecallback)tcpserver_datacallback); } @@ -152,7 +154,7 @@ static int tcpserver_socket2index(t_tcpserver*x, int sockfd) /* ---------------- main tcpserver (send) stuff --------------------- */ -static void tcpserver_send_bytes(t_tcpserver*x,int client, t_iemnet_chunk*chunk) +static void tcpserver_send_bytes(t_tcpserver*x, int client, t_iemnet_chunk*chunk) { if(x && x->x_sr && x->x_sr[client]) { t_atom output_atom[3]; @@ -273,14 +275,36 @@ static void tcpserver_send_socket(t_tcpserver *x, t_symbol *s, int argc, t_atom +static void tcpserver_disconnect(t_tcpserver *x, int client) +{ + t_tcpserver_socketreceiver *y=NULL; + int fd=0; + int k; + + y = x->x_sr[client]; + fd = y->sr_fd; + post("closing fd[%d]=%d", client, fd); + + tcpserver_socketreceiver_free(x->x_sr[client]); + x->x_sr[client]=NULL; + sys_closesocket(fd); + + /* rearrange list now: move entries to close the gap */ + for(k = client; k < x->x_nconnections; k++) + { + x->x_sr[k] = x->x_sr[k + 1]; + } + x->x_sr[k + 1]=NULL; + x->x_nconnections--; + + outlet_float(x->x_connectout, x->x_nconnections); +} /* disconnect a client by number */ static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient) { int client = (int)fclient; - t_tcpserver_socketreceiver *y=NULL; - int fd=0; if(x->x_nconnections <= 0) { @@ -294,11 +318,7 @@ static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient) } --client; /* zero based index*/ - y = x->x_sr[client]; - fd = y->sr_fd; - tcpserver_notify(x, fd); - sys_rmpollfn(fd); - sys_closesocket(fd); + tcpserver_disconnect(x, client); } @@ -312,29 +332,7 @@ static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket) /* ---------------- main tcpserver (receive) stuff --------------------- */ -static void tcpserver_notify(t_tcpserver *x, int sockfd) -{ - int i, k; - /* remove connection from list */ - for(i = 0; i < x->x_nconnections; i++) - { - if(x->x_sr[i]->sr_fd == sockfd) - { - x->x_nconnections--; - post("%s: \"%s\" removed from list of clients", objName, x->x_sr[i]->sr_host->s_name); - tcpserver_socketreceiver_free(x->x_sr[i]); - x->x_sr[i] = NULL; - - /* rearrange list now: move entries to close the gap */ - for(k = i; k < x->x_nconnections; k++) - { - x->x_sr[k] = x->x_sr[k + 1]; - } - } - } - outlet_float(x->x_connectout, x->x_nconnections); -} static void tcpserver_datacallback(t_tcpserver *x, int sockfd, t_iemnet_chunk*chunk) { post("data callback for %x with data @ %x", x, chunk); @@ -367,7 +365,8 @@ static void tcpserver_connectpoll(t_tcpserver *x) if (fd < 0) post("%s: accept failed", objName); else { - t_tcpserver_socketreceiver *y = tcpserver_socketreceiver_new((void *)x, fd); + t_symbol*host=gensym(inet_ntoa(incomer_address.sin_addr)); + t_tcpserver_socketreceiver *y = tcpserver_socketreceiver_new((void *)x, fd, host); if (!y) { sys_closesocket(fd); @@ -456,7 +455,6 @@ static void tcpserver_free(t_tcpserver *x) tcpserver_socketreceiver_free(x->x_sr[i]); if (x->x_sr[i]->sr_fd >= 0) { - sys_rmpollfn(x->x_sr[i]->sr_fd); sys_closesocket(x->x_sr[i]->sr_fd); } } -- cgit v1.2.1