aboutsummaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/tcpclient.c139
1 files changed, 86 insertions, 53 deletions
diff --git a/net/tcpclient.c b/net/tcpclient.c
index dc48fc5..a1656ef 100644
--- a/net/tcpclient.c
+++ b/net/tcpclient.c
@@ -25,7 +25,7 @@
/* Based on PureData by Miller Puckette and others. */
/* */
/* ---------------------------------------------------------------------------- */
-//define DEBUG
+//#define DEBUG
#include "m_pd.h"
#include "s_stuff.h"
@@ -55,38 +55,54 @@
static t_class *tcpclient_class;
static char objName[] = "tcpclient";
-#define MAX_UDP_RECEIVE 65536L // longer than data in maximum UDP packet
+#define MAX_TCPCLIENT_SEND_BUF 65536L // longer than data in maximum UDP packet
+
+/* each send is handled by a new thread with a new parameter struct: */
+/* these are stored under x->x_tsp[0..MAX_TCPCLIENT_THREADS-1] */
+/* The buffer is preallocated for speed. */
+typedef struct _tcpclient_sender_params
+{
+ char x_sendbuf[MAX_TCPCLIENT_SEND_BUF]; /* possibly allocate this dynamically for space over speed */
+ int x_buf_len;
+ int x_sendresult;
+ pthread_t sendthreadid;
+ struct _tcpclient *x_x;
+} t_tcpclient_sender_params;
+#define MAX_TCPCLIENT_THREADS 32
+/* MAX_TCPCLIENT_THREADS is small to avoid wasting space. This is the maximum number of concurrent threads. */
typedef struct _tcpclient
{
- t_object x_obj;
- t_clock *x_clock;
- t_clock *x_poll;
- t_clock *x_sendclock;
- t_outlet *x_msgout;
- t_outlet *x_addrout;
- t_outlet *x_connectout;
- t_outlet *x_statusout;
- int x_dump; // 1 = hexdump received bytes
- int x_verbosity; // 1 = post connection state changes to main window
- int x_fd; // the socket
- int x_fdbuf; // the socket's buffer size
- char *x_hostname; // address we want to connect to as text
- int x_connectstate; // 0 = not connected, 1 = connected
- int x_port; // port we're connected to
- long x_addr; // address we're connected to as 32bit int
- t_atom x_addrbytes[4]; // address we're connected to as 4 bytes
- t_atom x_msgoutbuf[MAX_UDP_RECEIVE]; // received data as float atoms
- unsigned char x_msginbuf[MAX_UDP_RECEIVE]; // received data as bytes
- char *x_sendbuf; // pointer to data to send
- int x_sendbuf_len; // number of bytes in sendbuf
- int x_sendresult;
- int x_blocked;
+ t_object x_obj;
+ t_clock *x_clock;
+ t_clock *x_poll;
+ t_clock *x_sendclock;
+ t_outlet *x_msgout;
+ t_outlet *x_addrout;
+ t_outlet *x_connectout;
+ t_outlet *x_statusout;
+ int x_dump; // 1 = hexdump received bytes
+ int x_verbosity; // 1 = post connection state changes to main window
+ int x_fd; // the socket
+ int x_fdbuf; // the socket's buffer size
+ char *x_hostname; // address we want to connect to as text
+ int x_connectstate; // 0 = not connected, 1 = connected
+ int x_port; // port we're connected to
+ long x_addr; // address we're connected to as 32bit int
+ t_atom x_addrbytes[4]; // address we're connected to as 4 bytes
+ t_atom x_msgoutbuf[MAX_TCPCLIENT_SEND_BUF]; // received data as float atoms
+ unsigned char x_msginbuf[MAX_TCPCLIENT_SEND_BUF]; // received data as bytes
+ char *x_sendbuf; // pointer to data to send
+ int x_sendbuf_len; // number of bytes in sendbuf
+ int x_sendresult;
+ int x_blocked;
/* multithread stuff */
- pthread_t x_threadid; /* id of child thread */
- pthread_attr_t x_threadattr; /* attributes of child thread */
- pthread_t x_sendthreadid; /* id of child thread for sending */
- pthread_attr_t x_sendthreadattr; /* attributes of child thread for sending */
+ pthread_t x_threadid; /* id of connector child thread */
+ pthread_attr_t x_threadattr; /* attributes of connector child thread */
+ pthread_attr_t x_sendthreadattr; /* attributes of all sender child thread for sending */
+ int x_nextthread; /* next unused x_tsp */
+ t_tcpclient_sender_params x_tsp[MAX_TCPCLIENT_THREADS];
+/* Thread params are used round-robin to avoid overwriting buffers when doing multiple sends */
} t_tcpclient;
static void tcpclient_verbosity(t_tcpclient *x, t_float verbosity);
@@ -97,7 +113,7 @@ static void *tcpclient_child_connect(void *w);
static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fportno);
static void tcpclient_disconnect(t_tcpclient *x);
static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv);
-int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len);
+static int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len);
static void *tcpclient_child_send(void *w);
static void tcpclient_sent(t_tcpclient *x);
static int tcpclient_get_socket_send_buf_size(t_tcpclient *x);
@@ -227,8 +243,14 @@ static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fpo
static void tcpclient_disconnect(t_tcpclient *x)
{
+ int i;
+
if (x->x_fd >= 0)
{
+ for (i = 0; i < MAX_TCPCLIENT_THREADS;++i)
+ { /* wait for any sender threads to finish */
+ while (x->x_tsp[i].sendthreadid != 0);
+ }
sys_closesocket(x->x_fd);
x->x_fd = -1;
x->x_connectstate = 0;
@@ -245,10 +267,7 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
int i, j, d;
unsigned char c;
float f, e;
-// char *bp;
-// int length;
size_t sent = 0;
-// int result;
char fpath[FILENAME_MAX];
FILE *fptr;
@@ -275,7 +294,6 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
return;
}
c = (unsigned char)d;
-// if (0 == tcpclient_send_byte(x, c)) break;
byte_buf[j++] = c;
if (j >= BYTE_BUF_LEN)
{
@@ -285,7 +303,6 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
}
else if (argv[i].a_type == A_SYMBOL)
{
-
atom_string(&argv[i], fpath, FILENAME_MAX);
fptr = fopen(fpath, "rb");
if (fptr == NULL)
@@ -297,14 +314,12 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
while ((d = fgetc(fptr)) != EOF)
{
c = (char)(d & 0x0FF);
-// if (0 == tcpclient_send_byte(x, c)) break;
byte_buf[j++] = c;
if (j >= BYTE_BUF_LEN)
{
sent += tcpclient_send_buf(x, byte_buf, j);
j = 0;
}
-// ++j;
}
fclose(fptr);
fptr = NULL;
@@ -318,14 +333,13 @@ static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
}
if (j > 0)
sent += tcpclient_send_buf(x, byte_buf, j);
-
}
-int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len)
+static int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len)
{
-// fd_set wfds;
-// struct timeval timeout;
+ t_tcpclient_sender_params *tsp = &x->x_tsp[x->x_nextthread];
+ int i, max;
if (x->x_blocked) return 0;
if (x->x_fd < 0)
@@ -334,25 +348,35 @@ int tcpclient_send_buf(t_tcpclient *x, char *buf, int buf_len)
x->x_blocked++;
return 0;
}
- x->x_sendbuf = buf;
- x->x_sendbuf_len = buf_len;
- if((x->x_sendresult = pthread_create(&x->x_sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, x)) < 0)
+ max = (buf_len > MAX_TCPCLIENT_SEND_BUF)? MAX_TCPCLIENT_SEND_BUF: buf_len;
+ for (i = 0; i < max; ++i)
+ {
+ tsp->x_sendbuf[i] = buf[i];
+ }
+ tsp->x_buf_len = i;
+ x->x_sendbuf_len += i;
+ tsp->x_x = x;
+
+ if((tsp->x_sendresult = pthread_create(&tsp->sendthreadid, &x->x_sendthreadattr, tcpclient_child_send, tsp)) < 0)
{
post("%s_send_buf: could not create new thread (%d)", objName);
clock_delay(x->x_sendclock, 0); // calls tcpclient_sent
return 0;
}
+ x->x_nextthread++;
+ if (x->x_nextthread > MAX_TCPCLIENT_THREADS) x->x_nextthread = 0;
return buf_len;
}
/* tcpclient_child_send runs in sendthread */
static void *tcpclient_child_send(void *w)
{
- t_tcpclient *x = (t_tcpclient*) w;
+ t_tcpclient_sender_params *tsp = (t_tcpclient_sender_params*) w;
- x->x_sendresult = send(x->x_fd, x->x_sendbuf, x->x_sendbuf_len, 0);
- clock_delay(x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so
- return(x);
+ tsp->x_sendresult = send(tsp->x_x->x_fd, tsp->x_sendbuf, tsp->x_buf_len, 0);
+ clock_delay(tsp->x_x->x_sendclock, 0); // calls tcpclient_sent when it's safe to do so
+ tsp->sendthreadid = 0;
+ return(tsp);
}
static void tcpclient_sent(t_tcpclient *x)
@@ -371,6 +395,7 @@ static void tcpclient_sent(t_tcpclient *x)
{ /* assume the message is queued and will be sent real soon now */
SETFLOAT(&output_atom, x->x_sendbuf_len);
outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom);
+ x->x_sendbuf_len = 0; /* we might be called only once for multiple calls to tcpclient_send_buf */
}
else
{
@@ -419,9 +444,7 @@ static int tcpclient_set_socket_send_buf_size(t_tcpclient *x, int size)
/* Get/set the send buffer size of client socket */
static void tcpclient_buf_size(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv)
{
-// int client = -1;
float buf_size = 0;
-// t_atom output_atom[3];
if(x->x_connectstate == 0)
{
@@ -475,7 +498,7 @@ static void tcpclient_rcv(t_tcpclient *x)
if(FD_ISSET(sockfd, &readset) || FD_ISSET(sockfd, &exceptset))
{
/* read from server */
- ret = recv(sockfd, x->x_msginbuf, MAX_UDP_RECEIVE, 0);
+ ret = recv(sockfd, x->x_msginbuf, MAX_TCPCLIENT_SEND_BUF, 0);
if(ret > 0)
{
#ifdef DEBUG
@@ -543,7 +566,7 @@ static void *tcpclient_new(void)
x->x_verbosity = 1; /* default post status changes to main window */
x->x_fd = -1;
/* convert the bytes in the buffer to floats in a list */
- for (i = 0; i < MAX_UDP_RECEIVE; ++i)
+ for (i = 0; i < MAX_TCPCLIENT_SEND_BUF; ++i)
{
x->x_msgoutbuf[i].a_type = A_FLOAT;
x->x_msgoutbuf[i].a_w.w_float = 0;
@@ -556,6 +579,7 @@ static void *tcpclient_new(void)
x->x_addr = 0L;
x->x_blocked = 1;
x->x_connectstate = 0;
+ x->x_nextthread = 0;
/* prepare child threads */
if(pthread_attr_init(&x->x_threadattr) < 0)
post("%s: warning: could not prepare child thread", objName);
@@ -566,7 +590,6 @@ static void *tcpclient_new(void)
if(pthread_attr_setdetachstate(&x->x_sendthreadattr, PTHREAD_CREATE_DETACHED) < 0)
post("%s: warning: could not prepare child thread", objName);
clock_delay(x->x_poll, 0); /* start polling the input */
- post("tcpclient 20110113 Martin Peach-style");
return (x);
}
@@ -581,6 +604,16 @@ static void tcpclient_free(t_tcpclient *x)
void tcpclient_setup(void)
{
+ char aboutStr[MAXPDSTRING];
+
+ snprintf(aboutStr, MAXPDSTRING, "%s: (GPL) 20111103 Martin Peach, compiled for pd-%d.%d on %s %s",
+ objName, PD_MAJOR_VERSION, PD_MINOR_VERSION, __DATE__, __TIME__);
+
+#if PD_MAJOR_VERSION==0 && PD_MINOR_VERSION<43
+ post(aboutStr);
+#else
+ logpost(NULL, 3, aboutStr);
+#endif
tcpclient_class = class_new(gensym(objName), (t_newmethod)tcpclient_new,
(t_method)tcpclient_free, sizeof(t_tcpclient), 0, 0);
class_addmethod(tcpclient_class, (t_method)tcpclient_connect, gensym("connect")