aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Peach <mrpeach@users.sourceforge.net>2010-05-08 19:01:24 +0000
committerMartin Peach <mrpeach@users.sourceforge.net>2010-05-08 19:01:24 +0000
commitb1faea3c5f7e832c9e389d712b92131ca86df898 (patch)
treea7ffb2e01b0fd672e0d1dfff87465a9021f73ff4
parente712d1dddbb8fa8d3edf1df63ccc3bc4f672ba19 (diff)
Incorporated ico's patch for threaded broadcasting.
svn path=/trunk/externals/mrpeach/; revision=13510
-rw-r--r--net/tcpserver.c376
1 files changed, 238 insertions, 138 deletions
diff --git a/net/tcpserver.c b/net/tcpserver.c
index dd2bab9..837ea26 100644
--- a/net/tcpserver.c
+++ b/net/tcpserver.c
@@ -1,5 +1,7 @@
-/* tcpserver.c Martin Peach 20100501 */
-/* tcpserver.c is based on netserver: */
+/* tcpserver.c Martin Peach 20100508 */
+/* with contributions from Ivica Ico Bukvic <ico@bukvic.net> */
+/* and IOhannes Zmoelnig zmoelnig@iem.at */
+/* tcpserver.c is based on netserver by Olaf Matthes <olaf.matthes@gmx.de>: */
/* -------------------------- netserver ------------------------------------- */
/* */
/* A server for bidirectional communication from within Pd. */
@@ -110,6 +112,13 @@ typedef struct _tcpserver
char x_msginbuf[MAX_UDP_RECEIVE];
} t_tcpserver;
+typedef struct _tcpserver_broadcast_params
+{
+ t_tcpserver *x;
+ t_int argc;
+ t_atom argv[MAX_UDP_RECEIVE];
+} t_tcpserver_broadcast_params;
+
static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, t_tcpserver_socketnotifier notifier,
t_tcpserver_socketreceivefn socketreceivefn);
static int tcpserver_socketreceiver_doread(t_tcpserver_socketreceiver *x);
@@ -117,11 +126,11 @@ static void tcpserver_socketreceiver_read(t_tcpserver_socketreceiver *x, int fd)
static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x);
static void tcpserver_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv);
static void tcpserver_send_bytes(int sockfd, t_tcpserver *x, int argc, t_atom *argv);
-static void tcpserver_broadcast_bytes(t_tcpserver *x, int argc, t_atom *argv);
#ifdef SIOCOUTQ
static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client);
#endif
static void *tcpserver_send_buf_thread(void *arg);
+static void *tcpserver_broadcast_thread(void *arg);
static void tcpserver_client_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv);
static void tcpserver_output_client_state(t_tcpserver *x, int client);
static int tcpserver_get_socket_send_buf_size(int sockfd);
@@ -523,124 +532,6 @@ static void *tcpserver_send_buf_thread(void *arg)
return NULL;
}
-static void tcpserver_broadcast_bytes(t_tcpserver *x, int argc, t_atom *argv)
-{
-/* send the same buffer multiple times */
- static char byte_buf[MAX_UDP_RECEIVE];// arbitrary maximum similar to max IP packet size
- int i, j, d;
- unsigned char c;
- float f, e;
- int length;
- size_t flen = 0;
- int sockfd = -1;
- char fpath[FILENAME_MAX];
- FILE *fptr;
- t_atom output_atom[3];
- t_tcpserver_send_params *ttsp;
- pthread_t sender_thread;
- pthread_attr_t sender_attr;
- int sender_thread_result;
- int client, buffer_filled = 0;
-
- for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */
- {
- if((sockfd = x->x_sr[client]->sr_fd) >= 0)
- {/* socket exists for this client */
- if (x->x_blocked > 0) goto failed;
- /* sender thread should start out detached so its resouces will be freed when it is done */
- if (0!= (sender_thread_result = pthread_attr_init(&sender_attr)))
- {
- error("%s_broadcast_bytes: pthread_attr_init failed: %d", objName, sender_thread_result);
- goto failed;
- }
- if(0!= (sender_thread_result = pthread_attr_setdetachstate(&sender_attr, PTHREAD_CREATE_DETACHED)))
- {
- error("%s_broadcast_bytes: pthread_attr_setdetachstate failed: %d", objName, sender_thread_result);
- goto failed;
- }
- if (!buffer_filled)
- { /* first client */
- /* process input into byte_buf */
- for (i = j = 0; ((i < argc) && (i < MAX_UDP_RECEIVE)); ++i)
- {
- if (argv[i].a_type == A_FLOAT)
- { /* load floats into buffer as long as they are integers on [0..255]*/
- f = argv[i].a_w.w_float;
- d = (int)f;
- e = f - d;
-#ifdef DEBUG
- post("%s_broadcast_bytes: argv[%d]: float:%f int:%d delta:%f", objName, i, f, d, e);
-#endif
- if (e != 0)
- {
- error("%s_broadcast_bytes: item %d (%f) is not an integer", objName, i, f);
- goto failed;
- }
- if ((d < 0) || (d > 255))
- {
- error("%s_broadcast_bytes: item %d (%f) is not between 0 and 255", objName, i, f);
- goto failed;
- }
- c = (unsigned char)d; /* make sure it doesn't become negative; this only matters for post() */
-#ifdef DEBUG
- post("%s_broadcast_bytes: argv[%d]: %d", objName, i, c);
-#endif
- byte_buf[j++] = c;
- }
- else
- { /* arg was not a float */
- error("%s_broadcast_bytes: item %d is not a float", objName, i);
- goto failed;
- }
- }
- length = j;
- buffer_filled = 1;
- }
- if (length > 0)
- { /* send the buffer */
-#ifdef SIOCOUTQ
- if (tcpserver_send_buffer_avaliable_for_client(x, client) < length)
- {
- error("%s_broadcast_bytes: buffer too small for client(%d)", objName, client);
- goto failed;
- }
-#endif // SIOCOUTQ
- ttsp = (t_tcpserver_send_params *)getbytes(sizeof(t_tcpserver_send_params));
- if (ttsp == NULL)
- {
- error("%s_broadcast_bytes: unable to allocate %d bytes for t_tcpserver_send_params", objName, sizeof(t_tcpserver_send_params));
- goto failed;
- }
- ttsp->client = client;
- ttsp->sockfd = sockfd;
- ttsp->byte_buf = byte_buf;
- ttsp->length = length;
- if ( 0!= (sender_thread_result = pthread_create(&sender_thread, &sender_attr, tcpserver_send_buf_thread, (void *)ttsp)))
- {
- ++x->x_blocked;
- error("%s_broadcast_bytes: couldn't create sender thread (%d)", objName, sender_thread_result);
- freebytes (ttsp, sizeof (t_tcpserver_send_params));
- goto failed;
- }
- flen = length;
- }
- SETFLOAT(&output_atom[0], client+1);
- SETFLOAT(&output_atom[1], flen);
- SETFLOAT(&output_atom[2], sockfd);
- if(x->x_blocked) outlet_anything( x->x_status_outlet, gensym("blocked"), 3, output_atom);
- else outlet_anything( x->x_status_outlet, gensym("sent"), 3, output_atom);
- }
- else post("%s_broadcast_bytes: not a valid socket number (%d)", objName, sockfd);
- }
- return;
-failed:
- SETFLOAT(&output_atom[0], client+1);
- SETFLOAT(&output_atom[1], flen);
- SETFLOAT(&output_atom[2], sockfd);
- if(x->x_blocked) outlet_anything( x->x_status_outlet, gensym("blocked"), 3, output_atom);
- else outlet_anything( x->x_status_outlet, gensym("sent"), 3, output_atom);
-}
-
/* send message to client using socket number */
static void tcpserver_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
{
@@ -897,23 +788,232 @@ static void tcpserver_buf_size(t_tcpserver *x, t_symbol *s, int argc, t_atom *ar
return;
}
-/* broadcasts a message to all connected clients */
-static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
-{
- int client;
- {
- if (argc < MAX_UDP_RECEIVE) tcpserver_broadcast_bytes(x, argc, argv);
- return;
- }
- /* big message: enumerate through the clients and send each the message */
- for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */
- {
- if(x->x_sr[client]->sr_fd >= 0)
- { /* socket exists for this client */
- tcpserver_send_bytes(client, x, argc, argv);
- }
- }
-}
+/* broadcasts messages to all clients */
+/* Ivica Ico Bukvic <ico@bukvic.net> 5/6/10 rewrote broadcast to be xrun-free */
+static void *tcpserver_broadcast_thread(void *arg)
+{
+ t_tcpserver_broadcast_params *ttbp = (t_tcpserver_broadcast_params *)arg;
+ int client;
+ static char byte_buf[MAX_UDP_RECEIVE];// arbitrary maximum similar to max IP packet size
+ int i, j, d;
+ unsigned char c;
+ float f, e;
+ int length;
+ size_t flen = 0;
+ int sockfd = 0;
+ char fpath[FILENAME_MAX];
+ FILE *fptr;
+ t_atom output_atom[3];
+ t_tcpserver_send_params *ttsp;
+ pthread_t sender_thread;
+ pthread_attr_t sender_attr;
+ int sender_thread_result;
+ int result;
+
+ for (i = j = 0; i < ttbp->argc; ++i)
+ {
+ if (ttbp->argv[i].a_type == A_FLOAT)
+ { /* load floats into buffer as long as they are integers on [0..255]*/
+ f = ttbp->argv[i].a_w.w_float;
+ d = (int)f;
+ e = f - d;
+#ifdef DEBUG
+ post("%s: argv[%d]: float:%f int:%d delta:%f", objName, i, f, d, e);
+#endif
+ if (e != 0)
+ {
+ error("%s_broadcast_thread: item %d (%f) is not an integer", objName, i, f);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+ if ((d < 0) || (d > 255))
+ {
+ error("%s_broadcast_thread: item %d (%f) is not between 0 and 255", objName, i, f);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+ c = (unsigned char)d; /* make sure it doesn't become negative; this only matters for post() */
+#ifdef DEBUG
+ post("%s_broadcast_thread: argv[%d]: %d", objName, i, c);
+#endif
+ byte_buf[j++] = c;
+ if (j >= MAX_UDP_RECEIVE)
+ { /* if the argument list is longer than our buffer, send the buffer whenever it's full */
+//LOOP
+ /* enumerate through the clients and send each the message */
+ for(client = 0; client < ttbp->x->x_nconnections; client++) /* check if connection exists */
+ {
+#ifdef SIOCOUTQ
+ if (tcpserver_send_buffer_avaliable_for_client(ttbp->x, client) < j)
+ {
+ error("%s_broadcast_thread: buffer too small for client(%d)", objName, client);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+#endif // SIOCOUTQ
+ if(ttbp->x->x_sr[client]->sr_fd >= 0)
+ { /* socket exists for this client */
+ sockfd = ttbp->x->x_sr[client]->sr_fd;
+ result = send(sockfd, byte_buf, j, 0);
+ if (result <= 0)
+ {
+ sys_sockerror("tcpserver_broadcast_thread: send");
+ post("%s_broadcast_thread: could not send data to client %d", objName, client+1);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+ }
+ }
+ flen += j;
+ j = 0;
+ }
+ }
+ else if (ttbp->argv[i].a_type == A_SYMBOL)
+ { /* symbols are interpreted to be file names; attempt to load the file and send it */
+ atom_string(&ttbp->argv[i], fpath, FILENAME_MAX);
+#ifdef DEBUG
+ post ("%s_broadcast_thread: fname: %s", objName, fpath);
+#endif
+ fptr = fopen(fpath, "rb");
+ if (fptr == NULL)
+ {
+ error("%s_broadcast_thread: unable to open \"%s\"", objName, fpath);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+ rewind(fptr);
+#ifdef DEBUG
+ post("%s_broadcast_thread: d is %d", objName, d);
+#endif
+ while ((d = fgetc(fptr)) != EOF)
+ {
+ byte_buf[j++] = (char)(d & 0x0FF);
+#ifdef DEBUG
+ post("%s_broadcast_thread: byte_buf[%d] = %d", objName, j-1, byte_buf[j-1]);
+#endif
+ if (j >= MAX_UDP_RECEIVE)
+ { /* if the file is longer than our buffer, send the buffer whenever it's full */
+//LOOP
+ /* enumerate through the clients and send each the message */
+ for(client = 0; client < ttbp->x->x_nconnections; client++) /* check if connection exists */
+ {
+ /* this might be better than allocating huge amounts of memory */
+#ifdef SIOCOUTQ
+ if (tcpserver_send_buffer_avaliable_for_client(x, client) < j)
+ {
+ error("%s_broadcast_thread: buffer too small for client(%d)", objName, client);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+#endif // SIOCOUTQ
+ if(ttbp->x->x_sr[client]->sr_fd >= 0)
+ { /* socket exists for this client */
+ sockfd = ttbp->x->x_sr[client]->sr_fd;
+ result = send(sockfd, byte_buf, j, 0);
+ if (result <= 0)
+ {
+ sys_sockerror("tcpserver_broadcast_thread: send");
+ post("%s_broadcast_thread: could not send data to client %d", objName, client+1);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+ }
+ }
+ flen += j;
+ j = 0;
+ }
+ }
+ flen += j;
+ fclose(fptr);
+ fptr = NULL;
+ post("%s_broadcast_thread: read \"%s\" length %d byte%s", objName, fpath, flen, ((d==1)?"":"s"));
+ }
+ else
+ { /* arg was neither a float nor a valid file name */
+ error("%s_broadcast_thread: item %d is not a float or a file name", objName, i);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+ }
+ length = j;
+ if (length > 0)
+ { /* send whatever remains in our buffer */
+ /* enumerate through the clients and send each the message */
+ for(client = 0; client < ttbp->x->x_nconnections; client++) /* check if connection exists */
+ {
+#ifdef SIOCOUTQ
+ if (tcpserver_send_buffer_avaliable_for_client(x, client) < length)
+ {
+ error("%s_broadcast_thread: buffer too small for client(%d)", objName, client);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+#endif // SIOCOUTQ
+ if(ttbp->x->x_sr[client]->sr_fd >= 0)
+ { /* socket exists for this client */
+ sockfd = ttbp->x->x_sr[client]->sr_fd;
+ result = send(sockfd, byte_buf, length, 0);
+ if (result <= 0)
+ {
+ sys_sockerror("tcpserver_broadcast_thread: send");
+ post("%s_broadcast_thread: could not send data to client %d", objName, client+1);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+ }
+ }
+ }
+ flen += length;
+ }
+ else post("%s_broadcast_thread: not a valid socket number (%d)", objName, sockfd);
+ freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+ return NULL;
+}
+
+static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
+/* initializes separate thread that broadcasts to all clients */
+/* Ivica Ico Bukvic <ico@bukvic.net> 5/6/10 rewrote broadcast to be xrun-free */
+{
+ int i;
+ t_tcpserver_broadcast_params *ttbp;
+ pthread_t broadcast_thread;
+ pthread_attr_t broadcast_attr;
+ int broadcast_thread_result;
+
+ if (x->x_nconnections != 0)
+ {
+ /* sender thread should start out detached so its resouces will be freed when it is done */
+ if (0!= (broadcast_thread_result = pthread_attr_init(&broadcast_attr)))
+ {
+ error("%s_broadcast: pthread_attr_init failed: %d", objName, broadcast_thread_result);
+ }
+ else
+ {
+ if(0 != (broadcast_thread_result = pthread_attr_setdetachstate(&broadcast_attr, PTHREAD_CREATE_DETACHED)))
+ {
+ error("%s_broadcast: pthread_attr_setdetachstate failed: %d", objName, broadcast_thread_result);
+ }
+ else
+ {
+ ttbp = (t_tcpserver_broadcast_params *)getbytes(sizeof(t_tcpserver_broadcast_params));
+ if (ttbp == NULL)
+ {
+ error("%s_broadcast: unable to allocate %d bytes for t_tcpserver_broadcast_params", objName, sizeof(t_tcpserver_broadcast_params));
+ }
+ else
+ {
+ ttbp->x = x;
+ ttbp->argc = argc;
+ for (i = 0; i < argc; i++) ttbp->argv[i] = argv[i];
+ if (0 != (broadcast_thread_result = pthread_create(&broadcast_thread, &broadcast_attr, tcpserver_broadcast_thread, (void *)ttbp)))
+ {
+ error("%s_broadcast: couldn't create broadcast thread (%d)", objName, broadcast_thread_result);
+ freebytes (ttbp, sizeof (t_tcpserver_broadcast_params));
+ }
+ }
+ }
+ }
+ }
+}
/* ---------------- main tcpserver (receive) stuff --------------------- */
/* tcpserver_notify is called by */