diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/tcpclient.c | 139 |
1 files changed, 86 insertions, 53 deletions
diff --git a/net/tcpclient.c b/net/tcpclient.c index dc48fc5..a1656ef 100644 --- a/net/tcpclient.c +++ b/net/tcpclient.c @@ -25,7 +25,7 @@ /* Based on PureData by Miller Puckette and others. */ /* */ /* ---------------------------------------------------------------------------- */ -//define DEBUG +//#define DEBUG #include "m_pd.h" #include "s_stuff.h" @@ -55,38 +55,54 @@ static t_class *tcpclient_class; static char objName[] = "tcpclient"; -#define MAX_UDP_RECEIVE 65536L // longer than data in maximum UDP packet +#define MAX_TCPCLIENT_SEND_BUF 65536L // longer than data in maximum UDP packet + +/* each send is handled by a new thread with a new parameter struct: */ +/* these are stored under x->x_tsp[0..MAX_TCPCLIENT_THREADS-1] */ +/* The buffer is preallocated for speed. */ +typedef struct _tcpclient_sender_params +{ + char x_sendbuf[MAX_TCPCLIENT_SEND_BUF]; /* possibly allocate this dynamically for space over speed */ + int x_buf_len; + int x_sendresult; + pthread_t sendthreadid; + struct _tcpclient *x_x; +} t_tcpclient_sender_params; +#define MAX_TCPCLIENT_THREADS 32 +/* MAX_TCPCLIENT_THREADS is small to avoid wasting space. This is the maximum number of concurrent threads. */ typedef struct _tcpclient { - t_object x_obj; - t_clock *x_clock; - t_clock *x_poll; - t_clock *x_sendclock; - t_outlet *x_msgout; - t_outlet *x_addrout; - t_outlet *x_connectout; - t_outlet *x_statusout; - int x_dump; // 1 = hexdump received bytes - int x_verbosity; // 1 = post connection state changes to main window - int x_fd; // the socket - int x_fdbuf; // the socket's buffer size - char *x_hostname; // address we want to connect to as text - int x_connectstate; // 0 = not connected, 1 = connected - int x_port; // port we're connected to - long x_addr; // address we're connected to as 32bit int - t_atom x_addrbytes[4]; // address we're connected to as 4 bytes - t_atom x_msgoutbuf[MAX_UDP_RECEIVE]; // received data as float atoms - unsigned char x_msginbuf[MAX_UDP_RECEIVE]; // received data as bytes - char *x_sendbuf; // pointer to data to send - int x_sendbuf_len; // number of bytes in sendbuf - int x_sendresult; - int x_blocked; + t_object x_obj; + t_clock *x_clock; + t_clock *x_poll; + t_clock *x_sendclock; + t_outlet *x_msgout; + t_outlet *x_addrout; + t_outlet *x_connectout; + t_outlet *x_statusout; + int x_dump; // 1 = hexdump received bytes + int x_verbosity; // 1 = post connection state changes to main window + int x_fd; // the socket + int x_fdbuf; // the socket's buffer size + char *x_hostname; // address we want to connect to as text + int x_connectstate; // 0 = not connected, 1 = connected + int x_port; // port we're connected to + long x_addr; // address we're connected to as 32bit int + t_atom x_addrbytes[4]; // address we're connected to as 4 bytes + t_atom x_msgoutbuf[MAX_TCPCLIENT_SEND_BUF]; // received data as float atoms + unsigned char x_msginbuf[MAX_TCPCLIENT_SEND_BUF]; // received data as bytes + char *x_sendbuf; // pointer to data to send + int x_sendbuf_len; // number of bytes in sendbuf + int x_sendresult; + int x_blocked; /* multithread stuff */ - pthread_t x_threadid; /* id of child thread */ - pthread_attr_t x_threadattr; /* attributes of child thread */ - pthread_t x_sendthreadid; /* id of child thread for sending */ - pthread_attr_t x_sendthreadattr; /* attributes of child thread for sending */ + pthread_t x_threadid; /* id of connector child thread */ + pthread_attr_t x_threadattr; /* attributes of connector child thread */ + pthread_attr_t x_sendthreadattr; /* attributes of all sender child thread for sending */ + int x_nextthread; /* next unused x_tsp */ + t_tcpclient_sender_params x_tsp[MAX_TCPCLIENT_THREADS]; +/* Thread params are used round-robin to avoid overwriting buffers when doing multiple sends */ } t_tcpclient; static void tcpclient_verbosity(t_tcpclient *x, t_float verbosity); @@ -97,7 +113,7 @@ static void *tcpclient_child_connect(void *w); static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fportno); static void tcpclient_disconnect(t_tcpclient *x); static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv); -int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len); +static int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len); static void *tcpclient_child_send(void *w); static void tcpclient_sent(t_tcpclient *x); static int tcpclient_get_socket_send_buf_size(t_tcpclient *x); @@ -227,8 +243,14 @@ static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fpo static void tcpclient_disconnect(t_tcpclient *x) { + int i; + if (x->x_fd >= 0) { + for (i = 0; i < MAX_TCPCLIENT_THREADS;++i) + { /* wait for any sender threads to finish */ + while (x->x_tsp[i].sendthreadid != 0); + } sys_closesocket(x->x_fd); x->x_fd = -1; x->x_connectstate = 0; @@ -245,10 +267,7 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) int i, j, d; unsigned char c; float f, e; -// char *bp; -// int length; size_t sent = 0; -// int result; char fpath[FILENAME_MAX]; FILE *fptr; @@ -275,7 +294,6 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) return; } c = (unsigned char)d; -// if (0 == tcpclient_send_byte(x, c)) break; byte_buf[j++] = c; if (j >= BYTE_BUF_LEN) { @@ -285,7 +303,6 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) } else if (argv[i].a_type == A_SYMBOL) { - atom_string(&argv[i], fpath, FILENAME_MAX); fptr = fopen(fpath, "rb"); if (fptr == NULL) @@ -297,14 +314,12 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) while ((d = fgetc(fptr)) != EOF) { c = (char)(d & 0x0FF); -// if (0 == tcpclient_send_byte(x, c)) break; byte_buf[j++] = c; if (j >= BYTE_BUF_LEN) { sent += tcpclient_send_buf(x, byte_buf, j); j = 0; } -// ++j; } fclose(fptr); fptr = NULL; @@ -318,14 +333,13 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) } if (j > 0) sent += tcpclient_send_buf(x, byte_buf, j); - } -int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len) +static int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len) { -// fd_set wfds; -// struct timeval timeout; + t_tcpclient_sender_params *tsp = &x->x_tsp[x->x_nextthread]; + int i, max; if (x->x_blocked) return 0; if (x->x_fd < 0) @@ -334,25 +348,35 @@ int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len) x->x_blocked++; return 0; } - x->x_sendbuf = buf; - x->x_sendbuf_len = buf_len; - if((x->x_sendresult = pthread_create(&x->x_sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, x)) < 0) + max = (buf_len > MAX_TCPCLIENT_SEND_BUF)? MAX_TCPCLIENT_SEND_BUF: buf_len; + for (i = 0; i < max; ++i) + { + tsp->x_sendbuf[i] = buf[i]; + } + tsp->x_buf_len = i; + x->x_sendbuf_len += i; + tsp->x_x = x; + + if((tsp->x_sendresult = pthread_create(&tsp->sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, tsp)) < 0) { post("%s_send_buf: could not create new thread (%d)", objName); clock_delay(x->x_sendclock, 0); // calls tcpclient_sent return 0; } + x->x_nextthread++; + if (x->x_nextthread > MAX_TCPCLIENT_THREADS) x->x_nextthread = 0; return buf_len; } /* tcpclient_child_send runs in sendthread */ static void *tcpclient_child_send(void *w) { - t_tcpclient *x = (t_tcpclient*) w; + t_tcpclient_sender_params *tsp = (t_tcpclient_sender_params*) w; - x->x_sendresult = send(x->x_fd, x->x_sendbuf, x->x_sendbuf_len, 0); - clock_delay(x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so - return(x); + tsp->x_sendresult = send(tsp->x_x->x_fd, tsp->x_sendbuf, tsp->x_buf_len, 0); + clock_delay(tsp->x_x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so + tsp->sendthreadid = 0; + return(tsp); } static void tcpclient_sent(t_tcpclient *x) @@ -371,6 +395,7 @@ static void tcpclient_sent(t_tcpclient *x) { /* assume the message is queued and will be sent real soon now */ SETFLOAT(&output_atom, x->x_sendbuf_len); outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom); + x->x_sendbuf_len = 0; /* we might be called only once for multiple calls to tcpclient_send_buf */ } else { @@ -419,9 +444,7 @@ static int tcpclient_set_socket_send_buf_size(t_tcpclient *x, int size) /* Get/set the send buffer size of client socket */ static void tcpclient_buf_size(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) { -// int client = -1; float buf_size = 0; -// t_atom output_atom[3]; if(x->x_connectstate == 0) { @@ -475,7 +498,7 @@ static void tcpclient_rcv(t_tcpclient *x) if(FD_ISSET(sockfd, &readset) || FD_ISSET(sockfd, &exceptset)) { /* read from server */ - ret = recv(sockfd, x->x_msginbuf, MAX_UDP_RECEIVE, 0); + ret = recv(sockfd, x->x_msginbuf, MAX_TCPCLIENT_SEND_BUF, 0); if(ret > 0) { #ifdef DEBUG @@ -543,7 +566,7 @@ static void *tcpclient_new(void) x->x_verbosity = 1; /* default post status changes to main window */ x->x_fd = -1; /* convert the bytes in the buffer to floats in a list */ - for (i = 0; i < MAX_UDP_RECEIVE; ++i) + for (i = 0; i < MAX_TCPCLIENT_SEND_BUF; ++i) { x->x_msgoutbuf[i].a_type = A_FLOAT; x->x_msgoutbuf[i].a_w.w_float = 0; @@ -556,6 +579,7 @@ static void *tcpclient_new(void) x->x_addr = 0L; x->x_blocked = 1; x->x_connectstate = 0; + x->x_nextthread = 0; /* prepare child threads */ if(pthread_attr_init(&x->x_threadattr) < 0) post("%s: warning: could not prepare child thread", objName); @@ -566,7 +590,6 @@ static void *tcpclient_new(void) if(pthread_attr_setdetachstate(&x->x_sendthreadattr, PTHREAD_CREATE_DETACHED) < 0) post("%s: warning: could not prepare child thread", objName); clock_delay(x->x_poll, 0); /* start polling the input */ - post("tcpclient 20110113 Martin Peach-style"); return (x); } @@ -581,6 +604,16 @@ static void tcpclient_free(t_tcpclient *x) void tcpclient_setup(void) { + char aboutStr[MAXPDSTRING]; + + snprintf(aboutStr, MAXPDSTRING, "%s: (GPL) 20111103 Martin Peach, compiled for pd-%d.%d on %s %s", + objName, PD_MAJOR_VERSION, PD_MINOR_VERSION, __DATE__, __TIME__); + +#if PD_MAJOR_VERSION==0 && PD_MINOR_VERSION<43 + post(aboutStr); +#else + logpost(NULL, 3, aboutStr); +#endif tcpclient_class = class_new(gensym(objName), (t_newmethod)tcpclient_new, (t_method)tcpclient_free, sizeof(t_tcpclient), 0, 0); class_addmethod(tcpclient_class, (t_method)tcpclient_connect, gensym("connect") |