From a84d0bd1965c0b1ce7d9b9d9803cc6b734d33bcb Mon Sep 17 00:00:00 2001 From: Martin Peach Date: Fri, 26 Mar 2010 18:41:09 +0000 Subject: Sends all bytes in one block in a single thread, blocks further sends if one doesn't work until [unblock( message received. Removed [receive(, [recv(, [timeout( message methods as they are not relevant. svn path=/trunk/externals/mrpeach/; revision=13290 --- net/tcpclient-help.pd | 122 +++++++++++++++++++++--------------------- net/tcpclient.c | 145 +++++++++++++++++++++++++++++++------------------- 2 files changed, 150 insertions(+), 117 deletions(-) diff --git a/net/tcpclient-help.pd b/net/tcpclient-help.pd index 1803d01..ff55c9a 100644 --- a/net/tcpclient-help.pd +++ b/net/tcpclient-help.pd @@ -6,15 +6,12 @@ #X floatatom 282 462 3 0 0 0 - - -; #X floatatom 314 462 3 0 0 0 - - -; #X text 179 461 from; -#X msg -175 -7 connect 132.205.142.12 80; #X obj 199 387 tcpclient; -#X obj 239 413 tgl 15 0 empty empty connected 18 7 0 8 -24198 -241291 --1 0 1; +#X obj 239 413 tgl 15 0 empty empty connected 18 7 0 8 -24198 -13381 +-1 1 1; #X msg -88 80 dump \$1; -#X obj -140 65 tgl 15 0 empty empty empty 0 -6 0 8 -4034 -257985 -1 -1 1; -#X msg 11 179 receive; -#X msg 35 203 recv; +#X obj -140 65 tgl 15 0 empty empty empty 0 -6 0 8 -4034 -13381 -1 +0 1; #X text -59 -58 connect with an IP address and port number; #X msg -200 -32 connect www.concordia.ca 80; #X text -17 79 print received messages to main window in hexdump format @@ -28,11 +25,7 @@ on; #X text -214 374 can receive messages from tcpclient; #X text -136 328 is what tcpclient is based on; #X text 250 513 Received messages are output as a list of bytes; -#X text 77 176 get any received data (not useful unless you need it -faster than once per 20ms); -#X text 220 225 semicolon-terminated string for netserver or netreceive -; -#X msg 59 227 send 49 127 128 51 59; +#X msg 83 251 send 49 127 128 51 59; #X obj -84 352 tcpserver; #X text -118 351 and; #X text 347 -55 tcpclient can connect to a server and send and receive @@ -41,68 +34,75 @@ be transmitted or received.; #X msg -63 105 send ../doc/5.reference/test.txt; #X obj -15 129 openpanel; #X msg -15 153 send \$1; -#X obj -101 114 bng 15 250 50 0 empty empty empty 17 7 0 10 -24198 --241291 -1; +#X obj -101 114 bng 15 250 50 0 empty empty empty 17 7 0 10 -4034 -13381 +-1; #X text 172 105 send a file; #X text 62 128 ...any file; -#X msg 99 251 71 69 84 32 104 116 116 112 58 47 47 47 105 110 100 101 -120 46 104 116 109 108 13 10; -#X text 529 257 'send' prefix is optional; +#X msg 122 274 71 69 84 32 104 116 116 112 58 47 47 47 105 110 100 +101 120 46 104 116 109 108 13 10; +#X text 545 281 'send' prefix is optional; #X obj 199 514 spigot; -#X obj 238 491 tgl 15 0 empty empty enable_print 18 7 0 8 -24198 -241291 --1 0 1; +#X obj 238 491 tgl 15 0 empty empty enable_print 18 7 0 8 -24198 -13381 +-1 1 1; #X obj 199 543 print >>>; #X text 272 24 GET http:///index.phpCRLF; -#X floatatom 374 433 9 0 0 0 - - -; -#X text 173 291 set send-buffer size; -#X obj 374 407 route sent buf; -#X floatatom 421 457 9 0 0 0 - - -; -#X text 491 456 Size of the send buffer; -#X text 448 432 Number of bytes sent (may still be in buffer); -#X msg 147 315 buf; -#X text 177 314 get send-buffer size; -#X msg 123 291 buf 10; -#X msg 170 338 timeout 100; -#X text 258 337 set send timeout in microseconds (default is 1000) -; +#X floatatom 374 483 9 0 0 0 - - -; +#X text 196 314 set send-buffer size; +#X floatatom 424 457 9 0 0 0 - - -; +#X text 493 456 Size of the send buffer; +#X msg 170 338 buf; +#X text 200 337 get send-buffer size; +#X msg 146 314 buf 10; #X msg 194 362 verbosity \$1; -#X obj 139 347 tgl 15 1 empty empty empty 0 -6 0 8 -4034 -257985 -1 +#X obj 139 347 tgl 15 1 empty empty empty 0 -6 0 8 -4034 -13381 -1 0 1; #X text 289 362 print connection status messages to main window (default) ; #X msg -136 16 send 71 69 84 32 104 116 116 112 58 47 47 47 105 110 100 101 120 46 112 104 112 13 10 13 10; -#X text -208 540 2010/03/01 Martin Peach; #X text 271 542 Attempting to print long messages can hang Pd!; -#X connect 0 0 8 0; +#X msg 11 179 unblock; +#X obj 374 407 route sent buf blocked; +#X text -208 540 2010/03/26 Martin Peach; +#X msg -175 -7 connect 132.205.142.11 9997; +#X msg -34 207 hello \;; +#X obj 59 227 str to_list; +#X text 158 225 semicolon-terminated strings for netserver or netreceive +; +#X text 73 175 if the tcpclient sender blocks for any reason \, it +must be unblocked manually; +#X text 444 482 Number of bytes sent; +#X obj 474 433 print sender_blocked!; +#X connect 0 0 7 0; #X connect 1 0 2 0; #X connect 1 1 3 0; #X connect 1 2 4 0; #X connect 1 3 5 0; -#X connect 7 0 8 0; -#X connect 8 0 39 0; -#X connect 8 1 1 0; -#X connect 8 2 9 0; -#X connect 8 3 45 0; -#X connect 10 0 8 0; -#X connect 11 0 10 0; -#X connect 12 0 8 0; -#X connect 13 0 8 0; -#X connect 15 0 8 0; -#X connect 20 0 8 0; -#X connect 27 0 8 0; -#X connect 31 0 8 0; -#X connect 32 0 33 0; -#X connect 33 0 8 0; -#X connect 34 0 32 0; -#X connect 37 0 8 0; -#X connect 39 0 41 0; -#X connect 40 0 39 1; -#X connect 45 0 43 0; -#X connect 45 1 46 0; -#X connect 49 0 8 0; -#X connect 51 0 8 0; -#X connect 52 0 8 0; -#X connect 54 0 8 0; -#X connect 55 0 54 0; -#X connect 57 0 8 0; +#X connect 7 0 34 0; +#X connect 7 1 1 0; +#X connect 7 2 8 0; +#X connect 7 3 51 0; +#X connect 9 0 7 0; +#X connect 10 0 9 0; +#X connect 12 0 7 0; +#X connect 17 0 7 0; +#X connect 22 0 7 0; +#X connect 26 0 7 0; +#X connect 27 0 28 0; +#X connect 28 0 7 0; +#X connect 29 0 27 0; +#X connect 32 0 7 0; +#X connect 34 0 36 0; +#X connect 35 0 34 1; +#X connect 42 0 7 0; +#X connect 44 0 7 0; +#X connect 45 0 7 0; +#X connect 46 0 45 0; +#X connect 48 0 7 0; +#X connect 50 0 7 0; +#X connect 51 0 38 0; +#X connect 51 1 40 0; +#X connect 51 2 59 0; +#X connect 53 0 7 0; +#X connect 54 0 55 0; +#X connect 55 0 7 0; diff --git a/net/tcpclient.c b/net/tcpclient.c index 9804db8..d977c86 100644 --- a/net/tcpclient.c +++ b/net/tcpclient.c @@ -62,6 +62,7 @@ typedef struct _tcpclient t_object x_obj; t_clock *x_clock; t_clock *x_poll; + t_clock *x_sendclock; t_outlet *x_msgout; t_outlet *x_addrout; t_outlet *x_connectout; @@ -70,7 +71,6 @@ typedef struct _tcpclient int x_verbosity; // 1 = post connection state changes to main window int x_fd; // the socket int x_fdbuf; // the socket's buffer size - t_int x_timeout_us; /* send timeout in microseconds */ char *x_hostname; // address we want to connect to as text int x_connectstate; // 0 = not connected, 1 = connected int x_port; // port we're connected to @@ -78,12 +78,17 @@ typedef struct _tcpclient t_atom x_addrbytes[4]; // address we're connected to as 4 bytes t_atom x_msgoutbuf[MAX_UDP_RECEIVE]; // received data as float atoms unsigned char x_msginbuf[MAX_UDP_RECEIVE]; // received data as bytes + char *x_sendbuf; // pointer to data to send + int x_sendbuf_len; // number of bytes in sendbuf + int x_sendresult; + int x_blocked; /* multithread stuff */ pthread_t x_threadid; /* id of child thread */ pthread_attr_t x_threadattr; /* attributes of child thread */ + pthread_t x_sendthreadid; /* id of child thread for sending */ + pthread_attr_t x_sendthreadattr; /* attributes of child thread for sending */ } t_tcpclient; -static void tcpclient_timeout(t_tcpclient *x, t_float timeout); static void tcpclient_verbosity(t_tcpclient *x, t_float verbosity); static void tcpclient_dump(t_tcpclient *x, t_float dump); static void tcp_client_hexdump(t_tcpclient *x, long len); @@ -92,30 +97,19 @@ static void *tcpclient_child_connect(void *w); static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fportno); static void tcpclient_disconnect(t_tcpclient *x); static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv); -int tcpclient_send_byte(t_tcpclient *x, char byte); +int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len); +static void *tcpclient_child_send(void *w); +static void tcpclient_sent(t_tcpclient *x); static int tcpclient_get_socket_send_buf_size(t_tcpclient *x); static int tcpclient_set_socket_send_buf_size(t_tcpclient *x, int size); static void tcpclient_buf_size(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv); static void tcpclient_rcv(t_tcpclient *x); static void tcpclient_poll(t_tcpclient *x); +static void tcpclient_unblock(t_tcpclient *x); static void *tcpclient_new(t_floatarg udpflag); static void tcpclient_free(t_tcpclient *x); void tcpclient_setup(void); -static void tcpclient_timeout(t_tcpclient *x, t_float timeout) -{ - /* set the timeout on the select call in tcpclient_send_byte */ - /* this is the maximum time in microseconds to wait */ - /* before abandoning attempt to send */ - - t_int timeout_us = 0; - if ((timeout >= 0)&&(timeout < 1000000)) - { - timeout_us = (t_int)timeout; - x->x_timeout_us = timeout_us; - } -} - static void tcpclient_dump(t_tcpclient *x, t_float dump) { x->x_dump = (dump == 0)?0:1; @@ -208,6 +202,7 @@ static void *tcpclient_child_connect(void *w) /* outlet_float is not threadsafe ! */ // outlet_float(x->x_obj.ob_outlet, 1); x->x_connectstate = 1; + x->x_blocked = 0; /* use callback instead to set outlet */ clock_delay(x->x_clock, 0); return (x); @@ -240,29 +235,23 @@ static void tcpclient_disconnect(t_tcpclient *x) static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) { -//#define BYTE_BUF_LEN 65536 // arbitrary maximum similar to max IP packet size -// static char byte_buf[BYTE_BUF_LEN]; +#define BYTE_BUF_LEN 65536 // arbitrary maximum similar to max IP packet size + static char byte_buf[BYTE_BUF_LEN]; int i, j, d; unsigned char c; float f, e; char *bp; int length; - size_t sent; + size_t sent = 0; int result; char fpath[FILENAME_MAX]; FILE *fptr; - t_atom output_atom; #ifdef DEBUG post("s: %s", s->s_name); post("argc: %d", argc); #endif - if (x->x_fd < 0) - { - error("%s: not connected", objName); - return; - } for (i = j = 0; i < argc; ++i) { if (argv[i].a_type == A_FLOAT) @@ -281,8 +270,13 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) return; } c = (unsigned char)d; - if (0 == tcpclient_send_byte(x, c)) break; - ++j; +// if (0 == tcpclient_send_byte(x, c)) break; + byte_buf[j++] = c; + if (j >= BYTE_BUF_LEN) + { + sent += tcpclient_send_buf(x, byte_buf, j); + j = 0; + } } else if (argv[i].a_type == A_SYMBOL) { @@ -298,8 +292,14 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) while ((d = fgetc(fptr)) != EOF) { c = (char)(d & 0x0FF); - if (0 == tcpclient_send_byte(x, c)) break; - ++j; +// if (0 == tcpclient_send_byte(x, c)) break; + byte_buf[j++] = c; + if (j >= BYTE_BUF_LEN) + { + sent += tcpclient_send_buf(x, byte_buf, j); + j = 0; + } +// ++j; } fclose(fptr); fptr = NULL; @@ -311,38 +311,62 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) return; } } - sent = j; - SETFLOAT(&output_atom, sent); - outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom); + if (j > 0) + sent += tcpclient_send_buf(x, byte_buf, j); + } -int tcpclient_send_byte(t_tcpclient *x, char byte) + +int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len) { - int result = 0; fd_set wfds; struct timeval timeout; - FD_ZERO(&wfds); - FD_SET(x->x_fd, &wfds); - timeout.tv_sec = 0; - timeout.tv_usec = x->x_timeout_us; /* give it about a millisecond to clear buffer */ - result = select(x->x_fd+1, NULL, &wfds, NULL, &timeout); - if (result == -1) + if (x->x_blocked) return 0; + if (x->x_fd < 0) { - post("%s_send_byte: select returned error %d", objName, errno); + error("%s: not connected", objName); + x->x_blocked++; return 0; } - if (FD_ISSET(x->x_fd, &wfds)) + x->x_sendbuf = buf; + x->x_sendbuf_len = buf_len; + if((x->x_sendresult = pthread_create(&x->x_sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, x)) < 0) { - result = send(x->x_fd, &byte, 1, 0); - if (result <= 0) - { - sys_sockerror("tcpclient: send"); - post("%s_send_byte: could not send data ", objName); - return 0; - } + post("%s_send_buf: could not create new thread (%d)", objName); + clock_delay(x->x_sendclock, 0); // calls tcpclient_sent + return 0; + } + return buf_len; +} + +/* tcpclient_child_send runs in sendthread */ +static void *tcpclient_child_send(void *w) +{ + t_tcpclient *x = (t_tcpclient*) w; + + x->x_sendresult = send(x->x_fd, x->x_sendbuf, x->x_sendbuf_len, 0); + clock_delay(x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so + return(x); +} + +static void tcpclient_sent(t_tcpclient *x) +{ + t_atom output_atom; + + if (x->x_sendresult <= 0) + { + sys_sockerror("tcpclient: send"); + post("%s_send_byte: could not send data ", objName); + x->x_blocked++; + SETFLOAT(&output_atom, x->x_sendresult); + outlet_anything( x->x_statusout, gensym("blocked"), 1, &output_atom); + } + else + { + SETFLOAT(&output_atom, x->x_sendresult); + outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom); } - return result; } /* Return the send buffer size of socket, also output it on status outlet */ @@ -489,6 +513,11 @@ static void tcpclient_poll(t_tcpclient *x) clock_delay(x->x_poll, DEFPOLLTIME); /* see you later */ } +static void tcpclient_unblock(t_tcpclient *x) +{ + x->x_blocked = 0; +} + static void *tcpclient_new(t_floatarg udpflag) { int i; @@ -498,6 +527,7 @@ static void *tcpclient_new(t_floatarg udpflag) x->x_addrout = outlet_new(&x->x_obj, &s_list); x->x_connectout = outlet_new(&x->x_obj, &s_float); /* connection state */ x->x_statusout = outlet_new(&x->x_obj, &s_anything);/* last outlet for everything else */ + x->x_sendclock = clock_new(x, (t_method)tcpclient_sent); x->x_clock = clock_new(x, (t_method)tcpclient_tick); x->x_poll = clock_new(x, (t_method)tcpclient_poll); x->x_verbosity = 1; /* default post status changes to main window */ @@ -514,13 +544,18 @@ static void *tcpclient_new(t_floatarg udpflag) x->x_addrbytes[i].a_w.w_float = 0; } x->x_addr = 0L; - x->x_timeout_us = 1000; /* set a default 1ms send timeout */ - /* prepare child thread */ + x->x_blocked = 1; + /* prepare child threads */ if(pthread_attr_init(&x->x_threadattr) < 0) post("%s: warning: could not prepare child thread", objName); if(pthread_attr_setdetachstate(&x->x_threadattr, PTHREAD_CREATE_DETACHED) < 0) post("%s: warning: could not prepare child thread", objName); + if(pthread_attr_init(&x->x_sendthreadattr) < 0) + post("%s: warning: could not prepare child thread", objName); + if(pthread_attr_setdetachstate(&x->x_sendthreadattr, PTHREAD_CREATE_DETACHED) < 0) + post("%s: warning: could not prepare child thread", objName); clock_delay(x->x_poll, 0); /* start polling the input */ + post("tcpclient 2010 Martin Peach-style"); return (x); } @@ -543,11 +578,9 @@ void tcpclient_setup(void) class_addmethod(tcpclient_class, (t_method)tcpclient_disconnect, gensym("disconnect"), 0); class_addmethod(tcpclient_class, (t_method)tcpclient_send, gensym("send"), A_GIMME, 0); class_addmethod(tcpclient_class, (t_method)tcpclient_buf_size, gensym("buf"), A_GIMME, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_rcv, gensym("receive"), 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_rcv, gensym("rcv"), 0); + class_addmethod(tcpclient_class, (t_method)tcpclient_unblock, gensym("unblock"), 0); class_addmethod(tcpclient_class, (t_method)tcpclient_verbosity, gensym("verbosity"), A_FLOAT, 0); class_addmethod(tcpclient_class, (t_method)tcpclient_dump, gensym("dump"), A_FLOAT, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_timeout, gensym("timeout"), A_FLOAT, 0); class_addlist(tcpclient_class, (t_method)tcpclient_send); } -- cgit v1.2.1