aboutsummaryrefslogtreecommitdiff
path: root/tcpserver.c
diff options
context:
space:
mode:
authorIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-03-23 13:36:30 +0000
committerIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-03-23 13:36:30 +0000
commit7dc935cf2938b1e47b60716654a5879737b5d437 (patch)
tree256a085b92a32f5be16fc036f677ba176112f1ee /tcpserver.c
parentee1b0d431491d1fef7673e26e6ffb651c0ca1a93 (diff)
cleaned up
svn path=/trunk/externals/iem/iemnet/; revision=13242
Diffstat (limited to 'tcpserver.c')
-rw-r--r--tcpserver.c1404
1 files changed, 668 insertions, 736 deletions
diff --git a/tcpserver.c b/tcpserver.c
index 5272029..0be452c 100644
--- a/tcpserver.c
+++ b/tcpserver.c
@@ -1,31 +1,31 @@
-/* tcpserver.c Martin Peach 20060511 working version 20060512 */
-/* 20060515 works on linux too... */
-/* tcpserver.c is based on netserver: */
-/* -------------------------- netserver ------------------------------------- */
+/* tcpserver.c
+ * copyright (c) 2010 IOhannes m zmölnig, IEM
+ * copyright (c) 2006-2010 Martin Peach
+ * copyright (c) 2004 Olaf Matthes
+ */
+
/* */
/* A server for bidirectional communication from within Pd. */
/* Allows to send back data to specific clients connected to the server. */
/* Written by Olaf Matthes <olaf.matthes@gmx.de> */
/* Get source at http://www.akustische-kunst.org/puredata/maxlib */
-/* */
-/* This program is free software; you can redistribute it and/or */
-/* modify it under the terms of the GNU General Public License */
-/* as published by the Free Software Foundation; either version 2 */
-/* of the License, or (at your option) any later version. */
-/* */
-/* This program is distributed in the hope that it will be useful, */
-/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
-/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
-/* GNU General Public License for more details. */
-/* */
-/* You should have received a copy of the GNU General Public License */
-/* along with this program; if not, write to the Free Software */
-/* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
-/* */
-/* Based on PureData by Miller Puckette and others. */
-/* */
-/* ---------------------------------------------------------------------------- */
-//define DEBUG
+ /* */
+ /* This program is free software; you can redistribute it and/or */
+ /* modify it under the terms of the GNU General Public License */
+ /* as published by the Free Software Foundation; either version 2 */
+ /* of the License, or (at your option) any later version. */
+ /* */
+ /* This program is distributed in the hope that it will be useful, */
+ /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
+ /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
+ /* GNU General Public License for more details. */
+ /* */
+ /* You should have received a copy of the GNU General Public License */
+ /* along with this program; if not, write to the Free Software */
+ /* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
+ /* */
+
+ /* ---------------------------------------------------------------------------- */
#include "m_pd.h"
#include "m_imp.h"
@@ -60,6 +60,285 @@
#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */
#define MAX_UDP_RECEIVE 65536L /* longer than data in maximum UDP packet */
+
+
+ /* draft:
+ * - there is a sender thread for each open connection
+ * - the main thread just adds chunks to each sender threads processing queue
+ * - the sender thread tries to send the queue as fast as possible
+ */
+
+
+ /* data handling */
+#include <string.h>
+
+typedef struct _chunk {
+ unsigned char* data;
+ size_t size;
+} t_chunk;
+void chunk_destroy(t_chunk*c) {
+ if(NULL==c)return;
+
+ if(c->data)freebytes(c->data, c->size*sizeof(unsigned char));
+
+ c->data=NULL;
+ c->size=0;
+
+ freebytes(c, sizeof(t_chunk));
+}
+
+t_chunk* chunk_create(int argc, t_atom*argv) {
+ t_chunk*result=(t_chunk*)getbytes(sizeof(t_chunk));
+ int i;
+ if(NULL==result)return NULL;
+
+ result->size=argc;
+ result->data=(unsigned char*)getbytes(sizeof(unsigned char)*argc);
+
+ if(NULL == result->data) {
+ result->size=0;
+ chunk_destroy(result);
+ return NULL;
+ }
+
+ for(i=0; i<argc; i++) {
+ unsigned char c = atom_getint(argv);
+ result->data[i]=c;
+ argv++;
+ }
+
+ return result;
+}
+
+t_chunk*chunk_duplicate(t_chunk*c) {
+ t_chunk*result=NULL;
+ if(NULL==c)return NULL;
+
+ result=(t_chunk*)getbytes(sizeof(t_chunk));
+
+ result->size=c->size;
+ result->data=(unsigned char*)getbytes(sizeof(unsigned char)*(result->size));
+ if(NULL == result->data) {
+ result->size=0;
+ chunk_destroy(result);
+ return NULL;
+ }
+
+ memcpy(result->data, c->data, result->size);
+
+ return result;
+}
+
+
+/* queue handling */
+typedef struct _node {
+ struct _node* next;
+ t_chunk*data;
+} t_node;
+
+typedef struct _queue {
+ t_node* head; /* = 0 */
+ t_node* tail; /* = 0 */
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+
+ int done; // in cleanup state
+ int size;
+} t_queue;
+
+
+static int queue_push(
+ t_queue* const _this,
+ t_chunk* const data
+ ) {
+ t_node* tail;
+ t_node* n=NULL;
+ int size=_this->size;
+
+ if(NULL == data) return size;
+ fprintf(stderr, "pushing %d bytes\n", data->size);
+
+ n=(t_node*)getbytes(sizeof(t_node));
+
+ n->next = 0;
+ n->data = data;
+ pthread_mutex_lock(&_this->mtx);
+ if (! (tail = _this->tail)) {
+ _this->head = n;
+ } else {
+ tail->next = n;
+ }
+ _this->tail = n;
+
+ _this->size+=data->size;
+ size=_this->size;
+
+
+ fprintf(stderr, "pushed %d bytes\n", data->size);
+
+ pthread_mutex_unlock(&_this->mtx);
+ pthread_cond_signal(&_this->cond);
+
+ return size;
+}
+
+t_chunk* queue_pop(
+ t_queue* const _this
+ ) {
+ t_node* head=0;
+ t_chunk*data=0;
+ pthread_mutex_lock(&_this->mtx);
+ while (! (head = _this->head)) {
+ if(_this->done) {
+ pthread_mutex_unlock(&_this->mtx);
+ return NULL;
+ }
+ else
+ pthread_cond_wait(&_this->cond, &_this->mtx);
+ }
+ if (! (_this->head = head->next)) {
+ _this->tail = 0;
+ }
+ if(head && head->data) {
+ _this->size-=head->data->size;
+ }
+
+ pthread_mutex_unlock(&_this->mtx);
+ if(head) {
+ data=head->data;
+ freebytes(head, sizeof(t_node));
+ head=NULL;
+ }
+ fprintf(stderr, "popped %d bytes\n", data->size);
+ return data;
+}
+
+void queue_finish(t_queue* q) {
+ if(NULL==q)
+ return;
+ q->done=1;
+ pthread_cond_signal(&q->cond);
+}
+
+void queue_destroy(t_queue* q) {
+ t_chunk*c=NULL;
+ if(NULL==q)
+ return;
+
+ queue_finish(q);
+
+ /* remove all the chunks from the queue */
+ while(NULL!=(c=queue_pop(q))) {
+ chunk_destroy(c);
+ }
+
+ q->head=NULL;
+ q->tail=NULL;
+
+ pthread_mutex_destroy(&q->mtx);
+ pthread_cond_destroy(&q->cond);
+
+ freebytes(q, sizeof(t_queue));
+ q=NULL;
+}
+
+t_queue* queue_create(void) {
+ static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
+ static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+
+ t_queue*q=(t_queue*)getbytes(sizeof(t_queue));
+ if(NULL==q)return NULL;
+
+ q->head = NULL;
+ q->tail = NULL;
+
+ memcpy(&q->cond, &cond, sizeof(pthread_cond_t));
+ memcpy(&q->mtx , &mtx, sizeof(pthread_mutex_t));
+
+ q->done = 0;
+ q->size = 0;
+
+ return q;
+}
+
+
+typedef struct _sender {
+ pthread_t thread;
+
+ int sockfd; /* owned outside; must call sender_destroy() before freeing socket yourself */
+ t_queue*queue;
+ int cont; // indicates whether we want to thread to continue or to terminate
+} t_sender;
+
+/* the workhorse of the family */
+static void*sender_sendthread(void*arg) {
+ t_sender*sender=(t_sender*)arg;
+
+ int sockfd=sender->sockfd;
+ t_queue*q=sender->queue;
+
+ while(sender->cont) {
+ t_chunk*c=NULL;
+ c=queue_pop(q);
+ if(c) {
+ unsigned char*data=c->data;
+ unsigned int size=c->size;
+
+ int result = send(sockfd, data, size, 0);
+
+
+ // shouldn't we do something with the result here?
+
+ chunk_destroy(c);
+ }
+ }
+ sender->queue=NULL;
+ queue_destroy(q);
+ return NULL;
+}
+
+static int sender_send(t_sender*s, t_chunk*c) {
+ t_queue*q=s->queue;
+ int size=0;
+ if(q) {
+ size = queue_push(q, c);
+ }
+ return size;
+}
+
+static void sender_destroy(t_sender*s) {
+ s->cont=0;
+ if(s->queue)
+ queue_finish(s->queue);
+
+ s->sockfd = -1;
+
+ pthread_join(s->thread, NULL);
+
+ freebytes(s, sizeof(t_sender));
+ s=NULL;
+}
+static t_sender*sender_create(int sock) {
+ t_sender*result=(t_sender*)getbytes(sizeof(t_sender));
+ int res=0;
+
+ if(NULL==result)return NULL;
+
+ result->queue = queue_create();
+ result->sockfd = sock;
+ result->cont =1;
+
+ res=pthread_create(&result->thread, 0, sender_sendthread, result);
+
+ if(0==res) {
+
+ } else {
+ // something went wrong
+ }
+
+ return result;
+}
+
/* ----------------------------- tcpserver ------------------------- */
static t_class *tcpserver_class;
@@ -70,50 +349,40 @@ typedef void (*t_tcpserver_socketreceivefn)(void *x, t_binbuf *b);
typedef struct _tcpserver_socketreceiver
{
- t_symbol *sr_host;
- t_int sr_fd;
- t_int sr_fdbuf;
- u_long sr_addr;
- unsigned char *sr_inbuf;
- int sr_inhead;
- int sr_intail;
- void *sr_owner;
- t_tcpserver_socketnotifier sr_notifier;
- t_tcpserver_socketreceivefn sr_socketreceivefn;
+ t_symbol *sr_host;
+ t_int sr_fd;
+ t_int sr_fdbuf;
+ u_long sr_addr;
+ unsigned char *sr_inbuf;
+ int sr_inhead;
+ int sr_intail;
+ void *sr_owner;
+ t_tcpserver_socketnotifier sr_notifier;
+ t_tcpserver_socketreceivefn sr_socketreceivefn;
+ t_sender*sr_sender;
} t_tcpserver_socketreceiver;
-typedef struct _tcpserver_send_params
-{
- int client;
- int sockfd;
- char *byte_buf;
- size_t length;
- t_int timeout_us;
-} t_tcpserver_send_params;
-
typedef struct _tcpserver
{
- t_object x_obj;
- t_outlet *x_msgout;
- t_outlet *x_connectout;
- t_outlet *x_sockout;
- t_outlet *x_addrout;
- t_outlet *x_status_outlet;
- t_int x_dump; // 1 = hexdump received bytes
-
- t_tcpserver_socketreceiver *x_sr[MAX_CONNECT];
-
- t_atom x_addrbytes[4];
- t_int x_sock_fd;
- t_int x_connectsocket;
- t_int x_nconnections;
- t_int x_timeout_us;
- t_atom x_msgoutbuf[MAX_UDP_RECEIVE];
- char x_msginbuf[MAX_UDP_RECEIVE];
+ t_object x_obj;
+ t_outlet *x_msgout;
+ t_outlet *x_connectout;
+ t_outlet *x_sockout;
+ t_outlet *x_addrout;
+ t_outlet *x_status_outlet;
+
+ t_tcpserver_socketreceiver *x_sr[MAX_CONNECT];
+
+ t_int x_sock_fd;
+ t_int x_connectsocket;
+ t_int x_nconnections;
+
+ t_atom x_addrbytes[4];
+ t_atom x_msgoutbuf[MAX_UDP_RECEIVE];
} t_tcpserver;
static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, t_tcpserver_socketnotifier notifier,
- t_tcpserver_socketreceivefn socketreceivefn);
+ t_tcpserver_socketreceivefn socketreceivefn);
static int tcpserver_socketreceiver_doread(t_tcpserver_socketreceiver *x);
static void tcpserver_socketreceiver_read(t_tcpserver_socketreceiver *x, int fd);
static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x);
@@ -122,13 +391,10 @@ static void tcpserver_send_bytes(int sockfd, t_tcpserver *x, int argc, t_atom *a
#ifdef SIOCOUTQ
static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client);
#endif
-static void *tcpserver_send_buf_thread(void *arg);
-static size_t tcpserver_send_buf(int client, int sockfd, char *byte_buf, size_t length, t_int timeout_us);
static void tcpserver_client_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv);
static void tcpserver_output_client_state(t_tcpserver *x, int client);
-static int tcpserver_get_socket_send_buf_size(int sockfd);
-static int tcpserver_set_socket_send_buf_size(int sockfd, int size);
-static void tcpserver_buf_size(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv);
+static int tcpserver_set_socket_send_buf_size(int sockfd, int size);
+static int tcpserver_get_socket_send_buf_size(int sockfd);
static void tcpserver_disconnect(t_tcpserver *x);
static void tcpserver_client_disconnect(t_tcpserver *x, t_floatarg fclient);
static void tcpserver_socket_disconnect(t_tcpserver *x, t_floatarg fsocket);
@@ -139,182 +405,131 @@ static void tcpserver_print(t_tcpserver *x);
static void *tcpserver_new(t_floatarg fportno);
static void tcpserver_free(t_tcpserver *x);
void tcpserver_setup(void);
-static void tcpserver_dump(t_tcpserver *x, t_float dump);
-static void tcpserver_timeout(t_tcpserver *x, t_float timeout);
-static void tcpserver_hexdump(unsigned char *buf, long len);
-
-static void tcpserver_timeout(t_tcpserver *x, t_float timeout)
-{
- /* set the timeout on the select call in tcpserver_send_buf */
- /* this is the maximum time in microseconds to wait */
- /* before abandoning attempt to send */
-
- t_int timeout_us = 0;
- if ((timeout >= 0)&&(timeout < 1000000))
- {
- timeout_us = (t_int)timeout;
- x->x_timeout_us = timeout_us;
- }
-}
-
-static void tcpserver_dump(t_tcpserver *x, t_float dump)
-{
- x->x_dump = (dump == 0)?0:1;
-}
-
-static void tcpserver_hexdump(unsigned char *buf, long len)
-{
-#define BYTES_PER_LINE 16
- char hexStr[(3*BYTES_PER_LINE)+1];
- char ascStr[BYTES_PER_LINE+1];
- long i, j, k = 0L;
-#ifdef DEBUG
- post("tcpserver_hexdump %d", len);
-#endif
- while (k < len)
- {
- for (i = j = 0; i < BYTES_PER_LINE; ++i, ++k, j+=3)
- {
- if (k < len)
- {
- snprintf(&hexStr[j], 4, "%02X ", buf[k]);
- snprintf(&ascStr[i], 2, "%c", ((buf[k] >= 32) && (buf[k] <= 126))? buf[k]: '.');
- }
- else
- { // the last line
- snprintf(&hexStr[j], 4, " ");
- snprintf(&ascStr[i], 2, " ");
- }
- }
- post ("%s%s", hexStr, ascStr);
- }
-}
static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, t_tcpserver_socketnotifier notifier,
- t_tcpserver_socketreceivefn socketreceivefn)
+ t_tcpserver_socketreceivefn socketreceivefn)
{
- t_tcpserver_socketreceiver *x = (t_tcpserver_socketreceiver *)getbytes(sizeof(*x));
- if (!x)
+ t_tcpserver_socketreceiver *x = (t_tcpserver_socketreceiver *)getbytes(sizeof(*x));
+ if (!x)
{
- error("%s_socketreceiver: unable to allocate %d bytes", objName, sizeof(*x));
+ error("%s_socketreceiver: unable to allocate %d bytes", objName, sizeof(*x));
}
- else
+ else
{
- x->sr_inhead = x->sr_intail = 0;
- x->sr_owner = owner;
- x->sr_notifier = notifier;
- x->sr_socketreceivefn = socketreceivefn;
- if (!(x->sr_inbuf = malloc(INBUFSIZE)))
+ x->sr_inhead = x->sr_intail = 0;
+ x->sr_owner = owner;
+ x->sr_notifier = notifier;
+ x->sr_socketreceivefn = socketreceivefn;
+ if (!(x->sr_inbuf = malloc(INBUFSIZE)))
{
- freebytes(x, sizeof(*x));
- x = NULL;
- error("%s_socketreceiver: unable to allocate %ld bytes", objName, INBUFSIZE);
+ freebytes(x, sizeof(*x));
+ x = NULL;
+ error("%s_socketreceiver: unable to allocate %ld bytes", objName, INBUFSIZE);
}
}
- return (x);
+ return (x);
}
/* this is in a separately called subroutine so that the buffer isn't
sitting on the stack while the messages are getting passed. */
static int tcpserver_socketreceiver_doread(t_tcpserver_socketreceiver *x)
{
- char messbuf[INBUFSIZE];
- char *bp = messbuf;
- int indx, i;
- int inhead = x->sr_inhead;
- int intail = x->sr_intail;
- unsigned char c;
- t_tcpserver *y = x->sr_owner;
- unsigned char *inbuf = x->sr_inbuf;
-
- if (intail == inhead) return (0);
+ char messbuf[INBUFSIZE];
+ char *bp = messbuf;
+ int indx, i;
+ int inhead = x->sr_inhead;
+ int intail = x->sr_intail;
+ unsigned char c;
+ t_tcpserver *y = x->sr_owner;
+ unsigned char *inbuf = x->sr_inbuf;
+
+ if (intail == inhead) return (0);
#ifdef DEBUG
- post ("%s_socketreceiver_doread: intail=%d inhead=%d", objName, intail, inhead);
+ post ("%s_socketreceiver_doread: intail=%d inhead=%d", objName, intail, inhead);
#endif
- for (indx = intail, i = 0; indx != inhead; indx = (indx+1)&(INBUFSIZE-1), ++i)
+ for (indx = intail, i = 0; indx != inhead; indx = (indx+1)&(INBUFSIZE-1), ++i)
{
- c = *bp++ = inbuf[indx];
- y->x_msgoutbuf[i].a_w.w_float = (float)c;
+ c = *bp++ = inbuf[indx];
+ y->x_msgoutbuf[i].a_w.w_float = (float)c;
}
- if (y->x_dump)tcpserver_hexdump(&inbuf[intail], i);
-
- if (i > 1) outlet_list(y->x_msgout, &s_list, i, y->x_msgoutbuf);
- else outlet_float(y->x_msgout, y->x_msgoutbuf[0].a_w.w_float);
+ if (i > 1) outlet_list(y->x_msgout, &s_list, i, y->x_msgoutbuf);
+ else outlet_float(y->x_msgout, y->x_msgoutbuf[0].a_w.w_float);
- // intail = (indx+1)&(INBUFSIZE-1);
- x->sr_inhead = inhead;
- x->sr_intail = indx;//intail;
- return (1);
+ // intail = (indx+1)&(INBUFSIZE-1);
+ x->sr_inhead = inhead;
+ x->sr_intail = indx;//intail;
+ return (1);
}
static void tcpserver_socketreceiver_read(t_tcpserver_socketreceiver *x, int fd)
{
- int readto = (x->sr_inhead >= x->sr_intail ? INBUFSIZE : x->sr_intail-1);
- int ret, i;
- t_tcpserver *y = x->sr_owner;
+ int readto = (x->sr_inhead >= x->sr_intail ? INBUFSIZE : x->sr_intail-1);
+ int ret, i;
+ t_tcpserver *y = x->sr_owner;
- y->x_sock_fd = fd;
- /* the input buffer might be full. If so, drop the whole thing */
- if (readto == x->sr_inhead)
+ y->x_sock_fd = fd;
+ /* the input buffer might be full. If so, drop the whole thing */
+ if (readto == x->sr_inhead)
{
- post("%s: dropped message", objName);
- x->sr_inhead = x->sr_intail = 0;
- readto = INBUFSIZE;
+ post("%s: dropped message", objName);
+ x->sr_inhead = x->sr_intail = 0;
+ readto = INBUFSIZE;
}
- else
+ else
{
- ret = recv(fd, x->sr_inbuf + x->sr_inhead,
- readto - x->sr_inhead, 0);
- if (ret < 0)
+ ret = recv(fd, x->sr_inbuf + x->sr_inhead,
+ readto - x->sr_inhead, 0);
+ if (ret < 0)
{
- sys_sockerror("tcpserver: recv");
- if (x->sr_notifier) (*x->sr_notifier)(x->sr_owner);
- sys_rmpollfn(fd);
- sys_closesocket(fd);
+ sys_sockerror("tcpserver: recv");
+ if (x->sr_notifier) (*x->sr_notifier)(x->sr_owner);
+ sys_rmpollfn(fd);
+ sys_closesocket(fd);
}
- else if (ret == 0)
+ else if (ret == 0)
{
- post("%s: connection closed on socket %d", objName, fd);
- if (x->sr_notifier) (*x->sr_notifier)(x->sr_owner);
- sys_rmpollfn(fd);
- sys_closesocket(fd);
+ post("%s: connection closed on socket %d", objName, fd);
+ if (x->sr_notifier) (*x->sr_notifier)(x->sr_owner);
+ sys_rmpollfn(fd);
+ sys_closesocket(fd);
}
- else
+ else
{
#ifdef DEBUG
- post ("%s_socketreceiver_read: ret = %d", objName, ret);
+ post ("%s_socketreceiver_read: ret = %d", objName, ret);
#endif
- x->sr_inhead += ret;
- if (x->sr_inhead >= INBUFSIZE) x->sr_inhead = 0;
- /* output client's IP and socket no. */
- for(i = 0; i < y->x_nconnections; i++) /* search for corresponding IP */
+ x->sr_inhead += ret;
+ if (x->sr_inhead >= INBUFSIZE) x->sr_inhead = 0;
+ /* output client's IP and socket no. */
+ for(i = 0; i < y->x_nconnections; i++) /* search for corresponding IP */
{
- if(y->x_sr[i]->sr_fd == y->x_sock_fd)
+ if(y->x_sr[i]->sr_fd == y->x_sock_fd)
{
-// outlet_symbol(x->x_connectionip, x->x_sr[i].sr_host);
- /* find sender's ip address and output it */
- y->x_addrbytes[0].a_w.w_float = (y->x_sr[i]->sr_addr & 0xFF000000)>>24;
- y->x_addrbytes[1].a_w.w_float = (y->x_sr[i]->sr_addr & 0x0FF0000)>>16;
- y->x_addrbytes[2].a_w.w_float = (y->x_sr[i]->sr_addr & 0x0FF00)>>8;
- y->x_addrbytes[3].a_w.w_float = (y->x_sr[i]->sr_addr & 0x0FF);
- outlet_list(y->x_addrout, &s_list, 4L, y->x_addrbytes);
- break;
+ // outlet_symbol(x->x_connectionip, x->x_sr[i].sr_host);
+ /* find sender's ip address and output it */
+ y->x_addrbytes[0].a_w.w_float = (y->x_sr[i]->sr_addr & 0xFF000000)>>24;
+ y->x_addrbytes[1].a_w.w_float = (y->x_sr[i]->sr_addr & 0x0FF0000)>>16;
+ y->x_addrbytes[2].a_w.w_float = (y->x_sr[i]->sr_addr & 0x0FF00)>>8;
+ y->x_addrbytes[3].a_w.w_float = (y->x_sr[i]->sr_addr & 0x0FF);
+ outlet_list(y->x_addrout, &s_list, 4L, y->x_addrbytes);
+ break;
}
}
- outlet_float(y->x_sockout, y->x_sock_fd); /* the socket number */
- tcpserver_socketreceiver_doread(x);
+ outlet_float(y->x_sockout, y->x_sock_fd); /* the socket number */
+ tcpserver_socketreceiver_doread(x);
}
}
}
static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x)
{
- if (x != NULL)
+ if (x != NULL)
{
- free(x->sr_inbuf);
- freebytes(x, sizeof(*x));
+ sender_destroy(x->sr_sender);
+ free(x->sr_inbuf);
+ freebytes(x, sizeof(*x));
}
}
@@ -322,368 +537,142 @@ static void tcpserver_socketreceiver_free(t_tcpserver_socketreceiver *x)
static void tcpserver_send_bytes(int client, t_tcpserver *x, int argc, t_atom *argv)
{
- static char byte_buf[MAX_UDP_RECEIVE];// arbitrary maximum similar to max IP packet size
- int i, j, d;
- unsigned char c;
- float f, e;
- int length;
- size_t flen = 0;
- int sockfd = x->x_sr[client]->sr_fd;
- char fpath[FILENAME_MAX];
- FILE *fptr;
+ if(x && x->x_sr && x->x_sr[client]) {
t_atom output_atom[3];
- t_tcpserver_send_params *ttsp;
- pthread_t sender_thread;
- pthread_attr_t sender_attr;
- int sender_thread_result;
-
- /* process & send data */
- if(sockfd >= 0)
- {
- /* sender thread should start out detached so its resouces will be freed when it is done */
- if (0!= (sender_thread_result = pthread_attr_init(&sender_attr)))
- {
- error("%s: pthread_attr_init failed: %d", objName, sender_thread_result);
- goto failed;
- }
- if(0!= (sender_thread_result = pthread_attr_setdetachstate(&sender_attr, PTHREAD_CREATE_DETACHED)))
- {
- error("%s: pthread_attr_setdetachstate failed: %d", objName, sender_thread_result);
- goto failed;
- }
- for (i = j = 0; i < argc; ++i)
- {
- if (argv[i].a_type == A_FLOAT)
- { /* load floats into buffer as long as they are integers on [0..255]*/
- f = argv[i].a_w.w_float;
- d = (int)f;
- e = f - d;
-#ifdef DEBUG
- post("%s: argv[%d]: float:%f int:%d delta:%f", objName, i, f, d, e);
-#endif
- if (e != 0)
- {
- error("%s: item %d (%f) is not an integer", objName, i, f);
- goto failed;
- }
- if ((d < 0) || (d > 255))
- {
- error("%s: item %d (%f) is not between 0 and 255", objName, i, f);
- goto failed;
- }
- c = (unsigned char)d; /* make sure it doesn't become negative; this only matters for post() */
-#ifdef DEBUG
- post("%s: argv[%d]: %d", objName, i, c);
-#endif
- byte_buf[j++] = c;
- if (j >= MAX_UDP_RECEIVE)
- { /* if the argument list is longer than our buffer, send the buffer whenever it's full */
-#ifdef SIOCOUTQ
- if (tcpserver_send_buffer_avaliable_for_client(x, client) < j)
- {
- error("%s: buffer too small for client(%d)", objName, client);
- goto failed;
- }
-#endif // SIOCOUTQ
- ttsp = (t_tcpserver_send_params *)getbytes(sizeof(t_tcpserver_send_params));
- if (ttsp == NULL)
- {
- error("%s: unable to allocate %d bytes for t_tcpserver_send_params", objName, sizeof(t_tcpserver_send_params));
- goto failed;
- }
- ttsp->client = client;
- ttsp->sockfd = sockfd;
- ttsp->byte_buf = byte_buf;
- ttsp->length = j;
- ttsp->timeout_us = x->x_timeout_us;
- if (0 != (sender_thread_result = pthread_create(&sender_thread, &sender_attr, tcpserver_send_buf_thread, (void *)ttsp)))
- {
- error("%s: couldn't create sender thread (%d)", objName, sender_thread_result);
- goto failed;
- }
- flen += j;
- j = 0;
- }
- }
- else if (argv[i].a_type == A_SYMBOL)
- { /* symbols are interpreted to be file names; attempt to load the file and send it */
+ int size=0;
+
+ t_sender*sender=sender=x->x_sr[client]->sr_sender;
+ int sockfd = x->x_sr[client]->sr_fd;
- atom_string(&argv[i], fpath, FILENAME_MAX);
-#ifdef DEBUG
- post ("%s: fname: %s", objName, fpath);
-#endif
- fptr = fopen(fpath, "rb");
- if (fptr == NULL)
- {
- error("%s: unable to open \"%s\"", objName, fpath);
- goto failed;
- }
- rewind(fptr);
-#ifdef DEBUG
- post("%s: d is %d", objName, d);
-#endif
- while ((d = fgetc(fptr)) != EOF)
- {
- byte_buf[j++] = (char)(d & 0x0FF);
-#ifdef DEBUG
- post("%s: byte_buf[%d] = %d", objName, j-1, byte_buf[j-1]);
-#endif
- if (j >= MAX_UDP_RECEIVE)
- { /* if the file is longer than our buffer, send the buffer whenever it's full */
- /* this might be better than allocating huge amounts of memory */
-#ifdef SIOCOUTQ
- if (tcpserver_send_buffer_avaliable_for_client(x, client) < j)
- {
- error("%s: buffer too small for client(%d)", objName, client);
- goto failed;
- }
-#endif // SIOCOUTQ
- ttsp = (t_tcpserver_send_params *)getbytes(sizeof(t_tcpserver_send_params));
- if (ttsp == NULL)
- {
- error("%s: unable to allocate %d bytes for t_tcpserver_send_params", objName, sizeof(t_tcpserver_send_params));
- goto failed;
- }
- ttsp->client = client;
- ttsp->sockfd = sockfd;
- ttsp->byte_buf = byte_buf;
- ttsp->length = j;
- ttsp->timeout_us = x->x_timeout_us;
- if ( 0!= (sender_thread_result = pthread_create(&sender_thread, &sender_attr, tcpserver_send_buf_thread, (void *)ttsp)))
- {
- error("%s: couldn't create sender thread (%d)", objName, sender_thread_result);
- goto failed;
- }
- flen += j;
- j = 0;
- }
- }
- flen += j;
- fclose(fptr);
- fptr = NULL;
- post("%s: read \"%s\" length %d byte%s", objName, fpath, flen, ((d==1)?"":"s"));
- }
- else
- { /* arg was neither a float nor a valid file name */
- error("%s: item %d is not a float or a file name", objName, i);
- goto failed;
- }
- }
- length = j;
- if (length > 0)
- { /* send whatever remains in our buffer */
-#ifdef SIOCOUTQ
- if (tcpserver_send_buffer_avaliable_for_client(x, client) < length)
- {
- error("%s: buffer too small for client(%d)", objName, client);
- goto failed;
- }
-#endif // SIOCOUTQ
- ttsp = (t_tcpserver_send_params *)getbytes(sizeof(t_tcpserver_send_params));
- if (ttsp == NULL)
- {
- error("%s: unable to allocate %d bytes for t_tcpserver_send_params", objName, sizeof(t_tcpserver_send_params));
- goto failed;
- }
- ttsp->client = client;
- ttsp->sockfd = sockfd;
- ttsp->byte_buf = byte_buf;
- ttsp->length = length;
- ttsp->timeout_us = x->x_timeout_us;
- if ( 0!= (sender_thread_result = pthread_create(&sender_thread, &sender_attr, tcpserver_send_buf_thread, (void *)ttsp)))
- {
- error("%s: couldn't create sender thread (%d)", objName, sender_thread_result);
- goto failed;
- }
- flen += length;
- }
+ if(sender) {
+ t_chunk*chunk=chunk_create(argc, argv);
+ size=sender_send(sender, chunk);
}
- else post("%s: not a valid socket number (%d)", objName, sockfd);
-failed:
+
SETFLOAT(&output_atom[0], client+1);
- SETFLOAT(&output_atom[1], flen);
+ SETFLOAT(&output_atom[1], size);
SETFLOAT(&output_atom[2], sockfd);
outlet_anything( x->x_status_outlet, gensym("sent"), 3, output_atom);
+ }
}
#ifdef SIOCOUTQ
/* SIOCOUTQ exists only(?) on linux, returns remaining space in the socket's output buffer */
static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client)
{
- int sockfd = x->x_sr[client].sr_fd;
- int result = 0L;
+ int sockfd = x->x_sr[client].sr_fd;
+ int result = 0L;
- ioctl(sockfd, SIOCOUTQ, &result);
- return result;
+ ioctl(sockfd, SIOCOUTQ, &result);
+ return result;
}
#endif // SIOCOUTQ
-// send a buffer in its own thread
-static void *tcpserver_send_buf_thread(void *arg)
-{
- t_tcpserver_send_params *ttsp = (t_tcpserver_send_params *)arg;
- int result;
-
- result = send(ttsp->sockfd, ttsp->byte_buf, ttsp->length, 0);
- if (result <= 0)
- {
- sys_sockerror("tcpserver: send");
- post("%s_send_buf: could not send data to client %d", objName, ttsp->client+1);
- }
- freebytes (arg, sizeof (t_tcpserver_send_params));
- return NULL;
-}
-
-// send a buffer one byte at a time, no thread
-static size_t tcpserver_send_buf(int client, int sockfd, char *byte_buf, size_t length, t_int timeout_us)
-{
- char *bp;
- size_t sent = 0;
- int result;
- fd_set wfds;
- struct timeval timeout;
-
- for (bp = byte_buf, sent = 0; sent < length;)
- {
- FD_ZERO(&wfds);
- FD_SET(sockfd, &wfds);
- timeout.tv_sec = 0;
- timeout.tv_usec = timeout_us; /* give it a short time to clear buffer */
- result = select(sockfd+1, NULL, &wfds, NULL, &timeout);
- if (result == -1)
- {
- post("%s_send_buf: select returned error %d", objName, errno);
- break;
- }
- if (FD_ISSET(sockfd, &wfds))
- {
- result = send(sockfd, bp, 1, 0);/*(sockfd, bp, (int)(length-sent), 0);*/
- if (result <= 0)
- {
- sys_sockerror("tcpserver: send");
- post("%s_send_buf: could not send data to client %d", objName, client+1);
- break;
- }
- else
- {
- sent += result;
- bp += result;
- }
- }
- else
- {
- post ("%s_send_buf: can't send right now, sent %lu of %lu", objName, sent, length);
- return sent;/* abandon any further attempts to send so we don't block */
- }
- }
- return sent;
-}
-
/* send message to client using socket number */
static void tcpserver_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
{
- int i, sockfd;
- int client = -1;
+ int i, sockfd;
+ int client = -1;
- if(x->x_nconnections <= 0)
+ if(x->x_nconnections <= 0)
{
- post("%s_send: no clients connected", objName);
- return;
+ post("%s_send: no clients connected", objName);
+ return;
}
- if(argc == 0) /* no socket specified: output state of all sockets */
+ if(argc == 0) /* no socket specified: output state of all sockets */
{
- tcpserver_output_client_state(x, client);
- return;
+ tcpserver_output_client_state(x, client);
+ return;
}
- /* get socket number of connection (first element in list) */
- if(argv[0].a_type == A_FLOAT)
+ /* get socket number of connection (first element in list) */
+ if(argv[0].a_type == A_FLOAT)
{
- sockfd = atom_getfloatarg(0, argc, argv);
- for(i = 0; i < x->x_nconnections; i++) /* check if connection exists */
+ sockfd = atom_getfloatarg(0, argc, argv);
+ for(i = 0; i < x->x_nconnections; i++) /* check if connection exists */
{
- if(x->x_sr[i]->sr_fd == sockfd)
+ if(x->x_sr[i]->sr_fd == sockfd)
{
- client = i; /* the client we're sending to */
- break;
+ client = i; /* the client we're sending to */
+ break;
}
}
- if(client == -1)
+ if(client == -1)
{
- post("%s_send: no connection on socket %d", objName, sockfd);
- return;
+ post("%s_send: no connection on socket %d", objName, sockfd);
+ return;
}
}
- else
+ else
{
- post("%s_send: no socket specified", objName);
- return;
+ post("%s_send: no socket specified", objName);
+ return;
}
- if (argc < 2) /* nothing to send: output state of this socket */
+ if (argc < 2) /* nothing to send: output state of this socket */
{
- tcpserver_output_client_state(x, client+1);
- return;
+ tcpserver_output_client_state(x, client+1);
+ return;
}
- tcpserver_send_bytes(client, x, argc-1, &argv[1]);
+ tcpserver_send_bytes(client, x, argc-1, &argv[1]);
}
/* disconnect the client at x_sock_fd */
static void tcpserver_disconnect(t_tcpserver *x)
{
- int i, fd;
- t_tcpserver_socketreceiver *y;
+ int i, fd;
+ t_tcpserver_socketreceiver *y;
- if (x->x_sock_fd >= 0)
+ if (x->x_sock_fd >= 0)
{
- /* find the socketreceiver for this socket */
- for(i = 0; i < x->x_nconnections; i++)
+ /* find the socketreceiver for this socket */
+ for(i = 0; i < x->x_nconnections; i++)
{
- if(x->x_sr[i]->sr_fd == x->x_sock_fd)
+ if(x->x_sr[i]->sr_fd == x->x_sock_fd)
{
- y = x->x_sr[i];
- fd = y->sr_fd;
- if (y->sr_notifier) (*y->sr_notifier)(x);
- sys_rmpollfn(fd);
- sys_closesocket(fd);
- x->x_sock_fd = -1;
- return;
+ y = x->x_sr[i];
+ fd = y->sr_fd;
+ if (y->sr_notifier) (*y->sr_notifier)(x);
+ sys_rmpollfn(fd);
+ sys_closesocket(fd);
+ x->x_sock_fd = -1;
+ return;
}
}
}
- post("%s__disconnect: no connection on socket %d", objName, x->x_sock_fd);
+ post("%s__disconnect: no connection on socket %d", objName, x->x_sock_fd);
}
/* disconnect a client by socket */
static void tcpserver_socket_disconnect(t_tcpserver *x, t_floatarg fsocket)
{
- int sock = (int)fsocket;
+ int sock = (int)fsocket;
- if(x->x_nconnections <= 0)
+ if(x->x_nconnections <= 0)
{
- post("%s_socket_disconnect: no clients connected", objName);
- return;
+ post("%s_socket_disconnect: no clients connected", objName);
+ return;
}
- x->x_sock_fd = sock;
- tcpserver_disconnect(x);
+ x->x_sock_fd = sock;
+ tcpserver_disconnect(x);
}
/* disconnect a client by number */
static void tcpserver_client_disconnect(t_tcpserver *x, t_floatarg fclient)
{
- int client = (int)fclient;
+ int client = (int)fclient;
- if(x->x_nconnections <= 0)
+ if(x->x_nconnections <= 0)
{
- post("%s_client_disconnect: no clients connected", objName);
- return;
+ post("%s_client_disconnect: no clients connected", objName);
+ return;
}
- if (!((client > 0) && (client < MAX_CONNECT)))
+ if (!((client > 0) && (client < MAX_CONNECT)))
{
- post("%s: client %d out of range [1..%d]", objName, client, MAX_CONNECT);
- return;
+ post("%s: client %d out of range [1..%d]", objName, client, MAX_CONNECT);
+ return;
}
- --client;/* zero based index*/
- x->x_sock_fd = x->x_sr[client]->sr_fd;
- tcpserver_disconnect(x);
+ --client;/* zero based index*/
+ x->x_sock_fd = x->x_sr[client]->sr_fd;
+ tcpserver_disconnect(x);
}
@@ -692,159 +681,113 @@ static void tcpserver_client_disconnect(t_tcpserver *x, t_floatarg fclient)
/* clients start at 1 but our index starts at 0 */
static void tcpserver_client_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
{
- int client = -1;
+ int client = -1;
- if(x->x_nconnections <= 0)
+ if(x->x_nconnections <= 0)
{
- post("%s_client_send: no clients connected", objName);
- return;
+ post("%s_client_send: no clients connected", objName);
+ return;
}
- if(argc > 0)
+ if(argc > 0)
{
- /* get number of client (first element in list) */
- if(argv[0].a_type == A_FLOAT)
- client = atom_getfloatarg(0, argc, argv);
- else
+ /* get number of client (first element in list) */
+ if(argv[0].a_type == A_FLOAT)
+ client = atom_getfloatarg(0, argc, argv);
+ else
{
- post("%s_client_send: specify client by number", objName);
- return;
+ post("%s_client_send: specify client by number", objName);
+ return;
}
- if (!((client > 0) && (client < MAX_CONNECT)))
+ if (!((client > 0) && (client < MAX_CONNECT)))
{
- post("%s_client_send: client %d out of range [1..%d]", objName, client, MAX_CONNECT);
- return;
+ post("%s_client_send: client %d out of range [1..%d]", objName, client, MAX_CONNECT);
+ return;
}
}
- if (argc > 1)
+ if (argc > 1)
{
- --client;/* zero based index*/
- tcpserver_send_bytes(client, x, argc-1, &argv[1]);
- return;
+ --client;/* zero based index*/
+ tcpserver_send_bytes(client, x, argc-1, &argv[1]);
+ return;
}
- tcpserver_output_client_state(x, client);
+ tcpserver_output_client_state(x, client);
}
static void tcpserver_output_client_state(t_tcpserver *x, int client)
{
- t_atom output_atom[4];
+ t_atom output_atom[4];
- if (client == -1)
+ if (client == -1)
{
- /* output parameters of all connections via status outlet */
- for(client = 0; client < x->x_nconnections; client++)
+ /* output parameters of all connections via status outlet */
+ for(client = 0; client < x->x_nconnections; client++)
{
- x->x_sr[client]->sr_fdbuf = tcpserver_get_socket_send_buf_size(x->x_sr[client]->sr_fd);
- SETFLOAT(&output_atom[0], client+1);
- SETFLOAT(&output_atom[1], x->x_sr[client]->sr_fd);
- output_atom[2].a_type = A_SYMBOL;
- output_atom[2].a_w.w_symbol = x->x_sr[client]->sr_host;
- SETFLOAT(&output_atom[3], x->x_sr[client]->sr_fdbuf);
- outlet_anything( x->x_status_outlet, gensym("client"), 4, output_atom);
+ x->x_sr[client]->sr_fdbuf = tcpserver_get_socket_send_buf_size(x->x_sr[client]->sr_fd);
+ SETFLOAT(&output_atom[0], client+1);
+ SETFLOAT(&output_atom[1], x->x_sr[client]->sr_fd);
+ output_atom[2].a_type = A_SYMBOL;
+ output_atom[2].a_w.w_symbol = x->x_sr[client]->sr_host;
+ SETFLOAT(&output_atom[3], x->x_sr[client]->sr_fdbuf);
+ outlet_anything( x->x_status_outlet, gensym("client"), 4, output_atom);
}
}
- else
+ else
{
- client -= 1;/* zero-based client index conflicts with 1-based user index !!! */
- /* output client parameters via status outlet */
- x->x_sr[client]->sr_fdbuf = tcpserver_get_socket_send_buf_size(x->x_sr[client]->sr_fd);
- SETFLOAT(&output_atom[0], client+1);/* user sees client 0 as 1 */
- SETFLOAT(&output_atom[1], x->x_sr[client]->sr_fd);
- output_atom[2].a_type = A_SYMBOL;
- output_atom[2].a_w.w_symbol = x->x_sr[client]->sr_host;
- SETFLOAT(&output_atom[3], x->x_sr[client]->sr_fdbuf);
- outlet_anything( x->x_status_outlet, gensym("client"), 4, output_atom);
+ client -= 1;/* zero-based client index conflicts with 1-based user index !!! */
+ /* output client parameters via status outlet */
+ x->x_sr[client]->sr_fdbuf = tcpserver_get_socket_send_buf_size(x->x_sr[client]->sr_fd);
+ SETFLOAT(&output_atom[0], client+1);/* user sees client 0 as 1 */
+ SETFLOAT(&output_atom[1], x->x_sr[client]->sr_fd);
+ output_atom[2].a_type = A_SYMBOL;
+ output_atom[2].a_w.w_symbol = x->x_sr[client]->sr_host;
+ SETFLOAT(&output_atom[3], x->x_sr[client]->sr_fdbuf);
+ outlet_anything( x->x_status_outlet, gensym("client"), 4, output_atom);
}
}
/* Return the send buffer size of socket */
static int tcpserver_get_socket_send_buf_size(int sockfd)
{
- int optVal = 0;
- unsigned int optLen = sizeof(int);
+ int optVal = 0;
+ unsigned int optLen = sizeof(int);
#ifdef _WIN32
- if (getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) == SOCKET_ERROR)
- post("%_get_socket_send_buf_size: getsockopt returned %d\n", objName, WSAGetLastError());
+ if (getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) == SOCKET_ERROR)
+ post("%_get_socket_send_buf_size: getsockopt returned %d\n", objName, WSAGetLastError());
#else
- if (getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) == -1)
- post("%_get_socket_send_buf_size: getsockopt returned %d\n", objName, errno);
+ if (getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) == -1)
+ post("%_get_socket_send_buf_size: getsockopt returned %d\n", objName, errno);
#endif
- return optVal;
+ return optVal;
}
/* Set the send buffer size of socket, returns actual size */
-static int tcpserver_set_socket_send_buf_size(int sockfd, int size)
-{
- int optVal = size;
- int optLen = sizeof(int);
#ifdef _WIN32
- if (setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen) == SOCKET_ERROR)
- {
- post("%s_set_socket_send_buf_size: setsockopt returned %d\n", objName, WSAGetLastError());
+#define TCPSERVER_SOCKET_ERROR SOCKET_ERROR
#else
- if (setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen) == -1)
- {
- post("%s_set_socket_send_buf_size: setsockopt returned %d\n", objName, errno);
+#define TCPSERVER_SOCKET_ERROR -1
#endif
- return 0;
- }
- else return (tcpserver_get_socket_send_buf_size(sockfd));
-}
-
-/* Get/set the send buffer of client socket */
-static void tcpserver_buf_size(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
+static int tcpserver_set_socket_send_buf_size(int sockfd, int size)
{
- int client = -1;
- float buf_size = 0;
- t_atom output_atom[3];
-
- if(x->x_nconnections <= 0)
- {
- post("%s_buf_size: no clients connected", objName);
- return;
- }
- /* get number of client (first element in list) */
- if (argc > 0)
- {
- if (argv[0].a_type == A_FLOAT)
- client = atom_getfloatarg(0, argc, argv);
- else
- {
- post("%s_buf_size: specify client by number", objName);
- return;
- }
- if (!((client > 0) && (client < MAX_CONNECT)))
- {
- post("%s__buf_size: client %d out of range [1..%d]", objName, client, MAX_CONNECT);
- return;
- }
- }
- if (argc > 1)
+ int optVal = size;
+ int optLen = sizeof(int);
+ if (setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen) == TCPSERVER_SOCKET_ERROR)
{
- if (argv[1].a_type != A_FLOAT)
- {
- post("%s_buf_size: specify buffer size with a float", objName);
- return;
- }
- buf_size = atom_getfloatarg(1, argc, argv);
- --client;/* zero based index*/
- x->x_sr[client]->sr_fdbuf = tcpserver_set_socket_send_buf_size(x->x_sr[client]->sr_fd, (int)buf_size);
- post("%s_buf_size: client %d set to %d", objName, client+1, x->x_sr[client]->sr_fdbuf);
- return;
+ post("%s_set_socket_send_buf_size: setsockopt returned %d\n", objName, WSAGetLastError());
+ return 0;
}
- post("%s_buf_size: specify client and buffer size", objName);
- return;
+ else return (tcpserver_get_socket_send_buf_size(sockfd));
}
/* broadcasts a message to all connected clients */
static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
{
- int client;
- /* enumerate through the clients and send each the message */
- for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */
+ int client;
+ /* enumerate through the clients and send each the message */
+ for(client = 0; client < x->x_nconnections; client++) /* check if connection exists */
{
- if(x->x_sr[client]->sr_fd >= 0)
+ if(x->x_sr[client]->sr_fd >= 0)
{ /* socket exists for this client */
- tcpserver_send_bytes(client, x, argc, argv);
+ tcpserver_send_bytes(client, x, argc, argv);
}
}
}
@@ -853,213 +796,202 @@ static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *a
static void tcpserver_notify(t_tcpserver *x)
{
- int i, k;
+ int i, k;
- /* remove connection from list */
- for(i = 0; i < x->x_nconnections; i++)
+ /* remove connection from list */
+ for(i = 0; i < x->x_nconnections; i++)
{
- if(x->x_sr[i]->sr_fd == x->x_sock_fd)
+ if(x->x_sr[i]->sr_fd == x->x_sock_fd)
{
- x->x_nconnections--;
- post("%s: \"%s\" removed from list of clients", objName, x->x_sr[i]->sr_host->s_name);
- tcpserver_socketreceiver_free(x->x_sr[i]);
- x->x_sr[i] = NULL;
+ x->x_nconnections--;
+ post("%s: \"%s\" removed from list of clients", objName, x->x_sr[i]->sr_host->s_name);
+ tcpserver_socketreceiver_free(x->x_sr[i]);
+ x->x_sr[i] = NULL;
- /* rearrange list now: move entries to close the gap */
- for(k = i; k < x->x_nconnections; k++)
+ /* rearrange list now: move entries to close the gap */
+ for(k = i; k < x->x_nconnections; k++)
{
- x->x_sr[k] = x->x_sr[k + 1];
+ x->x_sr[k] = x->x_sr[k + 1];
}
}
}
- outlet_float(x->x_connectout, x->x_nconnections);
+ outlet_float(x->x_connectout, x->x_nconnections);
}
static void tcpserver_connectpoll(t_tcpserver *x)
{
- struct sockaddr_in incomer_address;
- unsigned int sockaddrl = sizeof( struct sockaddr );
- int fd = accept(x->x_connectsocket, (struct sockaddr*)&incomer_address, &sockaddrl);
- int i;
- int optVal;
- unsigned int optLen = sizeof(int);
-
- if (fd < 0) post("%s: accept failed", objName);
- else
- {
- t_tcpserver_socketreceiver *y = tcpserver_socketreceiver_new((void *)x,
- (t_tcpserver_socketnotifier)tcpserver_notify, NULL);/* MP tcpserver_doit isn't used I think...*/
- if (!y)
+ struct sockaddr_in incomer_address;
+ unsigned int sockaddrl = sizeof( struct sockaddr );
+ int fd = accept(x->x_connectsocket, (struct sockaddr*)&incomer_address, &sockaddrl);
+ int i;
+ int optVal;
+ unsigned int optLen = sizeof(int);
+
+ if (fd < 0) post("%s: accept failed", objName);
+ else
+ {
+ t_tcpserver_socketreceiver *y = tcpserver_socketreceiver_new((void *)x,
+ (t_tcpserver_socketnotifier)tcpserver_notify, NULL);/* MP tcpserver_doit isn't used I think...*/
+ if (!y)
{
#ifdef _WIN32
- closesocket(fd);
+ closesocket(fd);
#else
- close(fd);
+ close(fd);
#endif
- return;
+ return;
}
- sys_addpollfn(fd, (t_fdpollfn)tcpserver_socketreceiver_read, y);
- x->x_nconnections++;
- i = x->x_nconnections - 1;
- x->x_sr[i] = y;
- x->x_sr[i]->sr_host = gensym(inet_ntoa(incomer_address.sin_addr));
- x->x_sr[i]->sr_fd = fd;
- post("%s: accepted connection from %s on socket %d",
- objName, x->x_sr[i]->sr_host->s_name, x->x_sr[i]->sr_fd);
-/* see how big the send buffer is on this socket */
- x->x_sr[i]->sr_fdbuf = 0;
-#ifdef _WIN32
- if (getsockopt(x->x_sr[i]->sr_fd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) != SOCKET_ERROR)
- {
- /* post("%s_connectpoll: send buffer is %ld\n", objName, optVal); */
- x->x_sr[i]->sr_fdbuf = optVal;
- }
- else post("%s_connectpoll: getsockopt returned %d\n", objName, WSAGetLastError());
-#else
- if (getsockopt(x->x_sr[i]->sr_fd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) == 0)
+ y->sr_sender=sender_create(fd);
+
+ sys_addpollfn(fd, (t_fdpollfn)tcpserver_socketreceiver_read, y);
+ x->x_nconnections++;
+ i = x->x_nconnections - 1;
+ x->x_sr[i] = y;
+ x->x_sr[i]->sr_host = gensym(inet_ntoa(incomer_address.sin_addr));
+ x->x_sr[i]->sr_fd = fd;
+ post("%s: accepted connection from %s on socket %d",
+ objName, x->x_sr[i]->sr_host->s_name, x->x_sr[i]->sr_fd);
+ /* see how big the send buffer is on this socket */
+ x->x_sr[i]->sr_fdbuf = 0;
+ if (getsockopt(x->x_sr[i]->sr_fd, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen) == 0)
{
- /* post("%s_connectpoll: send buffer is %ld\n", objName, optVal); */
- x->x_sr[i]->sr_fdbuf = optVal;
+ x->x_sr[i]->sr_fdbuf = optVal;
}
- else post("%s_connectpoll: getsockopt returned %d\n", objName, errno);
-#endif
- outlet_float(x->x_connectout, x->x_nconnections);
- outlet_float(x->x_sockout, x->x_sr[i]->sr_fd); /* the socket number */
- x->x_sr[i]->sr_addr = ntohl(incomer_address.sin_addr.s_addr);
- x->x_addrbytes[0].a_w.w_float = (x->x_sr[i]->sr_addr & 0xFF000000)>>24;
- x->x_addrbytes[1].a_w.w_float = (x->x_sr[i]->sr_addr & 0x0FF0000)>>16;
- x->x_addrbytes[2].a_w.w_float = (x->x_sr[i]->sr_addr & 0x0FF00)>>8;
- x->x_addrbytes[3].a_w.w_float = (x->x_sr[i]->sr_addr & 0x0FF);
- outlet_list(x->x_addrout, &s_list, 4L, x->x_addrbytes);
+ else post("%s_connectpoll: getsockopt returned %d\n", objName, WSAGetLastError());
+
+ outlet_float(x->x_connectout, x->x_nconnections);
+ outlet_float(x->x_sockout, x->x_sr[i]->sr_fd); /* the socket number */
+ x->x_sr[i]->sr_addr = ntohl(incomer_address.sin_addr.s_addr);
+ x->x_addrbytes[0].a_w.w_float = (x->x_sr[i]->sr_addr & 0xFF000000)>>24;
+ x->x_addrbytes[1].a_w.w_float = (x->x_sr[i]->sr_addr & 0x0FF0000)>>16;
+ x->x_addrbytes[2].a_w.w_float = (x->x_sr[i]->sr_addr & 0x0FF00)>>8;
+ x->x_addrbytes[3].a_w.w_float = (x->x_sr[i]->sr_addr & 0x0FF);
+ outlet_list(x->x_addrout, &s_list, 4L, x->x_addrbytes);
}
}
static void tcpserver_print(t_tcpserver *x)
{
- int i;
+ int i;
- if(x->x_nconnections > 0)
+ if(x->x_nconnections > 0)
{
- post("%s: %d open connections:", objName, x->x_nconnections);
- for(i = 0; i < x->x_nconnections; i++)
+ post("%s: %d open connections:", objName, x->x_nconnections);
+ for(i = 0; i < x->x_nconnections; i++)
{
- post(" \"%s\" on socket %d",
- x->x_sr[i]->sr_host->s_name, x->x_sr[i]->sr_fd);
+ post(" \"%s\" on socket %d",
+ x->x_sr[i]->sr_host->s_name, x->x_sr[i]->sr_fd);
}
}
- else post("%s: no open connections", objName);
+ else post("%s: no open connections", objName);
}
static void *tcpserver_new(t_floatarg fportno)
{
- t_tcpserver *x;
- int i;
- struct sockaddr_in server;
- int sockfd, portno = fportno;
+ t_tcpserver *x;
+ int i;
+ struct sockaddr_in server;
+ int sockfd, portno = fportno;
- /* create a socket */
- sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ /* create a socket */
+ sockfd = socket(AF_INET, SOCK_STREAM, 0);
#ifdef DEBUG
- post("%s: receive socket %d", objName, sockfd);
+ post("%s: receive socket %d", objName, sockfd);
#endif
- if (sockfd < 0)
+ if (sockfd < 0)
{
- sys_sockerror("tcpserver: socket");
- return (0);
+ sys_sockerror("tcpserver: socket");
+ return (0);
}
- server.sin_family = AF_INET;
- server.sin_addr.s_addr = INADDR_ANY;
+ server.sin_family = AF_INET;
+ server.sin_addr.s_addr = INADDR_ANY;
#ifdef IRIX
- /* this seems to work only in IRIX but is unnecessary in
- Linux. Not sure what NT needs in place of this. */
- if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, 0, 0) < 0)
- post("setsockopt failed\n");
+ /* this seems to work only in IRIX but is unnecessary in
+ Linux. Not sure what NT needs in place of this. */
+ if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, 0, 0) < 0)
+ post("setsockopt failed\n");
#endif
- /* assign server port number */
- server.sin_port = htons((u_short)portno);
- /* name the socket */
- if (bind(sockfd, (struct sockaddr *)&server, sizeof(server)) < 0)
+ /* assign server port number */
+ server.sin_port = htons((u_short)portno);
+ /* name the socket */
+ if (bind(sockfd, (struct sockaddr *)&server, sizeof(server)) < 0)
{
- sys_sockerror("tcpserver: bind");
- sys_closesocket(sockfd);
- return (0);
+ sys_sockerror("tcpserver: bind");
+ sys_closesocket(sockfd);
+ return (0);
}
- x = (t_tcpserver *)pd_new(tcpserver_class);
- x->x_msgout = outlet_new(&x->x_obj, &s_anything); /* 1st outlet for received data */
- /* streaming protocol */
- if (listen(sockfd, 5) < 0)
+ x = (t_tcpserver *)pd_new(tcpserver_class);
+ x->x_msgout = outlet_new(&x->x_obj, &s_anything); /* 1st outlet for received data */
+ /* streaming protocol */
+ if (listen(sockfd, 5) < 0)
{
- sys_sockerror("tcpserver: listen");
- sys_closesocket(sockfd);
- sockfd = -1;
+ sys_sockerror("tcpserver: listen");
+ sys_closesocket(sockfd);
+ sockfd = -1;
}
- else
+ else
{
- sys_addpollfn(sockfd, (t_fdpollfn)tcpserver_connectpoll, x);
- x->x_connectout = outlet_new(&x->x_obj, &s_float); /* 2nd outlet for number of connected clients */
- x->x_sockout = outlet_new(&x->x_obj, &s_float); /* 3rd outlet for socket number of current client */
- x->x_addrout = outlet_new(&x->x_obj, &s_list); /* 4th outlet for ip address of current client */
- x->x_status_outlet = outlet_new(&x->x_obj, &s_anything);/* 5th outlet for everything else */
+ sys_addpollfn(sockfd, (t_fdpollfn)tcpserver_connectpoll, x);
+ x->x_connectout = outlet_new(&x->x_obj, &s_float); /* 2nd outlet for number of connected clients */
+ x->x_sockout = outlet_new(&x->x_obj, &s_float); /* 3rd outlet for socket number of current client */
+ x->x_addrout = outlet_new(&x->x_obj, &s_list); /* 4th outlet for ip address of current client */
+ x->x_status_outlet = outlet_new(&x->x_obj, &s_anything);/* 5th outlet for everything else */
}
- x->x_connectsocket = sockfd;
- x->x_nconnections = 0;
- for(i = 0; i < MAX_CONNECT; i++)
+ x->x_connectsocket = sockfd;
+ x->x_nconnections = 0;
+ for(i = 0; i < MAX_CONNECT; i++)
{
- x->x_sr[i] = NULL;
+ x->x_sr[i] = NULL;
}
- /* prepare to convert the bytes in the buffer to floats in a list */
- for (i = 0; i < MAX_UDP_RECEIVE; ++i)
+ /* prepare to convert the bytes in the buffer to floats in a list */
+ for (i = 0; i < MAX_UDP_RECEIVE; ++i)
{
- x->x_msgoutbuf[i].a_type = A_FLOAT;
- x->x_msgoutbuf[i].a_w.w_float = 0;
+ x->x_msgoutbuf[i].a_type = A_FLOAT;
+ x->x_msgoutbuf[i].a_w.w_float = 0;
}
- for (i = 0; i < 4; ++i)
+ for (i = 0; i < 4; ++i)
{
- x->x_addrbytes[i].a_type = A_FLOAT;
- x->x_addrbytes[i].a_w.w_float = 0;
+ x->x_addrbytes[i].a_type = A_FLOAT;
+ x->x_addrbytes[i].a_w.w_float = 0;
}
- x->x_timeout_us = 1000;/* default 1 ms for select call timeout when sending */
- return (x);
+ return (x);
}
static void tcpserver_free(t_tcpserver *x)
{
- int i;
+ int i;
- for(i = 0; i < MAX_CONNECT; i++)
+ for(i = 0; i < MAX_CONNECT; i++)
{
- if (NULL!=x->x_sr[i]) {
- tcpserver_socketreceiver_free(x->x_sr[i]);
- if (x->x_sr[i]->sr_fd >= 0)
- {
- sys_rmpollfn(x->x_sr[i]->sr_fd);
- sys_closesocket(x->x_sr[i]->sr_fd);
- }
- }
+ if (NULL!=x->x_sr[i]) {
+ tcpserver_socketreceiver_free(x->x_sr[i]);
+ if (x->x_sr[i]->sr_fd >= 0)
+ {
+ sys_rmpollfn(x->x_sr[i]->sr_fd);
+ sys_closesocket(x->x_sr[i]->sr_fd);
+ }
+ }
}
- if (x->x_connectsocket >= 0)
+ if (x->x_connectsocket >= 0)
{
- sys_rmpollfn(x->x_connectsocket);
- sys_closesocket(x->x_connectsocket);
+ sys_rmpollfn(x->x_connectsocket);
+ sys_closesocket(x->x_connectsocket);
}
}
void tcpserver_setup(void)
{
- tcpserver_class = class_new(gensym(objName),(t_newmethod)tcpserver_new, (t_method)tcpserver_free,
- sizeof(t_tcpserver), 0, A_DEFFLOAT, 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_print, gensym("print"), 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_send, gensym("send"), A_GIMME, 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_client_send, gensym("client"), A_GIMME, 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_buf_size, gensym("clientbuf"), A_GIMME, 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_client_disconnect, gensym("disconnectclient"), A_DEFFLOAT, 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_socket_disconnect, gensym("disconnectsocket"), A_DEFFLOAT, 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_dump, gensym("dump"), A_FLOAT, 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_broadcast, gensym("broadcast"), A_GIMME, 0);
- class_addmethod(tcpserver_class, (t_method)tcpserver_timeout, gensym("timeout"), A_FLOAT, 0);
- class_addlist(tcpserver_class, (t_method)tcpserver_send);
+ tcpserver_class = class_new(gensym(objName),(t_newmethod)tcpserver_new, (t_method)tcpserver_free,
+ sizeof(t_tcpserver), 0, A_DEFFLOAT, 0);
+ class_addmethod(tcpserver_class, (t_method)tcpserver_print, gensym("print"), 0);
+ class_addmethod(tcpserver_class, (t_method)tcpserver_send, gensym("send"), A_GIMME, 0);
+ class_addmethod(tcpserver_class, (t_method)tcpserver_client_send, gensym("client"), A_GIMME, 0);
+ class_addmethod(tcpserver_class, (t_method)tcpserver_client_disconnect, gensym("disconnectclient"), A_DEFFLOAT, 0);
+ class_addmethod(tcpserver_class, (t_method)tcpserver_socket_disconnect, gensym("disconnectsocket"), A_DEFFLOAT, 0);
+ class_addmethod(tcpserver_class, (t_method)tcpserver_broadcast, gensym("broadcast"), A_GIMME, 0);
+ class_addlist(tcpserver_class, (t_method)tcpserver_send);
}
/* end of tcpserver.c */