aboutsummaryrefslogtreecommitdiff
path: root/net/tcpclient.c
diff options
context:
space:
mode:
authorMartin Peach <mrpeach@users.sourceforge.net>2010-03-26 18:41:09 +0000
committerMartin Peach <mrpeach@users.sourceforge.net>2010-03-26 18:41:09 +0000
commita84d0bd1965c0b1ce7d9b9d9803cc6b734d33bcb (patch)
treebb23cf5399748cdb5329aabde2ec56ea5548e8c6 /net/tcpclient.c
parent993426ec04f943ce9e1c0d611d41271a1da8b64e (diff)
Sends all bytes in one block in a single thread, blocks further sends if one doesn't work until [unblock( message received.
Removed [receive(, [recv(, [timeout( message methods as they are not relevant. svn path=/trunk/externals/mrpeach/; revision=13290
Diffstat (limited to 'net/tcpclient.c')
-rw-r--r--net/tcpclient.c145
1 files changed, 89 insertions, 56 deletions
diff --git a/net/tcpclient.c b/net/tcpclient.c
index 9804db8..d977c86 100644
--- a/net/tcpclient.c
+++ b/net/tcpclient.c
@@ -62,6 +62,7 @@ typedef struct _tcpclient
t_object x_obj;
t_clock *x_clock;
t_clock *x_poll;
+ t_clock *x_sendclock;
t_outlet *x_msgout;
t_outlet *x_addrout;
t_outlet *x_connectout;
@@ -70,7 +71,6 @@ typedef struct _tcpclient
int x_verbosity; // 1 = post connection state changes to main window
int x_fd; // the socket
int x_fdbuf; // the socket's buffer size
- t_int x_timeout_us; /* send timeout in microseconds */
char *x_hostname; // address we want to connect to as text
int x_connectstate; // 0 = not connected, 1 = connected
int x_port; // port we're connected to
@@ -78,12 +78,17 @@ typedef struct _tcpclient
t_atom x_addrbytes[4]; // address we're connected to as 4 bytes
t_atom x_msgoutbuf[MAX_UDP_RECEIVE]; // received data as float atoms
unsigned char x_msginbuf[MAX_UDP_RECEIVE]; // received data as bytes
+ char *x_sendbuf; // pointer to data to send
+ int x_sendbuf_len; // number of bytes in sendbuf
+ int x_sendresult;
+ int x_blocked;
/* multithread stuff */
pthread_t x_threadid; /* id of child thread */
pthread_attr_t x_threadattr; /* attributes of child thread */
+ pthread_t x_sendthreadid; /* id of child thread for sending */
+ pthread_attr_t x_sendthreadattr; /* attributes of child thread for sending */
} t_tcpclient;
-static void tcpclient_timeout(t_tcpclient *x, t_float timeout);
static void tcpclient_verbosity(t_tcpclient *x, t_float verbosity);
static void tcpclient_dump(t_tcpclient *x, t_float dump);
static void tcp_client_hexdump(t_tcpclient *x, long len);
@@ -92,30 +97,19 @@ static void *tcpclient_child_connect(void *w);
static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fportno);
static void tcpclient_disconnect(t_tcpclient *x);
static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv);
-int tcpclient_send_byte(t_tcpclient *x, char byte);
+int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len);
+static void *tcpclient_child_send(void *w);
+static void tcpclient_sent(t_tcpclient *x);
static int tcpclient_get_socket_send_buf_size(t_tcpclient *x);
static int tcpclient_set_socket_send_buf_size(t_tcpclient *x, int size);
static void tcpclient_buf_size(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv);
static void tcpclient_rcv(t_tcpclient *x);
static void tcpclient_poll(t_tcpclient *x);
+static void tcpclient_unblock(t_tcpclient *x);
static void *tcpclient_new(t_floatarg udpflag);
static void tcpclient_free(t_tcpclient *x);
void tcpclient_setup(void);
-static void tcpclient_timeout(t_tcpclient *x, t_float timeout)
-{
- /* set the timeout on the select call in tcpclient_send_byte */
- /* this is the maximum time in microseconds to wait */
- /* before abandoning attempt to send */
-
- t_int timeout_us = 0;
- if ((timeout >= 0)&&(timeout < 1000000))
- {
- timeout_us = (t_int)timeout;
- x->x_timeout_us = timeout_us;
- }
-}
-
static void tcpclient_dump(t_tcpclient *x, t_float dump)
{
x->x_dump = (dump == 0)?0:1;
@@ -208,6 +202,7 @@ static void *tcpclient_child_connect(void *w)
/* outlet_float is not threadsafe ! */
// outlet_float(x->x_obj.ob_outlet, 1);
x->x_connectstate = 1;
+ x->x_blocked = 0;
/* use callback instead to set outlet */
clock_delay(x->x_clock, 0);
return (x);
@@ -240,29 +235,23 @@ static void tcpclient_disconnect(t_tcpclient *x)
static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
{
-//#define BYTE_BUF_LEN 65536 // arbitrary maximum similar to max IP packet size
-// static char byte_buf[BYTE_BUF_LEN];
+#define BYTE_BUF_LEN 65536 // arbitrary maximum similar to max IP packet size
+ static char byte_buf[BYTE_BUF_LEN];
int i, j, d;
unsigned char c;
float f, e;
char *bp;
int length;
- size_t sent;
+ size_t sent = 0;
int result;
char fpath[FILENAME_MAX];
FILE *fptr;
- t_atom output_atom;
#ifdef DEBUG
post("s: %s", s->s_name);
post("argc: %d", argc);
#endif
- if (x->x_fd < 0)
- {
- error("%s: not connected", objName);
- return;
- }
for (i = j = 0; i < argc; ++i)
{
if (argv[i].a_type == A_FLOAT)
@@ -281,8 +270,13 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
return;
}
c = (unsigned char)d;
- if (0 == tcpclient_send_byte(x, c)) break;
- ++j;
+// if (0 == tcpclient_send_byte(x, c)) break;
+ byte_buf[j++] = c;
+ if (j >= BYTE_BUF_LEN)
+ {
+ sent += tcpclient_send_buf(x, byte_buf, j);
+ j = 0;
+ }
}
else if (argv[i].a_type == A_SYMBOL)
{
@@ -298,8 +292,14 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
while ((d = fgetc(fptr)) != EOF)
{
c = (char)(d & 0x0FF);
- if (0 == tcpclient_send_byte(x, c)) break;
- ++j;
+// if (0 == tcpclient_send_byte(x, c)) break;
+ byte_buf[j++] = c;
+ if (j >= BYTE_BUF_LEN)
+ {
+ sent += tcpclient_send_buf(x, byte_buf, j);
+ j = 0;
+ }
+// ++j;
}
fclose(fptr);
fptr = NULL;
@@ -311,38 +311,62 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
return;
}
}
- sent = j;
- SETFLOAT(&output_atom, sent);
- outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom);
+ if (j > 0)
+ sent += tcpclient_send_buf(x, byte_buf, j);
+
}
-int tcpclient_send_byte(t_tcpclient *x, char byte)
+
+int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len)
{
- int result = 0;
fd_set wfds;
struct timeval timeout;
- FD_ZERO(&wfds);
- FD_SET(x->x_fd, &wfds);
- timeout.tv_sec = 0;
- timeout.tv_usec = x->x_timeout_us; /* give it about a millisecond to clear buffer */
- result = select(x->x_fd+1, NULL, &wfds, NULL, &timeout);
- if (result == -1)
+ if (x->x_blocked) return 0;
+ if (x->x_fd < 0)
{
- post("%s_send_byte: select returned error %d", objName, errno);
+ error("%s: not connected", objName);
+ x->x_blocked++;
return 0;
}
- if (FD_ISSET(x->x_fd, &wfds))
+ x->x_sendbuf = buf;
+ x->x_sendbuf_len = buf_len;
+ if((x->x_sendresult = pthread_create(&x->x_sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, x)) < 0)
{
- result = send(x->x_fd, &byte, 1, 0);
- if (result <= 0)
- {
- sys_sockerror("tcpclient: send");
- post("%s_send_byte: could not send data ", objName);
- return 0;
- }
+ post("%s_send_buf: could not create new thread (%d)", objName);
+ clock_delay(x->x_sendclock, 0); // calls tcpclient_sent
+ return 0;
+ }
+ return buf_len;
+}
+
+/* tcpclient_child_send runs in sendthread */
+static void *tcpclient_child_send(void *w)
+{
+ t_tcpclient *x = (t_tcpclient*) w;
+
+ x->x_sendresult = send(x->x_fd, x->x_sendbuf, x->x_sendbuf_len, 0);
+ clock_delay(x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so
+ return(x);
+}
+
+static void tcpclient_sent(t_tcpclient *x)
+{
+ t_atom output_atom;
+
+ if (x->x_sendresult <= 0)
+ {
+ sys_sockerror("tcpclient: send");
+ post("%s_send_byte: could not send data ", objName);
+ x->x_blocked++;
+ SETFLOAT(&output_atom, x->x_sendresult);
+ outlet_anything( x->x_statusout, gensym("blocked"), 1, &output_atom);
+ }
+ else
+ {
+ SETFLOAT(&output_atom, x->x_sendresult);
+ outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom);
}
- return result;
}
/* Return the send buffer size of socket, also output it on status outlet */
@@ -489,6 +513,11 @@ static void tcpclient_poll(t_tcpclient *x)
clock_delay(x->x_poll, DEFPOLLTIME); /* see you later */
}
+static void tcpclient_unblock(t_tcpclient *x)
+{
+ x->x_blocked = 0;
+}
+
static void *tcpclient_new(t_floatarg udpflag)
{
int i;
@@ -498,6 +527,7 @@ static void *tcpclient_new(t_floatarg udpflag)
x->x_addrout = outlet_new(&x->x_obj, &s_list);
x->x_connectout = outlet_new(&x->x_obj, &s_float); /* connection state */
x->x_statusout = outlet_new(&x->x_obj, &s_anything);/* last outlet for everything else */
+ x->x_sendclock = clock_new(x, (t_method)tcpclient_sent);
x->x_clock = clock_new(x, (t_method)tcpclient_tick);
x->x_poll = clock_new(x, (t_method)tcpclient_poll);
x->x_verbosity = 1; /* default post status changes to main window */
@@ -514,13 +544,18 @@ static void *tcpclient_new(t_floatarg udpflag)
x->x_addrbytes[i].a_w.w_float = 0;
}
x->x_addr = 0L;
- x->x_timeout_us = 1000; /* set a default 1ms send timeout */
- /* prepare child thread */
+ x->x_blocked = 1;
+ /* prepare child threads */
if(pthread_attr_init(&x->x_threadattr) < 0)
post("%s: warning: could not prepare child thread", objName);
if(pthread_attr_setdetachstate(&x->x_threadattr, PTHREAD_CREATE_DETACHED) < 0)
post("%s: warning: could not prepare child thread", objName);
+ if(pthread_attr_init(&x->x_sendthreadattr) < 0)
+ post("%s: warning: could not prepare child thread", objName);
+ if(pthread_attr_setdetachstate(&x->x_sendthreadattr, PTHREAD_CREATE_DETACHED) < 0)
+ post("%s: warning: could not prepare child thread", objName);
clock_delay(x->x_poll, 0); /* start polling the input */
+ post("tcpclient 2010 Martin Peach-style");
return (x);
}
@@ -543,11 +578,9 @@ void tcpclient_setup(void)
class_addmethod(tcpclient_class, (t_method)tcpclient_disconnect, gensym("disconnect"), 0);
class_addmethod(tcpclient_class, (t_method)tcpclient_send, gensym("send"), A_GIMME, 0);
class_addmethod(tcpclient_class, (t_method)tcpclient_buf_size, gensym("buf"), A_GIMME, 0);
- class_addmethod(tcpclient_class, (t_method)tcpclient_rcv, gensym("receive"), 0);
- class_addmethod(tcpclient_class, (t_method)tcpclient_rcv, gensym("rcv"), 0);
+ class_addmethod(tcpclient_class, (t_method)tcpclient_unblock, gensym("unblock"), 0);
class_addmethod(tcpclient_class, (t_method)tcpclient_verbosity, gensym("verbosity"), A_FLOAT, 0);
class_addmethod(tcpclient_class, (t_method)tcpclient_dump, gensym("dump"), A_FLOAT, 0);
- class_addmethod(tcpclient_class, (t_method)tcpclient_timeout, gensym("timeout"), A_FLOAT, 0);
class_addlist(tcpclient_class, (t_method)tcpclient_send);
}