aboutsummaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorMartin Peach <mrpeach@users.sourceforge.net>2010-05-06 04:51:48 +0000
committerMartin Peach <mrpeach@users.sourceforge.net>2010-05-06 04:51:48 +0000
commite712d1dddbb8fa8d3edf1df63ccc3bc4f672ba19 (patch)
treef0bdaee7b3cb16c3ea9a47b1970b2dec1a1f8b9f /net
parentb6e942e62e63d96c92e12f7ce624bf8a6a03e782 (diff)
Added tcpserver_broadcast_bytes() to speed up broadcast by resending the buffer if < 65536 bytes
svn path=/trunk/externals/mrpeach/; revision=13498
Diffstat (limited to 'net')
-rw-r--r--net/tcpserver.c125
1 files changed, 124 insertions, 1 deletions
diff --git a/net/tcpserver.c b/net/tcpserver.c
index 677ba2f..dd2bab9 100644
--- a/net/tcpserver.c
+++ b/net/tcpserver.c
@@ -117,6 +117,7 @@ static void tcpserver_socketreceiver_read(t_tcpserver_socketreceiver *x, int fd)
static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x);
static void tcpserver_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv);
static void tcpserver_send_bytes(int sockfd, t_tcpserver *x, int argc, t_atom *argv);
+static void tcpserver_broadcast_bytes(t_tcpserver *x, int argc, t_atom *argv);
#ifdef SIOCOUTQ
static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client);
#endif
@@ -522,6 +523,124 @@ static void *tcpserver_send_buf_thread(void *arg)
return NULL;
}
+static void tcpserver_broadcast_bytes(t_tcpserver *x, int argc, t_atom *argv)
+{
+/* send the same buffer multiple times */
+ static char byte_buf[MAX_UDP_RECEIVE];// arbitrary maximum similar to max IP packet size
+ int i, j, d;
+ unsigned char c;
+ float f, e;
+ int length;
+ size_t flen = 0;
+ int sockfd = -1;
+ char fpath[FILENAME_MAX];
+ FILE *fptr;
+ t_atom output_atom[3];
+ t_tcpserver_send_params *ttsp;
+ pthread_t sender_thread;
+ pthread_attr_t sender_attr;
+ int sender_thread_result;
+ int client, buffer_filled = 0;
+
+ for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */
+ {
+ if((sockfd = x->x_sr[client]->sr_fd) >= 0)
+ {/* socket exists for this client */
+ if (x->x_blocked > 0) goto failed;
+ /* sender thread should start out detached so its resouces will be freed when it is done */
+ if (0!= (sender_thread_result = pthread_attr_init(&sender_attr)))
+ {
+ error("%s_broadcast_bytes: pthread_attr_init failed: %d", objName, sender_thread_result);
+ goto failed;
+ }
+ if(0!= (sender_thread_result = pthread_attr_setdetachstate(&sender_attr, PTHREAD_CREATE_DETACHED)))
+ {
+ error("%s_broadcast_bytes: pthread_attr_setdetachstate failed: %d", objName, sender_thread_result);
+ goto failed;
+ }
+ if (!buffer_filled)
+ { /* first client */
+ /* process input into byte_buf */
+ for (i = j = 0; ((i < argc) && (i < MAX_UDP_RECEIVE)); ++i)
+ {
+ if (argv[i].a_type == A_FLOAT)
+ { /* load floats into buffer as long as they are integers on [0..255]*/
+ f = argv[i].a_w.w_float;
+ d = (int)f;
+ e = f - d;
+#ifdef DEBUG
+ post("%s_broadcast_bytes: argv[%d]: float:%f int:%d delta:%f", objName, i, f, d, e);
+#endif
+ if (e != 0)
+ {
+ error("%s_broadcast_bytes: item %d (%f) is not an integer", objName, i, f);
+ goto failed;
+ }
+ if ((d < 0) || (d > 255))
+ {
+ error("%s_broadcast_bytes: item %d (%f) is not between 0 and 255", objName, i, f);
+ goto failed;
+ }
+ c = (unsigned char)d; /* make sure it doesn't become negative; this only matters for post() */
+#ifdef DEBUG
+ post("%s_broadcast_bytes: argv[%d]: %d", objName, i, c);
+#endif
+ byte_buf[j++] = c;
+ }
+ else
+ { /* arg was not a float */
+ error("%s_broadcast_bytes: item %d is not a float", objName, i);
+ goto failed;
+ }
+ }
+ length = j;
+ buffer_filled = 1;
+ }
+ if (length > 0)
+ { /* send the buffer */
+#ifdef SIOCOUTQ
+ if (tcpserver_send_buffer_avaliable_for_client(x, client) < length)
+ {
+ error("%s_broadcast_bytes: buffer too small for client(%d)", objName, client);
+ goto failed;
+ }
+#endif // SIOCOUTQ
+ ttsp = (t_tcpserver_send_params *)getbytes(sizeof(t_tcpserver_send_params));
+ if (ttsp == NULL)
+ {
+ error("%s_broadcast_bytes: unable to allocate %d bytes for t_tcpserver_send_params", objName, sizeof(t_tcpserver_send_params));
+ goto failed;
+ }
+ ttsp->client = client;
+ ttsp->sockfd = sockfd;
+ ttsp->byte_buf = byte_buf;
+ ttsp->length = length;
+ if ( 0!= (sender_thread_result = pthread_create(&sender_thread, &sender_attr, tcpserver_send_buf_thread, (void *)ttsp)))
+ {
+ ++x->x_blocked;
+ error("%s_broadcast_bytes: couldn't create sender thread (%d)", objName, sender_thread_result);
+ freebytes (ttsp, sizeof (t_tcpserver_send_params));
+ goto failed;
+ }
+ flen = length;
+ }
+ SETFLOAT(&output_atom[0], client+1);
+ SETFLOAT(&output_atom[1], flen);
+ SETFLOAT(&output_atom[2], sockfd);
+ if(x->x_blocked) outlet_anything( x->x_status_outlet, gensym("blocked"), 3, output_atom);
+ else outlet_anything( x->x_status_outlet, gensym("sent"), 3, output_atom);
+ }
+ else post("%s_broadcast_bytes: not a valid socket number (%d)", objName, sockfd);
+ }
+ return;
+failed:
+ SETFLOAT(&output_atom[0], client+1);
+ SETFLOAT(&output_atom[1], flen);
+ SETFLOAT(&output_atom[2], sockfd);
+ if(x->x_blocked) outlet_anything( x->x_status_outlet, gensym("blocked"), 3, output_atom);
+ else outlet_anything( x->x_status_outlet, gensym("sent"), 3, output_atom);
+}
+
/* send message to client using socket number */
static void tcpserver_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
{
@@ -782,7 +901,11 @@ static void tcpserver_buf_size(t_tcpserver *x, t_symbol *s, int argc, t_atom *ar
static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
{
int client;
- /* enumerate through the clients and send each the message */
+ {
+ if (argc < MAX_UDP_RECEIVE) tcpserver_broadcast_bytes(x, argc, argv);
+ return;
+ }
+ /* big message: enumerate through the clients and send each the message */
for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */
{
if(x->x_sr[client]->sr_fd >= 0)