diff options
Diffstat (limited to 'net/tcpclient.c')
-rw-r--r-- | net/tcpclient.c | 145 |
1 files changed, 89 insertions, 56 deletions
diff --git a/net/tcpclient.c b/net/tcpclient.c index 9804db8..d977c86 100644 --- a/net/tcpclient.c +++ b/net/tcpclient.c @@ -62,6 +62,7 @@ typedef struct _tcpclient t_object x_obj; t_clock *x_clock; t_clock *x_poll; + t_clock *x_sendclock; t_outlet *x_msgout; t_outlet *x_addrout; t_outlet *x_connectout; @@ -70,7 +71,6 @@ typedef struct _tcpclient int x_verbosity; // 1 = post connection state changes to main window int x_fd; // the socket int x_fdbuf; // the socket's buffer size - t_int x_timeout_us; /* send timeout in microseconds */ char *x_hostname; // address we want to connect to as text int x_connectstate; // 0 = not connected, 1 = connected int x_port; // port we're connected to @@ -78,12 +78,17 @@ typedef struct _tcpclient t_atom x_addrbytes[4]; // address we're connected to as 4 bytes t_atom x_msgoutbuf[MAX_UDP_RECEIVE]; // received data as float atoms unsigned char x_msginbuf[MAX_UDP_RECEIVE]; // received data as bytes + char *x_sendbuf; // pointer to data to send + int x_sendbuf_len; // number of bytes in sendbuf + int x_sendresult; + int x_blocked; /* multithread stuff */ pthread_t x_threadid; /* id of child thread */ pthread_attr_t x_threadattr; /* attributes of child thread */ + pthread_t x_sendthreadid; /* id of child thread for sending */ + pthread_attr_t x_sendthreadattr; /* attributes of child thread for sending */ } t_tcpclient; -static void tcpclient_timeout(t_tcpclient *x, t_float timeout); static void tcpclient_verbosity(t_tcpclient *x, t_float verbosity); static void tcpclient_dump(t_tcpclient *x, t_float dump); static void tcp_client_hexdump(t_tcpclient *x, long len); @@ -92,30 +97,19 @@ static void *tcpclient_child_connect(void *w); static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fportno); static void tcpclient_disconnect(t_tcpclient *x); static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv); -int tcpclient_send_byte(t_tcpclient *x, char byte); +int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len); +static void *tcpclient_child_send(void *w); +static void tcpclient_sent(t_tcpclient *x); static int tcpclient_get_socket_send_buf_size(t_tcpclient *x); static int tcpclient_set_socket_send_buf_size(t_tcpclient *x, int size); static void tcpclient_buf_size(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv); static void tcpclient_rcv(t_tcpclient *x); static void tcpclient_poll(t_tcpclient *x); +static void tcpclient_unblock(t_tcpclient *x); static void *tcpclient_new(t_floatarg udpflag); static void tcpclient_free(t_tcpclient *x); void tcpclient_setup(void); -static void tcpclient_timeout(t_tcpclient *x, t_float timeout) -{ - /* set the timeout on the select call in tcpclient_send_byte */ - /* this is the maximum time in microseconds to wait */ - /* before abandoning attempt to send */ - - t_int timeout_us = 0; - if ((timeout >= 0)&&(timeout < 1000000)) - { - timeout_us = (t_int)timeout; - x->x_timeout_us = timeout_us; - } -} - static void tcpclient_dump(t_tcpclient *x, t_float dump) { x->x_dump = (dump == 0)?0:1; @@ -208,6 +202,7 @@ static void *tcpclient_child_connect(void *w) /* outlet_float is not threadsafe ! */ // outlet_float(x->x_obj.ob_outlet, 1); x->x_connectstate = 1; + x->x_blocked = 0; /* use callback instead to set outlet */ clock_delay(x->x_clock, 0); return (x); @@ -240,29 +235,23 @@ static void tcpclient_disconnect(t_tcpclient *x) static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) { -//#define BYTE_BUF_LEN 65536 // arbitrary maximum similar to max IP packet size -// static char byte_buf[BYTE_BUF_LEN]; +#define BYTE_BUF_LEN 65536 // arbitrary maximum similar to max IP packet size + static char byte_buf[BYTE_BUF_LEN]; int i, j, d; unsigned char c; float f, e; char *bp; int length; - size_t sent; + size_t sent = 0; int result; char fpath[FILENAME_MAX]; FILE *fptr; - t_atom output_atom; #ifdef DEBUG post("s: %s", s->s_name); post("argc: %d", argc); #endif - if (x->x_fd < 0) - { - error("%s: not connected", objName); - return; - } for (i = j = 0; i < argc; ++i) { if (argv[i].a_type == A_FLOAT) @@ -281,8 +270,13 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) return; } c = (unsigned char)d; - if (0 == tcpclient_send_byte(x, c)) break; - ++j; +// if (0 == tcpclient_send_byte(x, c)) break; + byte_buf[j++] = c; + if (j >= BYTE_BUF_LEN) + { + sent += tcpclient_send_buf(x, byte_buf, j); + j = 0; + } } else if (argv[i].a_type == A_SYMBOL) { @@ -298,8 +292,14 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) while ((d = fgetc(fptr)) != EOF) { c = (char)(d & 0x0FF); - if (0 == tcpclient_send_byte(x, c)) break; - ++j; +// if (0 == tcpclient_send_byte(x, c)) break; + byte_buf[j++] = c; + if (j >= BYTE_BUF_LEN) + { + sent += tcpclient_send_buf(x, byte_buf, j); + j = 0; + } +// ++j; } fclose(fptr); fptr = NULL; @@ -311,38 +311,62 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) return; } } - sent = j; - SETFLOAT(&output_atom, sent); - outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom); + if (j > 0) + sent += tcpclient_send_buf(x, byte_buf, j); + } -int tcpclient_send_byte(t_tcpclient *x, char byte) + +int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len) { - int result = 0; fd_set wfds; struct timeval timeout; - FD_ZERO(&wfds); - FD_SET(x->x_fd, &wfds); - timeout.tv_sec = 0; - timeout.tv_usec = x->x_timeout_us; /* give it about a millisecond to clear buffer */ - result = select(x->x_fd+1, NULL, &wfds, NULL, &timeout); - if (result == -1) + if (x->x_blocked) return 0; + if (x->x_fd < 0) { - post("%s_send_byte: select returned error %d", objName, errno); + error("%s: not connected", objName); + x->x_blocked++; return 0; } - if (FD_ISSET(x->x_fd, &wfds)) + x->x_sendbuf = buf; + x->x_sendbuf_len = buf_len; + if((x->x_sendresult = pthread_create(&x->x_sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, x)) < 0) { - result = send(x->x_fd, &byte, 1, 0); - if (result <= 0) - { - sys_sockerror("tcpclient: send"); - post("%s_send_byte: could not send data ", objName); - return 0; - } + post("%s_send_buf: could not create new thread (%d)", objName); + clock_delay(x->x_sendclock, 0); // calls tcpclient_sent + return 0; + } + return buf_len; +} + +/* tcpclient_child_send runs in sendthread */ +static void *tcpclient_child_send(void *w) +{ + t_tcpclient *x = (t_tcpclient*) w; + + x->x_sendresult = send(x->x_fd, x->x_sendbuf, x->x_sendbuf_len, 0); + clock_delay(x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so + return(x); +} + +static void tcpclient_sent(t_tcpclient *x) +{ + t_atom output_atom; + + if (x->x_sendresult <= 0) + { + sys_sockerror("tcpclient: send"); + post("%s_send_byte: could not send data ", objName); + x->x_blocked++; + SETFLOAT(&output_atom, x->x_sendresult); + outlet_anything( x->x_statusout, gensym("blocked"), 1, &output_atom); + } + else + { + SETFLOAT(&output_atom, x->x_sendresult); + outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom); } - return result; } /* Return the send buffer size of socket, also output it on status outlet */ @@ -489,6 +513,11 @@ static void tcpclient_poll(t_tcpclient *x) clock_delay(x->x_poll, DEFPOLLTIME); /* see you later */ } +static void tcpclient_unblock(t_tcpclient *x) +{ + x->x_blocked = 0; +} + static void *tcpclient_new(t_floatarg udpflag) { int i; @@ -498,6 +527,7 @@ static void *tcpclient_new(t_floatarg udpflag) x->x_addrout = outlet_new(&x->x_obj, &s_list); x->x_connectout = outlet_new(&x->x_obj, &s_float); /* connection state */ x->x_statusout = outlet_new(&x->x_obj, &s_anything);/* last outlet for everything else */ + x->x_sendclock = clock_new(x, (t_method)tcpclient_sent); x->x_clock = clock_new(x, (t_method)tcpclient_tick); x->x_poll = clock_new(x, (t_method)tcpclient_poll); x->x_verbosity = 1; /* default post status changes to main window */ @@ -514,13 +544,18 @@ static void *tcpclient_new(t_floatarg udpflag) x->x_addrbytes[i].a_w.w_float = 0; } x->x_addr = 0L; - x->x_timeout_us = 1000; /* set a default 1ms send timeout */ - /* prepare child thread */ + x->x_blocked = 1; + /* prepare child threads */ if(pthread_attr_init(&x->x_threadattr) < 0) post("%s: warning: could not prepare child thread", objName); if(pthread_attr_setdetachstate(&x->x_threadattr, PTHREAD_CREATE_DETACHED) < 0) post("%s: warning: could not prepare child thread", objName); + if(pthread_attr_init(&x->x_sendthreadattr) < 0) + post("%s: warning: could not prepare child thread", objName); + if(pthread_attr_setdetachstate(&x->x_sendthreadattr, PTHREAD_CREATE_DETACHED) < 0) + post("%s: warning: could not prepare child thread", objName); clock_delay(x->x_poll, 0); /* start polling the input */ + post("tcpclient 2010 Martin Peach-style"); return (x); } @@ -543,11 +578,9 @@ void tcpclient_setup(void) class_addmethod(tcpclient_class, (t_method)tcpclient_disconnect, gensym("disconnect"), 0); class_addmethod(tcpclient_class, (t_method)tcpclient_send, gensym("send"), A_GIMME, 0); class_addmethod(tcpclient_class, (t_method)tcpclient_buf_size, gensym("buf"), A_GIMME, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_rcv, gensym("receive"), 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_rcv, gensym("rcv"), 0); + class_addmethod(tcpclient_class, (t_method)tcpclient_unblock, gensym("unblock"), 0); class_addmethod(tcpclient_class, (t_method)tcpclient_verbosity, gensym("verbosity"), A_FLOAT, 0); class_addmethod(tcpclient_class, (t_method)tcpclient_dump, gensym("dump"), A_FLOAT, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_timeout, gensym("timeout"), A_FLOAT, 0); class_addlist(tcpclient_class, (t_method)tcpclient_send); } |