From 55f063b243f282a9c7f8dceffd898d14fdb7a39f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Wed, 24 Mar 2010 15:24:32 +0000 Subject: a first client svn path=/trunk/externals/iem/iemnet/; revision=13252 --- Makefile | 2 +- shared.c | 156 +++++++-------- tcpclient.c | 620 ++++++++++++++++-------------------------------------------- tcpserver.c | 249 ++++++++++-------------- 4 files changed, 345 insertions(+), 682 deletions(-) diff --git a/Makefile b/Makefile index 314e2da..ce9b4e5 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ LIBRARY_VERSION = 0.1 # Next, add your .c source files to the SOURCES variable. The help files will # be included automatically -SOURCES = tcpserver.c +SOURCES = tcpserver.c tcpclient.c #SOURCES = tcpclient.c tcpreceive.c tcpsend.c tcpserver.c udpreceive~.c udpreceive.c udpsend~.c udpsend.c # For objects that only build on certain platforms, add those to the SOURCES diff --git a/shared.c b/shared.c index c172ceb..09faaad 100644 --- a/shared.c +++ b/shared.c @@ -13,8 +13,13 @@ #include #include -#include +#ifdef _WIN32 +# include +# include /* for socklen_t */ +#else +# include +#endif #include @@ -275,7 +280,6 @@ t_iemnet_chunk* queue_pop_block( freebytes(head, sizeof(t_node)); head=NULL; } - //fprintf(stderr, "popped %d bytes\n", data->size); return data; } @@ -303,7 +307,6 @@ t_iemnet_chunk* queue_pop_noblock( freebytes(head, sizeof(t_node)); head=NULL; } - //fprintf(stderr, "popped %d bytes\n", data->size); return data; } @@ -321,8 +324,6 @@ void queue_finish(t_queue* q) { void queue_destroy(t_queue* q) { t_iemnet_chunk*c=NULL; - - post("queue_destroy %x", q); if(NULL==q) return; @@ -341,7 +342,6 @@ void queue_destroy(t_queue* q) { freebytes(q, sizeof(t_queue)); q=NULL; - post("queue_destroyed %x", q); } t_queue* queue_create(void) { @@ -381,6 +381,24 @@ struct _iemnet_sender { }; /* the workhorse of the family */ + +static int iemnet__sender_dosend(int sockfd, t_queue*q) { + t_iemnet_chunk*c=queue_pop(q); + if(c) { + unsigned char*data=c->data; + unsigned int size=c->size; + + fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); + + int result = send(sockfd, data, size, 0); + // shouldn't we do something with the result here? + iemnet__chunk_destroy(c); + } else { + return 0; + } + return 1; +} + static void*iemnet__sender_sendthread(void*arg) { t_iemnet_sender*sender=(t_iemnet_sender*)arg; @@ -388,47 +406,28 @@ static void*iemnet__sender_sendthread(void*arg) { t_queue*q=sender->queue; while(sender->cont) { - t_iemnet_chunk*c=NULL; - c=queue_pop(q); - if(c) { - unsigned char*data=c->data; - unsigned int size=c->size; - - int result = send(sockfd, data, size, 0); - - // shouldn't we do something with the result here? - - iemnet__chunk_destroy(c); - } else { - break; - } + if(!iemnet__sender_dosend(sockfd, q))break; } - sender->queue=NULL; - queue_destroy(q); fprintf(stderr, "write thread terminated\n"); return NULL; } int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { - //post("send %x %x", s, c); t_queue*q=s->queue; int size=0; - //post("queue=%x", q); if(q) { - // post("sending data with %d bytes using %x", c->size, s); - size = queue_push(q, c); + t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); + size = queue_push(q, chunk); } return size; } void iemnet__sender_destroy(t_iemnet_sender*s) { s->cont=0; - if(s->queue) - queue_finish(s->queue); - + queue_finish(s->queue); s->sockfd = -1; - pthread_join(s->thread, NULL); + queue_destroy(s->queue); freebytes(s, sizeof(t_iemnet_sender)); s=NULL; @@ -496,6 +495,7 @@ struct _iemnet_receiver { /* the workhorse of the family */ static void*iemnet__receiver_readthread(void*arg) { + int result = 0; t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; int sockfd=receiver->sockfd; @@ -508,89 +508,95 @@ static void*iemnet__receiver_readthread(void*arg) { for(i=0; irunning=1; while(1) { - // fprintf(stderr, "reading %d bytes...\n", size); - int result = recv(sockfd, data, size, 0); - //fprintf(stderr, "read %d bytes...\n", result); + fprintf(stderr, "reading %d bytes...\n", size); + result = recv(sockfd, data, size, 0); + fprintf(stderr, "read %d bytes...\n", result); - if(0==result)break; + if(result<=0)break; t_iemnet_chunk*c = iemnet__chunk_create_data(result, data); queue_push(q, c); - clock_delay(receiver->clock, 0); + + if(receiver->clock)clock_delay(receiver->clock, 0); } - clock_delay(receiver->clock, 0); + + + if(result>=0) + if(receiver->clock)clock_delay(receiver->clock, 0); + receiver->running=0; + + fprintf(stderr, "read thread terminated\n"); return NULL; } - +#define WHERE fprintf(stderr, "%s:%d", __FUNCTION__, __LINE__) static void iemnet__receiver_tick(t_iemnet_receiver *x) { - static int ticks=0; - static int packets=0; - static double totaltime=0; - - double start=sys_getrealtime(); + WHERE; fprintf(stderr, "\treceiver=%x", x); // received data t_iemnet_chunk*c=queue_pop_noblock(x->queue); + WHERE; fprintf(stderr, "\tchunk=%x", c); while(NULL!=c) { x->flist = iemnet__chunk2list(c, x->flist); (x->callback)(x->owner, x->sockfd, x->flist->argc, x->flist->argv); iemnet__chunk_destroy(c); c=queue_pop_noblock(x->queue); - - packets++; } - ticks++; - totaltime+=(sys_getrealtime()-start); + WHERE; fprintf(stderr, "\trunning=%d", x->running); if(!x->running) { // read terminated x->callback(x->owner, x->sockfd, 0, NULL); } + WHERE; fprintf(stderr, "\ttick done\n"); } t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback) { - t_iemnet_receiver*result=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); + t_iemnet_receiver*rec=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback); - if(result) { + if(rec) { t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE); int res=0; if(NULL==data) { - iemnet__receiver_destroy(result); + iemnet__receiver_destroy(rec); return NULL; } - result->sockfd=sock; - result->owner=owner; - result->data=data; - result->callback=callback; - result->flist=iemnet__floatlist_create(1024); - - result->queue = queue_create(); - result->clock = clock_new(result, (t_method)iemnet__receiver_tick); - result->running=1; - res=pthread_create(&result->thread, 0, iemnet__receiver_readthread, result); + rec->sockfd=sock; + rec->owner=owner; + rec->data=data; + rec->callback=callback; + rec->flist=iemnet__floatlist_create(1024); + + rec->queue = queue_create(); + rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); + rec->running=1; + res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec); } //fprintf(stderr, "new receiver created\n"); - return result; + return rec; } -void iemnet__receiver_destroy(t_iemnet_receiver*r) { - if(NULL==r)return; - if(r->data)iemnet__chunk_destroy(r->data); - if(r->flist)iemnet__floatlist_destroy(r->flist); - clock_free(r->clock); - - r->sockfd=0; - r->owner=NULL; - r->data=NULL; - r->callback=NULL; - r->clock=NULL; - r->flist=NULL; - - freebytes(r, sizeof(t_iemnet_receiver)); - r=NULL; +void iemnet__receiver_destroy(t_iemnet_receiver*rec) { + if(NULL==rec)return; + if(rec->data)iemnet__chunk_destroy(rec->data); + if(rec->flist)iemnet__floatlist_destroy(rec->flist); + clock_free(rec->clock); + sys_closesocket(rec->sockfd); + + rec->sockfd=0; + fprintf(stderr, "receiverdestroy join thread\n"); + pthread_join(rec->thread, NULL); + fprintf(stderr, "receiverdestroy joined thread\n"); + rec->owner=NULL; + rec->data=NULL; + rec->callback=NULL; + rec->clock=NULL; + rec->flist=NULL; + + freebytes(rec, sizeof(t_iemnet_receiver)); + rec=NULL; } diff --git a/tcpclient.c b/tcpclient.c index 93fbb91..3800b47 100644 --- a/tcpclient.c +++ b/tcpclient.c @@ -1,12 +1,11 @@ -/* tcpclient.c Martin Peach 20060508, working version 20060512 */ -/* linux version 20060515 */ -/* tcpclient.c is based on netclient: */ -/* -------------------------- netclient ------------------------------------- */ +/* tcpclient.c + * copyright (c) 2010 IOhannes m zmölnig, IEM + * copyright (c) 2006-2010 Martin Peach + * copyright (c) 2004 Olaf Matthes + */ + /* */ -/* Extended 'netsend', connects to 'netserver'. */ -/* Uses child thread to connect to server. Thus needs pd0.35-test17 or later. */ -/* Written by Olaf Matthes (olaf.matthes@gmx.de) */ -/* Get source at http://www.akustische-kunst.org/puredata/maxlib/ */ +/* A client for bidirectional communication from within Pd. */ /* */ /* This program is free software; you can redistribute it and/or */ /* modify it under the terms of the GNU General Public License */ @@ -22,531 +21,234 @@ /* along with this program; if not, write to the Free Software */ /* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ /* */ -/* Based on PureData by Miller Puckette and others. */ -/* */ + /* ---------------------------------------------------------------------------- */ -//define DEBUG -#include "m_pd.h" +#include "iemnet.h" + #include "s_stuff.h" -#include -#include +#include /* gethostbyname, htons... */ #include + + + //#include + //#include + + #include -#if defined(UNIX) || defined(unix) -#include -#include -#include -#include -#include -#include -#define SOCKET_ERROR -1 -#else -#include -#include /* for socklen_t */ -#endif -#ifdef _MSC_VER -#define snprintf sprintf_s -#endif -#define DEFPOLLTIME 20 /* check for input every 20 ms */ static t_class *tcpclient_class; static char objName[] = "tcpclient"; -#define MAX_UDP_RECEIVE 65536L // longer than data in maximum UDP packet + typedef struct _tcpclient { - t_object x_obj; - t_clock *x_clock; - t_clock *x_poll; - t_outlet *x_msgout; - t_outlet *x_addrout; - t_outlet *x_connectout; - t_outlet *x_statusout; - int x_dump; // 1 = hexdump received bytes - int x_verbosity; // 1 = post connection state changes to main window - int x_fd; // the socket - int x_fdbuf; // the socket's buffer size - t_int x_timeout_us; /* send timeout in microseconds */ - char *x_hostname; // address we want to connect to as text - int x_connectstate; // 0 = not connected, 1 = connected - int x_port; // port we're connected to - long x_addr; // address we're connected to as 32bit int - t_atom x_addrbytes[4]; // address we're connected to as 4 bytes - t_atom x_msgoutbuf[MAX_UDP_RECEIVE]; // received data as float atoms - unsigned char x_msginbuf[MAX_UDP_RECEIVE]; // received data as bytes - /* multithread stuff */ - pthread_t x_threadid; /* id of child thread */ - pthread_attr_t x_threadattr; /* attributes of child thread */ -} t_tcpclient; + t_object x_obj; + t_clock *x_clock; + t_clock *x_poll; + t_outlet *x_msgout; + t_outlet *x_addrout; + t_outlet *x_connectout; + t_outlet *x_statusout; -static void tcpclient_timeout(t_tcpclient *x, t_float timeout); -static void tcpclient_verbosity(t_tcpclient *x, t_float verbosity); -static void tcpclient_dump(t_tcpclient *x, t_float dump); -static void tcp_client_hexdump(t_tcpclient *x, long len); -static void tcpclient_tick(t_tcpclient *x); -static void *tcpclient_child_connect(void *w); -static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fportno); -static void tcpclient_disconnect(t_tcpclient *x); -static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv); -int tcpclient_send_byte(t_tcpclient *x, char byte); -static int tcpclient_get_socket_send_buf_size(t_tcpclient *x); -static int tcpclient_set_socket_send_buf_size(t_tcpclient *x, int size); -static void tcpclient_buf_size(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv); -static void tcpclient_rcv(t_tcpclient *x); -static void tcpclient_poll(t_tcpclient *x); -static void *tcpclient_new(t_floatarg udpflag); -static void tcpclient_free(t_tcpclient *x); -void tcpclient_setup(void); - -static void tcpclient_timeout(t_tcpclient *x, t_float timeout) -{ - /* set the timeout on the select call in tcpclient_send_byte */ - /* this is the maximum time in microseconds to wait */ - /* before abandoning attempt to send */ + t_iemnet_sender*x_sender; + t_iemnet_receiver*x_receiver; - t_int timeout_us = 0; - if ((timeout >= 0)&&(timeout < 1000000)) - { - timeout_us = (t_int)timeout; - x->x_timeout_us = timeout_us; - } -} -static void tcpclient_dump(t_tcpclient *x, t_float dump) -{ - x->x_dump = (dump == 0)?0:1; -} + int x_fd; // the socket + char *x_hostname; // address we want to connect to as text + int x_connectstate; // 0 = not connected, 1 = connected + int x_port; // port we're connected to + long x_addr; // address we're connected to as 32bit int + t_atom x_addrbytes[4]; // address we're connected to as 4 bytes -static void tcpclient_verbosity(t_tcpclient *x, t_float verbosity) -{ - x->x_verbosity = (verbosity == 0)?0:1; /* only two states so far */ -} -static void tcp_client_hexdump(t_tcpclient *x, long len) -{ -#define BYTES_PER_LINE 16 - char hexStr[(3*BYTES_PER_LINE)+1]; - char ascStr[BYTES_PER_LINE+1]; - long i, j, k = 0L; - unsigned char *buf = x->x_msginbuf; - - if (x->x_verbosity) post("%s_hexdump %d:", objName, len); - while (k < len) - { - for (i = j = 0; i < BYTES_PER_LINE; ++i, ++k, j+=3) - { - if (k < len) - { - snprintf(&hexStr[j], 4, "%02X ", buf[k]); - snprintf(&ascStr[i], 2, "%c", ((buf[k] >= 32) && (buf[k] <= 126))? buf[k]: '.'); - } - else - { // the last line - snprintf(&hexStr[j], 4, " "); - snprintf(&ascStr[i], 2, " "); - } - } - post ("%s%s", hexStr, ascStr); - } -} + /* multithread stuff */ + pthread_t x_threadid; /* id of child thread */ + pthread_attr_t x_threadattr; /* attributes of child thread */ +} t_tcpclient; -static void tcpclient_tick(t_tcpclient *x) -{ - outlet_float(x->x_connectout, 1); -} + +static void tcpclient_receive_callback(t_tcpclient *x, int sockfd, int argc, t_atom*argv); + + + +/* connection handling */ static void *tcpclient_child_connect(void *w) { - t_tcpclient *x = (t_tcpclient*) w; - struct sockaddr_in server; - struct hostent *hp; - int sockfd; + t_tcpclient *x = (t_tcpclient*) w; + struct sockaddr_in server; + struct hostent *hp; + int sockfd; - if (x->x_fd >= 0) + if (x->x_fd >= 0) { - error("%s_connect: already connected", objName); - return (x); + error("%s_connect: already connected", objName); + return (x); } - /* create a socket */ - sockfd = socket(AF_INET, SOCK_STREAM, 0); + /* create a socket */ + sockfd = socket(AF_INET, SOCK_STREAM, 0); #ifdef DEBUG - post("%s: send socket %d\n", objName, sockfd); + post("%s: send socket %d\n", objName, sockfd); #endif - if (sockfd < 0) + if (sockfd < 0) { - sys_sockerror("tcpclient: socket"); - return (x); + sys_sockerror("tcpclient: socket"); + return (x); } - /* connect socket using hostname provided in command line */ - server.sin_family = AF_INET; - hp = gethostbyname(x->x_hostname); - if (hp == 0) + /* connect socket using hostname provided in command line */ + server.sin_family = AF_INET; + hp = gethostbyname(x->x_hostname); + if (hp == 0) { - sys_sockerror("tcpclient: bad host?\n"); - return (x); + sys_sockerror("tcpclient: bad host?\n"); + return (x); } - memcpy((char *)&server.sin_addr, (char *)hp->h_addr, hp->h_length); + memcpy((char *)&server.sin_addr, (char *)hp->h_addr, hp->h_length); - /* assign client port number */ - server.sin_port = htons((u_short)x->x_port); + /* assign client port number */ + server.sin_port = htons((u_short)x->x_port); - if (x->x_verbosity) post("%s: connecting socket %d to port %d", objName, sockfd, x->x_port); - /* try to connect */ - if (connect(sockfd, (struct sockaddr *) &server, sizeof (server)) < 0) + /* try to connect */ + if (connect(sockfd, (struct sockaddr *) &server, sizeof (server)) < 0) { - sys_sockerror("tcpclient: connecting stream socket"); - sys_closesocket(sockfd); - return (x); + sys_sockerror("tcpclient: connecting stream socket"); + sys_closesocket(sockfd); + return (x); } - x->x_fd = sockfd; - x->x_addr = ntohl(*(long *)hp->h_addr); - /* outlet_float is not threadsafe ! */ - // outlet_float(x->x_obj.ob_outlet, 1); - x->x_connectstate = 1; - /* use callback instead to set outlet */ - clock_delay(x->x_clock, 0); - return (x); + x->x_fd = sockfd; + x->x_addr = ntohl(*(long *)hp->h_addr); + + x->x_sender=iemnet__sender_create(sockfd); + x->x_receiver=iemnet__receiver_create(sockfd, x, (t_iemnet_receivecallback)tcpclient_receive_callback); + + x->x_connectstate = 1; + + /* use callback to set outlet in main thread */ + clock_delay(x->x_clock, 0); + return (x); +} +static void tcpclient_tick(t_tcpclient *x) +{ + outlet_float(x->x_connectout, 1); } + + static void tcpclient_connect(t_tcpclient *x, t_symbol *hostname, t_floatarg fportno) { - /* we get hostname and port and pass them on - to the child thread that establishes the connection */ - x->x_hostname = hostname->s_name; - x->x_port = fportno; - x->x_connectstate = 0; - /* start child thread */ - if(pthread_create(&x->x_threadid, &x->x_threadattr, tcpclient_child_connect, x) < 0) - post("%s: could not create new thread", objName); + /* we get hostname and port and pass them on + to the child thread that establishes the connection */ + x->x_hostname = hostname->s_name; + x->x_port = fportno; + x->x_connectstate = 0; + /* start child thread */ + if(pthread_create(&x->x_threadid, &x->x_threadattr, tcpclient_child_connect, x) < 0) + post("%s: could not create new thread", objName); } static void tcpclient_disconnect(t_tcpclient *x) { - if (x->x_fd >= 0) + if(x->x_sender)iemnet__sender_destroy(x->x_sender); + if(x->x_receiver)iemnet__receiver_destroy(x->x_receiver); + + if (x->x_fd >= 0) { - sys_closesocket(x->x_fd); - x->x_fd = -1; - x->x_connectstate = 0; - outlet_float(x->x_connectout, 0); - if (x->x_verbosity) post("%s: disconnected", objName); + sys_closesocket(x->x_fd); + x->x_fd = -1; + x->x_connectstate = 0; + outlet_float(x->x_connectout, 0); } - else post("%s: not connected", objName); + else pd_error(x, "%s: not connected", objName); } +/* sending/receiving */ + static void tcpclient_send(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) { -//#define BYTE_BUF_LEN 65536 // arbitrary maximum similar to max IP packet size -// static char byte_buf[BYTE_BUF_LEN]; - int i, j, d; - unsigned char c; - float f, e; - char *bp; - int length; - size_t sent; - int result; - char fpath[FILENAME_MAX]; - FILE *fptr; - t_atom output_atom; + int size=0; + t_atom output_atom; + t_iemnet_sender*sender=x->x_sender; -#ifdef DEBUG - post("s: %s", s->s_name); - post("argc: %d", argc); -#endif + t_iemnet_chunk*chunk=iemnet__chunk_create_list(argc, argv); + if(sender && chunk) { + size=iemnet__sender_send(sender, chunk); + } + iemnet__chunk_destroy(chunk); - if (x->x_fd < 0) - { - error("%s: not connected", objName); - return; - } - for (i = j = 0; i < argc; ++i) - { - if (argv[i].a_type == A_FLOAT) - { - f = argv[i].a_w.w_float; - d = (int)f; - e = f - d; - if (e != 0) - { - error("%s_send: item %d (%f) is not an integer", objName, i, f); - return; - } - if ((d < 0) || (d > 255)) - { - error("%s: item %d (%f) is not between 0 and 255", objName, i, f); - return; - } - c = (unsigned char)d; - if (0 == tcpclient_send_byte(x, c)) break; - ++j; - } - else if (argv[i].a_type == A_SYMBOL) - { - - atom_string(&argv[i], fpath, FILENAME_MAX); - fptr = fopen(fpath, "rb"); - if (fptr == NULL) - { - post("%s_send: unable to open \"%s\"", objName, fpath); - return; - } - rewind(fptr); - while ((d = fgetc(fptr)) != EOF) - { - c = (char)(d & 0x0FF); - if (0 == tcpclient_send_byte(x, c)) break; - ++j; - } - fclose(fptr); - fptr = NULL; - if (x->x_verbosity) post("%s_send: read \"%s\" length %d byte%s", objName, fpath, j, ((d==1)?"":"s")); - } - else - { - error("%s_send: item %d is not a float or a file name", objName, i); - return; - } - } - sent = j; - SETFLOAT(&output_atom, sent); - outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom); + SETFLOAT(&output_atom, size); + outlet_anything( x->x_statusout, gensym("sent"), 1, &output_atom); } -int tcpclient_send_byte(t_tcpclient *x, char byte) -{ - int result = 0; - fd_set wfds; - struct timeval timeout; - - FD_ZERO(&wfds); - FD_SET(x->x_fd, &wfds); - timeout.tv_sec = 0; - timeout.tv_usec = x->x_timeout_us; /* give it about a millisecond to clear buffer */ - result = select(x->x_fd+1, NULL, &wfds, NULL, &timeout); - if (result == -1) - { - post("%s_send_byte: select returned error %d", objName, errno); - return 0; - } - if (FD_ISSET(x->x_fd, &wfds)) - { - result = send(x->x_fd, &byte, 1, 0); - if (result <= 0) - { - sys_sockerror("tcpclient: send"); - post("%s_send_byte: could not send data ", objName); - return 0; - } - } - return result; -} +static void tcpclient_receive_callback(t_tcpclient *x, int sockfd, int argc, t_atom*argv) { + // ignore sockfd -/* Return the send buffer size of socket, also output it on status outlet */ -static int tcpclient_get_socket_send_buf_size(t_tcpclient *x) -{ - int optVal = 0; - socklen_t optLen = sizeof(int); - t_atom output_atom; -#ifdef _WIN32 - if (getsockopt(x->x_fd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) == SOCKET_ERROR) - post("%_get_socket_send_buf_size: getsockopt returned %d\n", objName, WSAGetLastError()); -#else - if (getsockopt(x->x_fd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) == -1) - post("%_get_socket_send_buf_size: getsockopt returned %d\n", objName, errno); -#endif - SETFLOAT(&output_atom, optVal); - outlet_anything( x->x_statusout, gensym("buf"), 1, &output_atom); - return optVal; + if(argc) { + outlet_list(x->x_msgout, &s_list, argc, argv); + } else { + // disconnected + tcpclient_disconnect(x); + } } -/* Set the send buffer size of socket, returns actual size */ -static int tcpclient_set_socket_send_buf_size(t_tcpclient *x, int size) -{ - int optVal = size; - int optLen = sizeof(int); -#ifdef _WIN32 - if (setsockopt(x->x_fd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen) == SOCKET_ERROR) - { - post("%s_set_socket_send_buf_size: setsockopt returned %d\n", objName, WSAGetLastError()); -#else - if (setsockopt(x->x_fd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen) == -1) - { - post("%s_set_socket_send_buf_size: setsockopt returned %d\n", objName, errno); -#endif - return 0; - } - else return (tcpclient_get_socket_send_buf_size(x)); -} +/* constructor/destructor */ -/* Get/set the send buffer size of client socket */ -static void tcpclient_buf_size(t_tcpclient *x, t_symbol *s, int argc, t_atom *argv) +static void *tcpclient_new(void) { - int client = -1; - float buf_size = 0; - t_atom output_atom[3]; + int i; - if(x->x_connectstate == 0) - { - post("%s_buf_size: no clients connected", objName); - return; - } - /* get size of buffer (first element in list) */ - if (argc > 0) - { - if (argv[0].a_type != A_FLOAT) - { - post("%s_buf_size: specify buffer size with a float", objName); - return; - } - buf_size = atom_getfloatarg(0, argc, argv); - x->x_fdbuf = tcpclient_set_socket_send_buf_size(x, (int)buf_size); - if (x->x_verbosity) post("%s_buf_size: set to %d", objName, x->x_fdbuf); - return; - } - x->x_fdbuf = tcpclient_get_socket_send_buf_size(x); - return; -} + t_tcpclient *x = (t_tcpclient *)pd_new(tcpclient_class); + x->x_msgout = outlet_new(&x->x_obj, &s_anything); /* received data */ + x->x_addrout = outlet_new(&x->x_obj, &s_list); + x->x_connectout = outlet_new(&x->x_obj, &s_float); /* connection state */ + x->x_statusout = outlet_new(&x->x_obj, &s_anything);/* last outlet for everything else */ -static void tcpclient_rcv(t_tcpclient *x) -{ - int sockfd = x->x_fd; - int ret; - int i; - fd_set readset; - fd_set exceptset; - struct timeval ztout; - - if(x->x_connectstate) + x->x_fd = -1; + + for (i = 0; i < 4; ++i) { - /* check if we can read/write from/to the socket */ - FD_ZERO(&readset); - FD_ZERO(&exceptset); - FD_SET(x->x_fd, &readset ); - FD_SET(x->x_fd, &exceptset ); - - ztout.tv_sec = 0; - ztout.tv_usec = 0; - - ret = select(sockfd+1, &readset, NULL, &exceptset, &ztout); - if(ret < 0) - { - error("%s: unable to read from socket", objName); - sys_closesocket(sockfd); - return; - } - if(FD_ISSET(sockfd, &readset) || FD_ISSET(sockfd, &exceptset)) - { - /* read from server */ - ret = recv(sockfd, x->x_msginbuf, MAX_UDP_RECEIVE, 0); - if(ret > 0) - { -#ifdef DEBUG - x->x_msginbuf[ret] = 0; - post("%s: received %d bytes ", objName, ret); -#endif - if (x->x_dump)tcp_client_hexdump(x, ret); - for (i = 0; i < ret; ++i) - { - /* convert the bytes in the buffer to floats in a list */ - x->x_msgoutbuf[i].a_w.w_float = (float)x->x_msginbuf[i]; - } - /* find sender's ip address and output it */ - x->x_addrbytes[0].a_w.w_float = (x->x_addr & 0xFF000000)>>24; - x->x_addrbytes[1].a_w.w_float = (x->x_addr & 0x0FF0000)>>16; - x->x_addrbytes[2].a_w.w_float = (x->x_addr & 0x0FF00)>>8; - x->x_addrbytes[3].a_w.w_float = (x->x_addr & 0x0FF); - outlet_list(x->x_addrout, &s_list, 4L, x->x_addrbytes); - /* send the list out the outlet */ - if (ret > 1) outlet_list(x->x_msgout, &s_list, ret, x->x_msgoutbuf); - else outlet_float(x->x_msgout, x->x_msgoutbuf[0].a_w.w_float); - } - else - { - if (ret < 0) - { - sys_sockerror("tcpclient: recv"); - tcpclient_disconnect(x); - } - else - { - if (x->x_verbosity) post("%s: connection closed for socket %d\n", objName, sockfd); - tcpclient_disconnect(x); - } - } - } + SETFLOAT(x->x_addrbytes+i, 0); } - else post("%s: not connected", objName); -} + x->x_addr = 0L; -static void tcpclient_poll(t_tcpclient *x) -{ - if(x->x_connectstate) - tcpclient_rcv(x); /* try to read in case we're connected */ - clock_delay(x->x_poll, DEFPOLLTIME); /* see you later */ -} + x->x_sender=NULL; + x->x_receiver=NULL; -static void *tcpclient_new(t_floatarg udpflag) -{ - int i; - - t_tcpclient *x = (t_tcpclient *)pd_new(tcpclient_class); - x->x_msgout = outlet_new(&x->x_obj, &s_anything); /* received data */ - x->x_addrout = outlet_new(&x->x_obj, &s_list); - x->x_connectout = outlet_new(&x->x_obj, &s_float); /* connection state */ - x->x_statusout = outlet_new(&x->x_obj, &s_anything);/* last outlet for everything else */ - x->x_clock = clock_new(x, (t_method)tcpclient_tick); - x->x_poll = clock_new(x, (t_method)tcpclient_poll); - x->x_verbosity = 1; /* default post status changes to main window */ - x->x_fd = -1; - /* convert the bytes in the buffer to floats in a list */ - for (i = 0; i < MAX_UDP_RECEIVE; ++i) - { - x->x_msgoutbuf[i].a_type = A_FLOAT; - x->x_msgoutbuf[i].a_w.w_float = 0; - } - for (i = 0; i < 4; ++i) - { - x->x_addrbytes[i].a_type = A_FLOAT; - x->x_addrbytes[i].a_w.w_float = 0; - } - x->x_addr = 0L; - x->x_timeout_us = 1000; /* set a default 1ms send timeout */ - /* prepare child thread */ - if(pthread_attr_init(&x->x_threadattr) < 0) - post("%s: warning: could not prepare child thread", objName); - if(pthread_attr_setdetachstate(&x->x_threadattr, PTHREAD_CREATE_DETACHED) < 0) - post("%s: warning: could not prepare child thread", objName); - clock_delay(x->x_poll, 0); /* start polling the input */ - return (x); + + x->x_clock = clock_new(x, (t_method)tcpclient_tick); + + /* prepare child thread */ + if(pthread_attr_init(&x->x_threadattr) < 0) + post("%s: warning: could not prepare child thread", objName); + if(pthread_attr_setdetachstate(&x->x_threadattr, PTHREAD_CREATE_DETACHED) < 0) + post("%s: warning: could not prepare child thread", objName); + + + return (x); } static void tcpclient_free(t_tcpclient *x) { - tcpclient_disconnect(x); - clock_free(x->x_poll); - clock_free(x->x_clock); + tcpclient_disconnect(x); + clock_free(x->x_poll); + clock_free(x->x_clock); } void tcpclient_setup(void) { - tcpclient_class = class_new(gensym(objName), (t_newmethod)tcpclient_new, - (t_method)tcpclient_free, - sizeof(t_tcpclient), 0, A_DEFFLOAT, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_connect, gensym("connect") - , A_SYMBOL, A_FLOAT, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_disconnect, gensym("disconnect"), 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_send, gensym("send"), A_GIMME, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_buf_size, gensym("buf"), A_GIMME, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_rcv, gensym("receive"), 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_rcv, gensym("rcv"), 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_verbosity, gensym("verbosity"), A_FLOAT, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_dump, gensym("dump"), A_FLOAT, 0); - class_addmethod(tcpclient_class, (t_method)tcpclient_timeout, gensym("timeout"), A_FLOAT, 0); - class_addlist(tcpclient_class, (t_method)tcpclient_send); + tcpclient_class = class_new(gensym(objName), (t_newmethod)tcpclient_new, + (t_method)tcpclient_free, + sizeof(t_tcpclient), 0, A_DEFFLOAT, 0); + class_addmethod(tcpclient_class, (t_method)tcpclient_connect, gensym("connect") + , A_SYMBOL, A_FLOAT, 0); + class_addmethod(tcpclient_class, (t_method)tcpclient_disconnect, gensym("disconnect"), 0); + class_addmethod(tcpclient_class, (t_method)tcpclient_send, gensym("send"), A_GIMME, 0); + class_addlist(tcpclient_class, (t_method)tcpclient_send); } /* end of tcpclient.c */ diff --git a/tcpserver.c b/tcpserver.c index 9dcf529..f5f4f09 100644 --- a/tcpserver.c +++ b/tcpserver.c @@ -7,59 +7,36 @@ /* */ /* A server for bidirectional communication from within Pd. */ /* Allows to send back data to specific clients connected to the server. */ -/* Written by Olaf Matthes */ -/* Get source at http://www.akustische-kunst.org/puredata/maxlib */ - /* */ - /* This program is free software; you can redistribute it and/or */ - /* modify it under the terms of the GNU General Public License */ - /* as published by the Free Software Foundation; either version 2 */ - /* of the License, or (at your option) any later version. */ - /* */ - /* This program is distributed in the hope that it will be useful, */ - /* but WITHOUT ANY WARRANTY; without even the implied warranty of */ - /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ - /* GNU General Public License for more details. */ - /* */ - /* You should have received a copy of the GNU General Public License */ - /* along with this program; if not, write to the Free Software */ - /* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ - /* */ - - /* ---------------------------------------------------------------------------- */ -#define DEBUG +/* */ +/* This program is free software; you can redistribute it and/or */ +/* modify it under the terms of the GNU General Public License */ +/* as published by the Free Software Foundation; either version 2 */ +/* of the License, or (at your option) any later version. */ +/* */ +/* This program is distributed in the hope that it will be useful, */ +/* but WITHOUT ANY WARRANTY; without even the implied warranty of */ +/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ +/* GNU General Public License for more details. */ +/* */ +/* You should have received a copy of the GNU General Public License */ +/* along with this program; if not, write to the Free Software */ +/* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ +/* */ + +/* ---------------------------------------------------------------------------- */ +//#define DEBUG #include "iemnet.h" -#include "m_imp.h" #include "s_stuff.h" -#include -#include #if defined(UNIX) || defined(unix) -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include /* linux has the SIOCOUTQ ioctl */ -#define SOCKET_ERROR -1 +# include #else -#include +# include #endif -#ifdef _MSC_VER -#define snprintf sprintf_s -#endif - #define MAX_CONNECT 32 /* maximum number of connections */ -#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */ -#define MAX_UDP_RECEIVE 65536L /* longer than data in maximum UDP packet */ - /* ----------------------------- tcpserver ------------------------- */ @@ -79,8 +56,6 @@ typedef struct _tcpserver t_object x_obj; t_outlet *x_msgout; t_outlet *x_connectout; - t_outlet *x_sockout; - t_outlet *x_addrout; t_outlet *x_status_outlet; t_tcpserver_socketreceiver *x_sr[MAX_CONNECT]; /* socket per connection */ @@ -91,25 +66,7 @@ typedef struct _tcpserver t_atom x_addrbytes[4]; } t_tcpserver; -static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd, t_symbol*host); -static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x); - -static void tcpserver_send_client(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv); -static void tcpserver_send_socket(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv); -static void tcpserver_send_bytes(t_tcpserver *x, int sockfd, t_iemnet_chunk*chunk); -#ifdef SIOCOUTQ -static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client); -#endif -static void tcpserver_datacallback(t_tcpserver *x, int sockfd, int argc, t_atom*argv); - -static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient); -static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket); -static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv); -static void tcpserver_connectpoll(t_tcpserver *x); -static void *tcpserver_new(t_floatarg fportno); -static void tcpserver_free(t_tcpserver *x); -void tcpserver_setup(void); - +static void tcpserver_receive_callback(t_tcpserver *x, int sockfd, int argc, t_atom*argv); static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int sockfd, t_symbol*host) { @@ -122,7 +79,7 @@ static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, int x->sr_fd=sockfd; x->sr_sender=iemnet__sender_create(sockfd); - x->sr_receiver=iemnet__receiver_create(sockfd, owner, (t_iemnet_receivecallback)tcpserver_datacallback); + x->sr_receiver=iemnet__receiver_create(sockfd, owner, (t_iemnet_receivecallback)tcpserver_receive_callback); } return (x); } @@ -131,13 +88,15 @@ static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x) { if (x != NULL) { - if(x->sr_sender)iemnet__sender_destroy(x->sr_sender); + if(x->sr_sender) iemnet__sender_destroy(x->sr_sender); if(x->sr_receiver)iemnet__receiver_destroy(x->sr_receiver); + + sys_closesocket(x->sr_fd); + freebytes(x, sizeof(*x)); } } - static int tcpserver_socket2index(t_tcpserver*x, int sockfd) { int i=0; @@ -148,10 +107,29 @@ static int tcpserver_socket2index(t_tcpserver*x, int sockfd) return i; } } - return -1; } +/* checks whether client is a valid (1-based) index + * if the id is invalid, returns -1 + * if the id is valid, return the 0-based index (client-1) + */ +static int tcpserver_fixindex(t_tcpserver*x, int client) +{ + if(x->x_nconnections <= 0) + { + pd_error(x, "[%s]: no clients connected", objName); + return -1; + } + + if (!((client > 0) && (client <= x->x_nconnections))) + { + pd_error(x, "[%s] client %d out of range [1..%d]", objName, client, x->x_nconnections); + return -1; + } + return (client-1); +} + /* ---------------- main tcpserver (send) stuff --------------------- */ static void tcpserver_send_bytes(t_tcpserver*x, int client, t_iemnet_chunk*chunk) @@ -174,55 +152,24 @@ static void tcpserver_send_bytes(t_tcpserver*x, int client, t_iemnet_chunk*chunk } } -#ifdef SIOCOUTQ -/* SIOCOUTQ exists only(?) on linux, returns remaining space in the socket's output buffer */ -static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client) -{ - int sockfd = x->x_sr[client].sr_fd; - int result = 0L; - - ioctl(sockfd, SIOCOUTQ, &result); - return result; -} -#endif // SIOCOUTQ - - - /* send message to client using client number note that the client numbers might change in case a client disconnects! */ /* clients start at 1 but our index starts at 0 */ static void tcpserver_send_client(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv) { - int client = -1; - - if(x->x_nconnections <= 0) - { - post("%s_client_send: no clients connected", objName); - return; - } - if(argc > 0) - { - /* get number of client (first element in list) */ - if(argv[0].a_type == A_FLOAT) - client = atom_getfloatarg(0, argc, argv); - else - { - post("%s_client_send: specify client by number", objName); - return; - } - if (!((client > 0) && (client < MAX_CONNECT))) - { - post("%s_client_send: client %d out of range [1..%d]", objName, client, MAX_CONNECT); - return; - } - } if (argc > 1) { + int client=tcpserver_fixindex(x, atom_getint(argv)); + if(client<0)return; t_iemnet_chunk*chunk=iemnet__chunk_create_list(argc-1, argv+1); --client;/* zero based index*/ tcpserver_send_bytes(x, client, chunk); return; } + else + { + pd_error(x, "[%s] no client specified", objName); + } } /* broadcasts a message to all connected clients */ @@ -237,20 +184,44 @@ static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *a /* socket exists for this client */ tcpserver_send_bytes(x, client, chunk); } + iemnet__chunk_destroy(chunk); } +/* broadcasts a message to all connected clients */ +static void tcpserver_broadcastbut(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv) +{ + int client=0; + int but=-1; + t_iemnet_chunk*chunk=NULL; + + if(argc<2) { + return; + } + if((but=tcpserver_fixindex(x, atom_getint(argv)))<0)return; + chunk=iemnet__chunk_create_list(argc+1, argv+1); + + /* enumerate through the clients and send each the message */ + for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */ + { + /* socket exists for this client */ + if(client!=but)tcpserver_send_bytes(x, client, chunk); + } + iemnet__chunk_destroy(chunk); +} /* send message to client using socket number */ static void tcpserver_send_socket(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv) { int client = -1; + if(argc) { + client = tcpserver_socket2index(x, atom_getint(argv)); + if(client<0)return; + } else { + pd_error(x, "%s_send: no socket specified", objName); + return; + } - if(x->x_nconnections <= 0) - { - post("%s_send: no clients connected", objName); - return; - } /* get socket number of connection (first element in list) */ if(argc && argv->a_type == A_FLOAT) { @@ -270,24 +241,15 @@ static void tcpserver_send_socket(t_tcpserver *x, t_symbol *s, int argc, t_atom t_iemnet_chunk*chunk=iemnet__chunk_create_list(argc-1, argv+1); tcpserver_send_bytes(x, client, chunk); + iemnet__chunk_destroy(chunk); } - - - static void tcpserver_disconnect(t_tcpserver *x, int client) { - t_tcpserver_socketreceiver *y=NULL; - int fd=0; int k; - y = x->x_sr[client]; - fd = y->sr_fd; - post("closing fd[%d]=%d", client, fd); - tcpserver_socketreceiver_free(x->x_sr[client]); x->x_sr[client]=NULL; - sys_closesocket(fd); /* rearrange list now: move entries to close the gap */ for(k = client; k < x->x_nconnections; k++) @@ -304,20 +266,9 @@ static void tcpserver_disconnect(t_tcpserver *x, int client) /* disconnect a client by number */ static void tcpserver_disconnect_client(t_tcpserver *x, t_floatarg fclient) { - int client = (int)fclient; - - if(x->x_nconnections <= 0) - { - post("%s_client_disconnect: no clients connected", objName); - return; - } - if (!((client > 0) && (client < MAX_CONNECT))) - { - post("%s: client %d out of range [1..%d]", objName, client, MAX_CONNECT); - return; - } - --client; /* zero based index*/ + int client = tcpserver_fixindex(x, fclient); + if(client<0)return; tcpserver_disconnect(x, client); } @@ -332,16 +283,23 @@ static void tcpserver_disconnect_socket(t_tcpserver *x, t_floatarg fsocket) +/* disconnect a client by socket */ +static void tcpserver_disconnect_all(t_tcpserver *x) +{ + int id=x->x_nconnections; + while(--id>=0) { + tcpserver_disconnect(x, id); + } +} + + + /* ---------------- main tcpserver (receive) stuff --------------------- */ -static void tcpserver_datacallback(t_tcpserver *x, int sockfd, int argc, t_atom*argv) { - static int packetcount=0; - static int bytecount=0; - if(argc) { +static void tcpserver_receive_callback(t_tcpserver *x, int sockfd, int argc, t_atom*argv) { + if(argc) { outlet_list(x->x_msgout, &s_list, argc, argv); - packetcount++; - bytecount+=argc; } else { // disconnected tcpserver_disconnect_socket(x, sockfd); @@ -371,6 +329,8 @@ static void tcpserver_connectpoll(t_tcpserver *x) i = x->x_nconnections - 1; x->x_sr[i] = y; } + + outlet_float(x->x_connectout, x->x_nconnections); } static void *tcpserver_new(t_floatarg fportno) @@ -419,8 +379,6 @@ static void *tcpserver_new(t_floatarg fportno) { sys_addpollfn(sockfd, (t_fdpollfn)tcpserver_connectpoll, x); // wait for new connections x->x_connectout = outlet_new(&x->x_obj, &s_float); /* 2nd outlet for number of connected clients */ - x->x_sockout = outlet_new(&x->x_obj, &s_float); /* 3rd outlet for socket number of current client */ - x->x_addrout = outlet_new(&x->x_obj, &s_list); /* 4th outlet for ip address of current client */ x->x_status_outlet = outlet_new(&x->x_obj, &s_anything);/* 5th outlet for everything else */ } x->x_connectsocket = sockfd; @@ -445,13 +403,8 @@ static void tcpserver_free(t_tcpserver *x) for(i = 0; i < MAX_CONNECT; i++) { - if (NULL!=x->x_sr[i]) { tcpserver_socketreceiver_free(x->x_sr[i]); - if (x->x_sr[i]->sr_fd >= 0) - { - sys_closesocket(x->x_sr[i]->sr_fd); - } } } if (x->x_connectsocket >= 0) @@ -468,14 +421,16 @@ void tcpserver_setup(void) sizeof(t_tcpserver), 0, A_DEFFLOAT, 0); class_addmethod(tcpserver_class, (t_method)tcpserver_disconnect_client, gensym("disconnectclient"), A_DEFFLOAT, 0); class_addmethod(tcpserver_class, (t_method)tcpserver_disconnect_socket, gensym("disconnectsocket"), A_DEFFLOAT, 0); + class_addmethod(tcpserver_class, (t_method)tcpserver_disconnect_all, gensym("disconnect"), 0); class_addmethod(tcpserver_class, (t_method)tcpserver_send_socket, gensym("send"), A_GIMME, 0); class_addmethod(tcpserver_class, (t_method)tcpserver_send_client, gensym("client"), A_GIMME, 0); class_addmethod(tcpserver_class, (t_method)tcpserver_broadcast, gensym("broadcast"), A_GIMME, 0); - class_addlist(tcpserver_class, (t_method)tcpserver_broadcast); + class_addmethod(tcpserver_class, (t_method)tcpserver_broadcastbut, gensym("broadcastbut"), A_GIMME, 0); + class_addlist(tcpserver_class, (t_method)tcpserver_broadcast); post("iemnet: networking with Pd :: %s", objName); post(" (c) 2010 IOhannes m zmoelnig, IEM"); -- cgit v1.2.1