From e712d1dddbb8fa8d3edf1df63ccc3bc4f672ba19 Mon Sep 17 00:00:00 2001 From: Martin Peach Date: Thu, 6 May 2010 04:51:48 +0000 Subject: Added tcpserver_broadcast_bytes() to speed up broadcast by resending the buffer if < 65536 bytes svn path=/trunk/externals/mrpeach/; revision=13498 --- net/tcpserver.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 124 insertions(+), 1 deletion(-) diff --git a/net/tcpserver.c b/net/tcpserver.c index 677ba2f..dd2bab9 100644 --- a/net/tcpserver.c +++ b/net/tcpserver.c @@ -117,6 +117,7 @@ static void tcpserver_socketreceiver_read(t_tcpserver_socketreceiver *x, int fd) static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x); static void tcpserver_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv); static void tcpserver_send_bytes(int sockfd, t_tcpserver *x, int argc, t_atom *argv); +static void tcpserver_broadcast_bytes(t_tcpserver *x, int argc, t_atom *argv); #ifdef SIOCOUTQ static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client); #endif @@ -522,6 +523,124 @@ static void *tcpserver_send_buf_thread(void *arg) return NULL; } +static void tcpserver_broadcast_bytes(t_tcpserver *x, int argc, t_atom *argv) +{ +/* send the same buffer multiple times */ + static char byte_buf[MAX_UDP_RECEIVE];// arbitrary maximum similar to max IP packet size + int i, j, d; + unsigned char c; + float f, e; + int length; + size_t flen = 0; + int sockfd = -1; + char fpath[FILENAME_MAX]; + FILE *fptr; + t_atom output_atom[3]; + t_tcpserver_send_params *ttsp; + pthread_t sender_thread; + pthread_attr_t sender_attr; + int sender_thread_result; + int client, buffer_filled = 0; + + for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */ + { + if((sockfd = x->x_sr[client]->sr_fd) >= 0) + {/* socket exists for this client */ + if (x->x_blocked > 0) goto failed; + /* sender thread should start out detached so its resouces will be freed when it is done */ + if (0!= (sender_thread_result = pthread_attr_init(&sender_attr))) + { + error("%s_broadcast_bytes: pthread_attr_init failed: %d", objName, sender_thread_result); + goto failed; + } + if(0!= (sender_thread_result = pthread_attr_setdetachstate(&sender_attr, PTHREAD_CREATE_DETACHED))) + { + error("%s_broadcast_bytes: pthread_attr_setdetachstate failed: %d", objName, sender_thread_result); + goto failed; + } + if (!buffer_filled) + { /* first client */ + /* process input into byte_buf */ + for (i = j = 0; ((i < argc) && (i < MAX_UDP_RECEIVE)); ++i) + { + if (argv[i].a_type == A_FLOAT) + { /* load floats into buffer as long as they are integers on [0..255]*/ + f = argv[i].a_w.w_float; + d = (int)f; + e = f - d; +#ifdef DEBUG + post("%s_broadcast_bytes: argv[%d]: float:%f int:%d delta:%f", objName, i, f, d, e); +#endif + if (e != 0) + { + error("%s_broadcast_bytes: item %d (%f) is not an integer", objName, i, f); + goto failed; + } + if ((d < 0) || (d > 255)) + { + error("%s_broadcast_bytes: item %d (%f) is not between 0 and 255", objName, i, f); + goto failed; + } + c = (unsigned char)d; /* make sure it doesn't become negative; this only matters for post() */ +#ifdef DEBUG + post("%s_broadcast_bytes: argv[%d]: %d", objName, i, c); +#endif + byte_buf[j++] = c; + } + else + { /* arg was not a float */ + error("%s_broadcast_bytes: item %d is not a float", objName, i); + goto failed; + } + } + length = j; + buffer_filled = 1; + } + if (length > 0) + { /* send the buffer */ +#ifdef SIOCOUTQ + if (tcpserver_send_buffer_avaliable_for_client(x, client) < length) + { + error("%s_broadcast_bytes: buffer too small for client(%d)", objName, client); + goto failed; + } +#endif // SIOCOUTQ + ttsp = (t_tcpserver_send_params *)getbytes(sizeof(t_tcpserver_send_params)); + if (ttsp == NULL) + { + error("%s_broadcast_bytes: unable to allocate %d bytes for t_tcpserver_send_params", objName, sizeof(t_tcpserver_send_params)); + goto failed; + } + ttsp->client = client; + ttsp->sockfd = sockfd; + ttsp->byte_buf = byte_buf; + ttsp->length = length; + if ( 0!= (sender_thread_result = pthread_create(&sender_thread, &sender_attr, tcpserver_send_buf_thread, (void *)ttsp))) + { + ++x->x_blocked; + error("%s_broadcast_bytes: couldn't create sender thread (%d)", objName, sender_thread_result); + freebytes (ttsp, sizeof (t_tcpserver_send_params)); + goto failed; + } + flen = length; + } + SETFLOAT(&output_atom[0], client+1); + SETFLOAT(&output_atom[1], flen); + SETFLOAT(&output_atom[2], sockfd); + if(x->x_blocked) outlet_anything( x->x_status_outlet, gensym("blocked"), 3, output_atom); + else outlet_anything( x->x_status_outlet, gensym("sent"), 3, output_atom); + } + else post("%s_broadcast_bytes: not a valid socket number (%d)", objName, sockfd); + } + return; +failed: + SETFLOAT(&output_atom[0], client+1); + SETFLOAT(&output_atom[1], flen); + SETFLOAT(&output_atom[2], sockfd); + if(x->x_blocked) outlet_anything( x->x_status_outlet, gensym("blocked"), 3, output_atom); + else outlet_anything( x->x_status_outlet, gensym("sent"), 3, output_atom); +} + /* send message to client using socket number */ static void tcpserver_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv) { @@ -782,7 +901,11 @@ static void tcpserver_buf_size(t_tcpserver *x, t_symbol *s, int argc, t_atom *ar static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv) { int client; - /* enumerate through the clients and send each the message */ + { + if (argc < MAX_UDP_RECEIVE) tcpserver_broadcast_bytes(x, argc, argv); + return; + } + /* big message: enumerate through the clients and send each the message */ for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */ { if(x->x_sr[client]->sr_fd >= 0) -- cgit v1.2.1