diff options
author | Martin Peach <mrpeach@users.sourceforge.net> | 2011-11-03 18:40:32 +0000 |
---|---|---|
committer | Martin Peach <mrpeach@users.sourceforge.net> | 2011-11-03 18:40:32 +0000 |
commit | c1a4500d6c422f5d4a4302210da758379589b044 (patch) | |
tree | a3da5af73377afe30f83182aa6a9a166b052fa91 | |
parent | 102df6d72865b66e534a7188795fc83f6ae2f7d4 (diff) |
To prevent overwriting the send buffer with multiple simultaneous messages, each sender thread now gets its own buffer.
(Usually a multiple message results in two TCP packets, with the first message in the first packet and the rest in the second.)
The buffers are preallocated and assigned to threads in round-robin fashion. There are MAX_TCPCLIENT_THREADS (32) buffers.
Wait for all threads to terminate before disconnecting a socket.
svn path=/trunk/externals/mrpeach/; revision=15700
-rw-r--r-- | net/tcpclient.c | 139 |
1 files changed, 86 insertions, 53 deletions
diff --git a/net/tcpclient.c b/net/tcpclient.c index dc48fc5..a1656ef 100644 --- a/net/tcpclient.c +++ b/net/tcpclient.c @@ -25,7 +25,7 @@ /* Based on PureData by Miller Puckette and others. */ /* */ /* ---------------------------------------------------------------------------- */ -//define DEBUG +//#define DEBUG #include "m_pd.h" #include "s_stuff.h" @@ -55,38 +55,54 @@ static t_class *tcpclient_class; static char objName[] = "tcpclient"; -#define MAX_UDP_RECEIVE 65536L // longer than data in maximum UDP packet +#define MAX_TCPCLIENT_SEND_BUF 65536L // longer than data in maximum UDP packet + +/* each send is handled by a new thread with a new parameter struct: */ +/* these are stored under x->x_tsp[0..MAX_TCPCLIENT_THREADS-1] */ +/* The buffer is preallocated for speed. */ +typedef struct _tcpclient_sender_params +{ + char x_sendbuf[MAX_TCPCLIENT_SEND_BUF]; /* possibly allocate this dynamically for space over speed */ + int x_buf_len; + int x_sendresult; + pthread_t sendthreadid; + struct _tcpclient *x_x; +} t_tcpclient_sender_params; +#define MAX_TCPCLIENT_THREADS 32 +/* MAX_TCPCLIENT_THREADS is small to avoid wasting space. This is the maximum number of concurrent threads. */ typedef struct _tcpclient { - t_object x_obj; - t_clock *x_clock; - t_clock *x_poll; - t_clock *x_sendclock; - t_outlet *x_msgout; - t_outlet *x_addrout; - t_outlet *x_connectout; - t_outlet *x_statusout; - int x_dump; // 1 = hexdump received bytes - int x_verbosity; // 1 = post connection state changes to main window - int x_fd; // the socket - int x_fdbuf; // the socket's buffer size - char *x_hostname; // address we want to connect to as text - int x_connectstate; // 0 = not connected, 1 = connected - int x_port; // port we're connected to - long x_addr; // address we're connected to as 32bit int - t_atom x_addrbytes[4]; // address we're connected to as 4 bytes - t_atom x_msgoutbuf[MAX_UDP_RECEIVE]; // received data as float atoms - unsigned char x_msginbuf[MAX_UDP_RECEIVE]; // received data as bytes - char *x_sendbuf; // pointer to data to send - int x_sendbuf_len; // number of bytes in sendbuf - int x_sendresult; - int x_blocked; + t_object x_obj; + t_clock *x_clock; + t_clock *x_poll; + t_clock *x_sendclock; + t_outlet *x_msgout; + t_outlet *x_addrout; + t_outlet *x_connectout; + t_outlet *x_statusout; + int x_dump; // 1 = hexdump received bytes + int x_verbosity; // 1 = post connection state changes to main window + int x_fd; // the socket + int x_fdbuf; // the socket's buffer size + char *x_hostname; // address we want to connect to as text + int x_connectstate; // 0 = not connected, 1 = connected + int x_port; // port we're connected to + long x_addr; // address we're connected to as 32bit int + t_atom x_addrbytes[4]; // address we're connected to as 4 bytes + t_atom x_msgoutbuf[MAX_TCPCLIENT_SEND_BUF]; // received data as float atoms + unsigned char x_msginbuf[MAX_TCPCLIENT_SEND_BUF]; // received data as bytes + char *x_sendbuf; // pointer to data to send + int x_sendbuf_len; // number of bytes in sendbuf + int x_sendresult; + int x_blocked; /* multithread stuff */ - pthread_t x_threadid; /* id of child thread */ - pthread_attr_t x_threadattr; /* attributes of child thread */ - pthread_t x_sendthreadid; /* id of child thread for sending */ - pthread_attr_t x_sendthreadattr; /* attributes of child thread for sending */ + pthread_t x_threadid; /* id of connector child thread */ + pthread_attr_t x_threadattr; /* attributes of connector child thread */ + pthread_attr_t x_sendthreadattr; /* attributes of all sender child thread for sending */ + int x_nextthread; /* next unused x_tsp */ + t_tcpclient_sender_params x_tsp[MAX_TCPCLIENT_THREADS]; +/* Thread params are used round-robin to avoid overwriting buffers when doing multiple sends */ } t_tcpclient; static void tcpclient_verbosity(t_tcpclient *x, t_float verbosity); @@ -97,7 +113,7 @@ static void *tcpclient_child_connect(void *w); static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fportno); static void tcpclient_disconnect(t_tcpclient *x); static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv); -int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len); +static int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len); static void *tcpclient_child_send(void *w); static void tcpclient_sent(t_tcpclient *x); static int tcpclient_get_socket_send_buf_size(t_tcpclient *x); @@ -227,8 +243,14 @@ static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fpo static void tcpclient_disconnect(t_tcpclient *x) { + int i; + if (x->x_fd >= 0) { + for (i = 0; i < MAX_TCPCLIENT_THREADS;++i) + { /* wait for any sender threads to finish */ + while (x->x_tsp[i].sendthreadid != 0); + } sys_closesocket(x->x_fd); x->x_fd = -1; x->x_connectstate = 0; @@ -245,10 +267,7 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) int i, j, d; unsigned char c; float f, e; -// char *bp; -// int length; size_t sent = 0; -// int result; char fpath[FILENAME_MAX]; FILE *fptr; @@ -275,7 +294,6 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) return; } c = (unsigned char)d; -// if (0 == tcpclient_send_byte(x, c)) break; byte_buf[j++] = c; if (j >= BYTE_BUF_LEN) { @@ -285,7 +303,6 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) } else if (argv[i].a_type == A_SYMBOL) { - atom_string(&argv[i], fpath, FILENAME_MAX); fptr = fopen(fpath, "rb"); if (fptr == NULL) @@ -297,14 +314,12 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) while ((d = fgetc(fptr)) != EOF) { c = (char)(d & 0x0FF); -// if (0 == tcpclient_send_byte(x, c)) break; byte_buf[j++] = c; if (j >= BYTE_BUF_LEN) { sent += tcpclient_send_buf(x, byte_buf, j); j = 0; } -// ++j; } fclose(fptr); fptr = NULL; @@ -318,14 +333,13 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) } if (j > 0) sent += tcpclient_send_buf(x, byte_buf, j); - } -int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len) +static int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len) { -// fd_set wfds; -// struct timeval timeout; + t_tcpclient_sender_params *tsp = &x->x_tsp[x->x_nextthread]; + int i, max; if (x->x_blocked) return 0; if (x->x_fd < 0) @@ -334,25 +348,35 @@ int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len) x->x_blocked++; return 0; } - x->x_sendbuf = buf; - x->x_sendbuf_len = buf_len; - if((x->x_sendresult = pthread_create(&x->x_sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, x)) < 0) + max = (buf_len > MAX_TCPCLIENT_SEND_BUF)? MAX_TCPCLIENT_SEND_BUF: buf_len; + for (i = 0; i < max; ++i) + { + tsp->x_sendbuf[i] = buf[i]; + } + tsp->x_buf_len = i; + x->x_sendbuf_len += i; + tsp->x_x = x; + + if((tsp->x_sendresult = pthread_create(&tsp->sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, tsp)) < 0) { post("%s_send_buf: could not create new thread (%d)", objName); clock_delay(x->x_sendclock, 0); // calls tcpclient_sent return 0; } + x->x_nextthread++; + if (x->x_nextthread > MAX_TCPCLIENT_THREADS) x->x_nextthread = 0; return buf_len; } /* tcpclient_child_send runs in sendthread */ static void *tcpclient_child_send(void *w) { - t_tcpclient *x = (t_tcpclient*) w; + t_tcpclient_sender_params *tsp = (t_tcpclient_sender_params*) w; - x->x_sendresult = send(x->x_fd, x->x_sendbuf, x->x_sendbuf_len, 0); - clock_delay(x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so - return(x); + tsp->x_sendresult = send(tsp->x_x->x_fd, tsp->x_sendbuf, tsp->x_buf_len, 0); + clock_delay(tsp->x_x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so + tsp->sendthreadid = 0; + return(tsp); } static void tcpclient_sent(t_tcpclient *x) @@ -371,6 +395,7 @@ static void tcpclient_sent(t_tcpclient *x) { /* assume the message is queued and will be sent real soon now */ SETFLOAT(&output_atom, x->x_sendbuf_len); outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom); + x->x_sendbuf_len = 0; /* we might be called only once for multiple calls to tcpclient_send_buf */ } else { @@ -419,9 +444,7 @@ static int tcpclient_set_socket_send_buf_size(t_tcpclient *x, int size) /* Get/set the send buffer size of client socket */ static void tcpclient_buf_size(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) { -// int client = -1; float buf_size = 0; -// t_atom output_atom[3]; if(x->x_connectstate == 0) { @@ -475,7 +498,7 @@ static void tcpclient_rcv(t_tcpclient *x) if(FD_ISSET(sockfd, &readset) || FD_ISSET(sockfd, &exceptset)) { /* read from server */ - ret = recv(sockfd, x->x_msginbuf, MAX_UDP_RECEIVE, 0); + ret = recv(sockfd, x->x_msginbuf, MAX_TCPCLIENT_SEND_BUF, 0); if(ret > 0) { #ifdef DEBUG @@ -543,7 +566,7 @@ static void *tcpclient_new(void) x->x_verbosity = 1; /* default post status changes to main window */ x->x_fd = -1; /* convert the bytes in the buffer to floats in a list */ - for (i = 0; i < MAX_UDP_RECEIVE; ++i) + for (i = 0; i < MAX_TCPCLIENT_SEND_BUF; ++i) { x->x_msgoutbuf[i].a_type = A_FLOAT; x->x_msgoutbuf[i].a_w.w_float = 0; @@ -556,6 +579,7 @@ static void *tcpclient_new(void) x->x_addr = 0L; x->x_blocked = 1; x->x_connectstate = 0; + x->x_nextthread = 0; /* prepare child threads */ if(pthread_attr_init(&x->x_threadattr) < 0) post("%s: warning: could not prepare child thread", objName); @@ -566,7 +590,6 @@ static void *tcpclient_new(void) if(pthread_attr_setdetachstate(&x->x_sendthreadattr, PTHREAD_CREATE_DETACHED) < 0) post("%s: warning: could not prepare child thread", objName); clock_delay(x->x_poll, 0); /* start polling the input */ - post("tcpclient 20110113 Martin Peach-style"); return (x); } @@ -581,6 +604,16 @@ static void tcpclient_free(t_tcpclient *x) void tcpclient_setup(void) { + char aboutStr[MAXPDSTRING]; + + snprintf(aboutStr, MAXPDSTRING, "%s: (GPL) 20111103 Martin Peach, compiled for pd-%d.%d on %s %s", + objName, PD_MAJOR_VERSION, PD_MINOR_VERSION, __DATE__, __TIME__); + +#if PD_MAJOR_VERSION==0 && PD_MINOR_VERSION<43 + post(aboutStr); +#else + logpost(NULL, 3, aboutStr); +#endif tcpclient_class = class_new(gensym(objName), (t_newmethod)tcpclient_new, (t_method)tcpclient_free, sizeof(t_tcpclient), 0, 0); class_addmethod(tcpclient_class, (t_method)tcpclient_connect, gensym("connect") |