From 35e89ad3034e7924ce39a0a5c0dffc6d1920bbea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Wed, 24 Mar 2010 17:20:21 +0000 Subject: automatic calling svn path=/trunk/externals/iem/iemnet/; revision=13259 --- Makefile | 2 +- iemnet.c | 629 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- iemnet.h | 56 +++++- shared.c | 596 -------------------------------------------------------- tcpclient.c | 8 + tcpserver.c | 8 +- 6 files changed, 686 insertions(+), 613 deletions(-) delete mode 100644 shared.c diff --git a/Makefile b/Makefile index ce9b4e5..26e52b3 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ SOURCES_linux = SOURCES_windows = # .c source files that will be statically linked to _all_ objects -HELPERSOURCES = shared.c +HELPERSOURCES = iemnet.c # list all pd objects (i.e. myobject.pd) files here, and their helpfiles will diff --git a/iemnet.c b/iemnet.c index c58c54d..2f87248 100644 --- a/iemnet.c +++ b/iemnet.c @@ -1,12 +1,617 @@ -#include "iemnet.h" - -void tcpclient_setup(void); -void tcpserver_setup(void); - - -IEMNET_EXTERN void iemnet_setup(void) { -tcpserver_setup(); -tcpclient_setup(); - - -} \ No newline at end of file +/* iemnet + * copyright (c) 2010 IOhannes m zmölnig, IEM + */ + +#include "iemnet.h" + +#include "s_stuff.h" + +#include +#include +#include + +#include + +#ifdef _WIN32 +# include +# include /* for socklen_t */ +#else +# include +#endif + +#include + + +//#define INBUFSIZE 4096L /* was 4096: size of receiving data buffer */ +#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */ + + +/* data handling */ + +typedef struct _iemnet_floatlist { + t_atom*argv; + size_t argc; + + size_t size; // real size (might be bigger than argc) +} t_iemnet_floatlist; + +t_iemnet_floatlist*iemnet__floatlist_init(t_iemnet_floatlist*cl) { + unsigned int i; + if(NULL==cl)return NULL; + for(i=0; isize; i++) + SETFLOAT((cl->argv+i), 0.f); + + return cl; +} + +void iemnet__floatlist_destroy(t_iemnet_floatlist*cl) { + if(NULL==cl)return; + if(cl->argv) freebytes(cl->argv, sizeof(t_atom)*cl->size); + cl->argv=NULL; + cl->argc=0; + cl->size=0; + + freebytes(cl, sizeof(t_iemnet_floatlist)); +} + +t_iemnet_floatlist*iemnet__floatlist_create(unsigned int size) { + t_iemnet_floatlist*result=(t_iemnet_floatlist*)getbytes(sizeof(t_iemnet_floatlist)); + if(NULL==result)return NULL; + + result->argv = (t_atom*)getbytes(size*sizeof(t_atom)); + if(NULL==result->argv) { + iemnet__floatlist_destroy(result); + return NULL; + } + + result->argc=size; + result->size=size; + + result=iemnet__floatlist_init(result); + + return result; +} + +t_iemnet_floatlist*iemnet__floatlist_resize(t_iemnet_floatlist*cl, unsigned int size) { + t_atom*tmp; + if(size<=cl->size) { + cl->argc=size; + return cl; + } + + tmp=(t_atom*)getbytes(size*sizeof(t_atom)); + if(NULL==tmp) return NULL; + + freebytes(cl->argv, sizeof(t_atom)*cl->size); + + cl->argv=tmp; + cl->argc=cl->size=size; + + cl=iemnet__floatlist_init(cl); + + return cl; +} + + + +void iemnet__chunk_destroy(t_iemnet_chunk*c) { + if(NULL==c)return; + + if(c->data)freebytes(c->data, c->size*sizeof(unsigned char)); + + c->data=NULL; + c->size=0; + + freebytes(c, sizeof(t_iemnet_chunk)); +} + +t_iemnet_chunk* iemnet__chunk_create_empty(int size) { + t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); + if(result) { + result->size=size; + result->data=(unsigned char*)getbytes(sizeof(unsigned char)*size); + + if(NULL == result->data) { + result->size=0; + iemnet__chunk_destroy(result); + return NULL; + } + + memset(result->data, 0, result->size); + } + return result; +} + +t_iemnet_chunk* iemnet__chunk_create_data(int size, unsigned char*data) { + t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); + if(result) { + result->size=size; + result->data=(unsigned char*)getbytes(sizeof(unsigned char)*size); + + if(NULL == result->data) { + result->size=0; + iemnet__chunk_destroy(result); + return NULL; + } + + memcpy(result->data, data, result->size); + } + return result; +} + + +t_iemnet_chunk* iemnet__chunk_create_list(int argc, t_atom*argv) { + t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); + int i; + if(NULL==result)return NULL; + + result->size=argc; + result->data=(unsigned char*)getbytes(sizeof(unsigned char)*argc); + + if(NULL == result->data) { + result->size=0; + iemnet__chunk_destroy(result); + return NULL; + } + + for(i=0; idata[i]=c; + argv++; + } + + return result; +} + +t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*c) { + t_iemnet_chunk*result=NULL; + if(NULL==c)return NULL; + + result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); + + result->size=c->size; + result->data=(unsigned char*)getbytes(sizeof(unsigned char)*(result->size)); + if(NULL == result->data) { + result->size=0; + iemnet__chunk_destroy(result); + return NULL; + } + + memcpy(result->data, c->data, result->size); + + return result; +} + + +t_iemnet_floatlist*iemnet__chunk2list(t_iemnet_chunk*c, t_iemnet_floatlist*dest) { + unsigned int i; + if(NULL==c)return NULL; + dest=iemnet__floatlist_resize(dest, c->size); + if(NULL==dest)return NULL; + + for(i=0; isize; i++) { + dest->argv[i].a_w.w_float = c->data[i]; + } + + return dest; +} + + +/* queue handling */ + +/* + * using code found at http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2008-02/msg00502.html + */ + + + +typedef struct _node { + struct _node* next; + t_iemnet_chunk*data; +} t_node; + +typedef struct _queue { + t_node* head; /* = 0 */ + t_node* tail; /* = 0 */ + pthread_mutex_t mtx; + pthread_cond_t cond; + + int done; // in cleanup state + int size; +} t_queue; + + +int queue_push( + t_queue* const _this, + t_iemnet_chunk* const data + ) { + t_node* tail; + t_node* n=NULL; + int size=_this->size; + + if(NULL == data) return size; + //fprintf(stderr, "pushing %d bytes\n", data->size); + + n=(t_node*)getbytes(sizeof(t_node)); + + n->next = 0; + n->data = data; + pthread_mutex_lock(&_this->mtx); + if (! (tail = _this->tail)) { + _this->head = n; + } else { + tail->next = n; + } + _this->tail = n; + + _this->size+=data->size; + size=_this->size; + + + //fprintf(stderr, "pushed %d bytes\n", data->size); + + pthread_mutex_unlock(&_this->mtx); + pthread_cond_signal(&_this->cond); + + return size; +} + +t_iemnet_chunk* queue_pop_block( + t_queue* const _this + ) { + t_node* head=0; + t_iemnet_chunk*data=0; + pthread_mutex_lock(&_this->mtx); + while (! (head = _this->head)) { + if(_this->done) { + pthread_mutex_unlock(&_this->mtx); + return NULL; + } + else { + pthread_cond_wait(&_this->cond, &_this->mtx); + } + } + + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } + + pthread_mutex_unlock(&_this->mtx); + if(head) { + data=head->data; + freebytes(head, sizeof(t_node)); + head=NULL; + } + return data; +} + +t_iemnet_chunk* queue_pop_noblock( + t_queue* const _this + ) { + t_node* head=0; + t_iemnet_chunk*data=0; + pthread_mutex_lock(&_this->mtx); + if (! (head = _this->head)) { + // empty head + pthread_mutex_unlock(&_this->mtx); + return NULL; + } + if (! (_this->head = head->next)) { + _this->tail = 0; + } + if(head && head->data) { + _this->size-=head->data->size; + } + + pthread_mutex_unlock(&_this->mtx); + if(head) { + data=head->data; + freebytes(head, sizeof(t_node)); + head=NULL; + } + return data; +} + +t_iemnet_chunk* queue_pop(t_queue* const _this) { + return queue_pop_block(_this); +} + + +void queue_finish(t_queue* q) { + if(NULL==q) + return; + q->done=1; + pthread_cond_signal(&q->cond); +} + +void queue_destroy(t_queue* q) { + t_iemnet_chunk*c=NULL; + if(NULL==q) + return; + + queue_finish(q); + + /* remove all the chunks from the queue */ + while(NULL!=(c=queue_pop_noblock(q))) { + iemnet__chunk_destroy(c); + } + + q->head=NULL; + q->tail=NULL; + + pthread_mutex_destroy(&q->mtx); + pthread_cond_destroy(&q->cond); + + freebytes(q, sizeof(t_queue)); + q=NULL; +} + +t_queue* queue_create(void) { + static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; + static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + + t_queue*q=(t_queue*)getbytes(sizeof(t_queue)); + if(NULL==q)return NULL; + + q->head = NULL; + q->tail = NULL; + + memcpy(&q->cond, &cond, sizeof(pthread_cond_t)); + memcpy(&q->mtx , &mtx, sizeof(pthread_mutex_t)); + + q->done = 0; + q->size = 0; + + return q; +} + + + + + /* draft: + * - there is a sender thread for each open connection + * - the main thread just adds chunks to each sender threads processing queue + * - the sender thread tries to send the queue as fast as possible + */ + +struct _iemnet_sender { + pthread_t thread; + + int sockfd; /* owned outside; must call iemnet__sender_destroy() before freeing socket yourself */ + t_queue*queue; + int cont; // indicates whether we want to thread to continue or to terminate +}; + +/* the workhorse of the family */ + +static int iemnet__sender_dosend(int sockfd, t_queue*q) { + t_iemnet_chunk*c=queue_pop(q); + if(c) { + unsigned char*data=c->data; + unsigned int size=c->size; + int result=-1; + // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); + + result = send(sockfd, data, size, 0); + // shouldn't we do something with the result here? + iemnet__chunk_destroy(c); + } else { + return 0; + } + return 1; +} + +static void*iemnet__sender_sendthread(void*arg) { + t_iemnet_sender*sender=(t_iemnet_sender*)arg; + + int sockfd=sender->sockfd; + t_queue*q=sender->queue; + + while(sender->cont) { + if(!iemnet__sender_dosend(sockfd, q))break; + } + //fprintf(stderr, "write thread terminated\n"); + return NULL; +} + +int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { + t_queue*q=s->queue; + int size=0; + if(q) { + t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); + size = queue_push(q, chunk); + } + return size; +} + +void iemnet__sender_destroy(t_iemnet_sender*s) { + s->cont=0; + queue_finish(s->queue); + s->sockfd = -1; + pthread_join(s->thread, NULL); + queue_destroy(s->queue); + + freebytes(s, sizeof(t_iemnet_sender)); + s=NULL; +} +t_iemnet_sender*iemnet__sender_create(int sock) { + t_iemnet_sender*result=(t_iemnet_sender*)getbytes(sizeof(t_iemnet_sender)); + int res=0; + + if(NULL==result)return NULL; + + result->queue = queue_create(); + result->sockfd = sock; + result->cont =1; + + res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result); + + if(0==res) { + + } else { + // something went wrong + } + + return result; +} + +int iemnet__sender_getlasterror(t_iemnet_sender*x) { + x=NULL; +#ifdef _WIN32 + return WSAGetLastError(); +#endif + return errno; +} + + +int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname, void *optval, socklen_t*optlen) { + int result=getsockopt(s->sockfd, level, optname, optval, optlen); + if(result!=0) { + post("%s: getsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); + } + return result; +} +int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const void*optval, socklen_t optlen) { + int result=setsockopt(s->sockfd, level, optname, optval, optlen); + if(result!=0) { + post("%s: setsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); + } + return result; + +} + + + +struct _iemnet_receiver { + pthread_t thread; + int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */ + void*owner; + t_iemnet_chunk*data; + t_iemnet_receivecallback callback; + t_queue*queue; + int running; + t_clock *clock; + t_iemnet_floatlist*flist; +}; + + +/* the workhorse of the family */ +static void*iemnet__receiver_readthread(void*arg) { + int result = 0; + t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; + + int sockfd=receiver->sockfd; + t_queue*q=receiver->queue; + + unsigned char data[INBUFSIZE]; + unsigned int size=INBUFSIZE; + + unsigned int i=0; + for(i=0; irunning=1; + while(1) { + t_iemnet_chunk*c=NULL; + //fprintf(stderr, "reading %d bytes...\n", size); + result = recv(sockfd, data, size, 0); + //fprintf(stderr, "read %d bytes...\n", result); + + if(result<=0)break; + c= iemnet__chunk_create_data(result, data); + + queue_push(q, c); + + if(receiver->clock)clock_delay(receiver->clock, 0); + } + + + if(result>=0) + if(receiver->clock)clock_delay(receiver->clock, 0); + + receiver->running=0; + + //fprintf(stderr, "read thread terminated\n"); + return NULL; +} + +static void iemnet__receiver_tick(t_iemnet_receiver *x) +{ + // received data + t_iemnet_chunk*c=queue_pop_noblock(x->queue); + while(NULL!=c) { + x->flist = iemnet__chunk2list(c, x->flist); + (x->callback)(x->owner, x->sockfd, x->flist->argc, x->flist->argv); + iemnet__chunk_destroy(c); + c=queue_pop_noblock(x->queue); + } + if(!x->running) { + // read terminated + x->callback(x->owner, x->sockfd, 0, NULL); + } +} + + +t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback) { + t_iemnet_receiver*rec=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); + //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback); + if(rec) { + t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE); + int res=0; + if(NULL==data) { + iemnet__receiver_destroy(rec); + return NULL; + } + rec->sockfd=sock; + rec->owner=owner; + rec->data=data; + rec->callback=callback; + rec->flist=iemnet__floatlist_create(1024); + + rec->queue = queue_create(); + rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); + rec->running=1; + res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec); + } + //fprintf(stderr, "new receiver created\n"); + + return rec; +} +void iemnet__receiver_destroy(t_iemnet_receiver*rec) { + if(NULL==rec)return; + if(rec->data)iemnet__chunk_destroy(rec->data); + if(rec->flist)iemnet__floatlist_destroy(rec->flist); + clock_free(rec->clock); + rec->clock=NULL; + + shutdown(rec->sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ + sys_closesocket(rec->sockfd); + + rec->sockfd=0; + pthread_join(rec->thread, NULL); + rec->owner=NULL; + rec->data=NULL; + rec->callback=NULL; + rec->clock=NULL; + rec->flist=NULL; + + freebytes(rec, sizeof(t_iemnet_receiver)); + rec=NULL; +} + + + + + + + + + + + + +IEMNET_EXTERN void iemnet_setup(void) { + + +} diff --git a/iemnet.h b/iemnet.h index 6bc7190..58d98bb 100644 --- a/iemnet.h +++ b/iemnet.h @@ -1,11 +1,43 @@ +/* *********************************************+ + * iemnet + * networking for Pd + * + * (c) 2010 IOhannes m zmölnig + * Institute of Electronic Music and Acoustics (IEM) + * University of Music and Dramatic Arts (KUG), Graz, Austria + * + * based on net/ library by Martin Peach + * based on maxlib by Olaf Matthes + */ + +/* ---------------------------------------------------------------------------- */ + +/* This program is free software; you can redistribute it and/or */ +/* modify it under the terms of the GNU General Public License */ +/* as published by the Free Software Foundation; either version 2 */ +/* of the License, or (at your option) any later version. */ +/* */ +/* This program is distributed in the hope that it will be useful, */ +/* but WITHOUT ANY WARRANTY; without even the implied warranty of */ +/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ +/* GNU General Public License for more details. */ +/* */ +/* You should have received a copy of the GNU General Public License */ +/* along with this program; if not, write to the Free Software */ +/* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ +/* */ + +/* ---------------------------------------------------------------------------- */ + +#ifndef INCLUDE_IEMNET_H_ +#define INCLUDE_IEMNET_H_ + #include "m_pd.h" #ifdef _WIN32 -# define IEMNET_EXTERN __declspec(dllexport) extern # include # include #else -# define IEMNET_EXTERN extern # include #endif @@ -46,3 +78,23 @@ typedef void (*t_iemnet_receivecallback)(void*x, int sockfd, int argc, t_atom*ar */ t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback); void iemnet__receiver_destroy(t_iemnet_receiver*); + + +#if defined(_MSC_VER) +# define IEMNET_EXTERN __declspec(dllexport) extern +# define CCALL __cdecl +# pragma section(".CRT$XCU",read) +# define IEMNET_INITIALIZER(f) \ + static void __cdecl f(void); \ + __declspec(allocate(".CRT$XCU")) void (__cdecl*f##_)(void) = f; \ + static void __cdecl f(void) +#elif defined(__GNUC__) +# define IEMNET_EXTERN extern +# define CCALL +# define IEMNET_INITIALIZER(f) \ + static void autoinit ## f(void) __attribute__((constructor)); \ + static void autoinit ## f(void) { f(); } +#endif + + +#endif /* INCLUDE_IEMNET_H_ */ diff --git a/shared.c b/shared.c deleted file mode 100644 index 8ed2111..0000000 --- a/shared.c +++ /dev/null @@ -1,596 +0,0 @@ -/* queue.c - * copyright (c) 2010 IOhannes m zmölnig, IEM - * using code found at http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2008-02/msg00502.html - */ - - -#include "iemnet.h" - -#include "s_stuff.h" - -#include -#include -#include - -#include - -#ifdef _WIN32 -# include -# include /* for socklen_t */ -#else -# include -#endif - -#include - - -//#define INBUFSIZE 4096L /* was 4096: size of receiving data buffer */ -#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */ - - -/* data handling */ - -typedef struct _iemnet_floatlist { - t_atom*argv; - size_t argc; - - size_t size; // real size (might be bigger than argc) -} t_iemnet_floatlist; - -t_iemnet_floatlist*iemnet__floatlist_init(t_iemnet_floatlist*cl) { - unsigned int i; - if(NULL==cl)return NULL; - for(i=0; isize; i++) - SETFLOAT((cl->argv+i), 0.f); - - return cl; -} - -void iemnet__floatlist_destroy(t_iemnet_floatlist*cl) { - if(NULL==cl)return; - if(cl->argv) freebytes(cl->argv, sizeof(t_atom)*cl->size); - cl->argv=NULL; - cl->argc=0; - cl->size=0; - - freebytes(cl, sizeof(t_iemnet_floatlist)); -} - -t_iemnet_floatlist*iemnet__floatlist_create(unsigned int size) { - t_iemnet_floatlist*result=(t_iemnet_floatlist*)getbytes(sizeof(t_iemnet_floatlist)); - if(NULL==result)return NULL; - - result->argv = (t_atom*)getbytes(size*sizeof(t_atom)); - if(NULL==result->argv) { - iemnet__floatlist_destroy(result); - return NULL; - } - - result->argc=size; - result->size=size; - - result=iemnet__floatlist_init(result); - - return result; -} - -t_iemnet_floatlist*iemnet__floatlist_resize(t_iemnet_floatlist*cl, unsigned int size) { - t_atom*tmp; - if(size<=cl->size) { - cl->argc=size; - return cl; - } - - tmp=(t_atom*)getbytes(size*sizeof(t_atom)); - if(NULL==tmp) return NULL; - - freebytes(cl->argv, sizeof(t_atom)*cl->size); - - cl->argv=tmp; - cl->argc=cl->size=size; - - cl=iemnet__floatlist_init(cl); - - return cl; -} - - - -void iemnet__chunk_destroy(t_iemnet_chunk*c) { - if(NULL==c)return; - - if(c->data)freebytes(c->data, c->size*sizeof(unsigned char)); - - c->data=NULL; - c->size=0; - - freebytes(c, sizeof(t_iemnet_chunk)); -} - -t_iemnet_chunk* iemnet__chunk_create_empty(int size) { - t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); - if(result) { - result->size=size; - result->data=(unsigned char*)getbytes(sizeof(unsigned char)*size); - - if(NULL == result->data) { - result->size=0; - iemnet__chunk_destroy(result); - return NULL; - } - - memset(result->data, 0, result->size); - } - return result; -} - -t_iemnet_chunk* iemnet__chunk_create_data(int size, unsigned char*data) { - t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); - if(result) { - result->size=size; - result->data=(unsigned char*)getbytes(sizeof(unsigned char)*size); - - if(NULL == result->data) { - result->size=0; - iemnet__chunk_destroy(result); - return NULL; - } - - memcpy(result->data, data, result->size); - } - return result; -} - - -t_iemnet_chunk* iemnet__chunk_create_list(int argc, t_atom*argv) { - t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); - int i; - if(NULL==result)return NULL; - - result->size=argc; - result->data=(unsigned char*)getbytes(sizeof(unsigned char)*argc); - - if(NULL == result->data) { - result->size=0; - iemnet__chunk_destroy(result); - return NULL; - } - - for(i=0; idata[i]=c; - argv++; - } - - return result; -} - -t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*c) { - t_iemnet_chunk*result=NULL; - if(NULL==c)return NULL; - - result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); - - result->size=c->size; - result->data=(unsigned char*)getbytes(sizeof(unsigned char)*(result->size)); - if(NULL == result->data) { - result->size=0; - iemnet__chunk_destroy(result); - return NULL; - } - - memcpy(result->data, c->data, result->size); - - return result; -} - - -t_iemnet_floatlist*iemnet__chunk2list(t_iemnet_chunk*c, t_iemnet_floatlist*dest) { - unsigned int i; - if(NULL==c)return NULL; - dest=iemnet__floatlist_resize(dest, c->size); - if(NULL==dest)return NULL; - - for(i=0; isize; i++) { - dest->argv[i].a_w.w_float = c->data[i]; - } - - return dest; -} - - -/* queue handling */ -typedef struct _node { - struct _node* next; - t_iemnet_chunk*data; -} t_node; - -typedef struct _queue { - t_node* head; /* = 0 */ - t_node* tail; /* = 0 */ - pthread_mutex_t mtx; - pthread_cond_t cond; - - int done; // in cleanup state - int size; -} t_queue; - - -int queue_push( - t_queue* const _this, - t_iemnet_chunk* const data - ) { - t_node* tail; - t_node* n=NULL; - int size=_this->size; - - if(NULL == data) return size; - //fprintf(stderr, "pushing %d bytes\n", data->size); - - n=(t_node*)getbytes(sizeof(t_node)); - - n->next = 0; - n->data = data; - pthread_mutex_lock(&_this->mtx); - if (! (tail = _this->tail)) { - _this->head = n; - } else { - tail->next = n; - } - _this->tail = n; - - _this->size+=data->size; - size=_this->size; - - - //fprintf(stderr, "pushed %d bytes\n", data->size); - - pthread_mutex_unlock(&_this->mtx); - pthread_cond_signal(&_this->cond); - - return size; -} - -t_iemnet_chunk* queue_pop_block( - t_queue* const _this - ) { - t_node* head=0; - t_iemnet_chunk*data=0; - pthread_mutex_lock(&_this->mtx); - while (! (head = _this->head)) { - if(_this->done) { - pthread_mutex_unlock(&_this->mtx); - return NULL; - } - else { - pthread_cond_wait(&_this->cond, &_this->mtx); - } - } - - if (! (_this->head = head->next)) { - _this->tail = 0; - } - if(head && head->data) { - _this->size-=head->data->size; - } - - pthread_mutex_unlock(&_this->mtx); - if(head) { - data=head->data; - freebytes(head, sizeof(t_node)); - head=NULL; - } - return data; -} - -t_iemnet_chunk* queue_pop_noblock( - t_queue* const _this - ) { - t_node* head=0; - t_iemnet_chunk*data=0; - pthread_mutex_lock(&_this->mtx); - if (! (head = _this->head)) { - // empty head - pthread_mutex_unlock(&_this->mtx); - return NULL; - } - if (! (_this->head = head->next)) { - _this->tail = 0; - } - if(head && head->data) { - _this->size-=head->data->size; - } - - pthread_mutex_unlock(&_this->mtx); - if(head) { - data=head->data; - freebytes(head, sizeof(t_node)); - head=NULL; - } - return data; -} - -t_iemnet_chunk* queue_pop(t_queue* const _this) { - return queue_pop_block(_this); -} - - -void queue_finish(t_queue* q) { - if(NULL==q) - return; - q->done=1; - pthread_cond_signal(&q->cond); -} - -void queue_destroy(t_queue* q) { - t_iemnet_chunk*c=NULL; - if(NULL==q) - return; - - queue_finish(q); - - /* remove all the chunks from the queue */ - while(NULL!=(c=queue_pop_noblock(q))) { - iemnet__chunk_destroy(c); - } - - q->head=NULL; - q->tail=NULL; - - pthread_mutex_destroy(&q->mtx); - pthread_cond_destroy(&q->cond); - - freebytes(q, sizeof(t_queue)); - q=NULL; -} - -t_queue* queue_create(void) { - static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; - static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - - t_queue*q=(t_queue*)getbytes(sizeof(t_queue)); - if(NULL==q)return NULL; - - q->head = NULL; - q->tail = NULL; - - memcpy(&q->cond, &cond, sizeof(pthread_cond_t)); - memcpy(&q->mtx , &mtx, sizeof(pthread_mutex_t)); - - q->done = 0; - q->size = 0; - - return q; -} - - - - - /* draft: - * - there is a sender thread for each open connection - * - the main thread just adds chunks to each sender threads processing queue - * - the sender thread tries to send the queue as fast as possible - */ - -struct _iemnet_sender { - pthread_t thread; - - int sockfd; /* owned outside; must call iemnet__sender_destroy() before freeing socket yourself */ - t_queue*queue; - int cont; // indicates whether we want to thread to continue or to terminate -}; - -/* the workhorse of the family */ - -static int iemnet__sender_dosend(int sockfd, t_queue*q) { - t_iemnet_chunk*c=queue_pop(q); - if(c) { - unsigned char*data=c->data; - unsigned int size=c->size; - int result=-1; - // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); - - result = send(sockfd, data, size, 0); - // shouldn't we do something with the result here? - iemnet__chunk_destroy(c); - } else { - return 0; - } - return 1; -} - -static void*iemnet__sender_sendthread(void*arg) { - t_iemnet_sender*sender=(t_iemnet_sender*)arg; - - int sockfd=sender->sockfd; - t_queue*q=sender->queue; - - while(sender->cont) { - if(!iemnet__sender_dosend(sockfd, q))break; - } - //fprintf(stderr, "write thread terminated\n"); - return NULL; -} - -int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { - t_queue*q=s->queue; - int size=0; - if(q) { - t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); - size = queue_push(q, chunk); - } - return size; -} - -void iemnet__sender_destroy(t_iemnet_sender*s) { - s->cont=0; - queue_finish(s->queue); - s->sockfd = -1; - pthread_join(s->thread, NULL); - queue_destroy(s->queue); - - freebytes(s, sizeof(t_iemnet_sender)); - s=NULL; -} -t_iemnet_sender*iemnet__sender_create(int sock) { - t_iemnet_sender*result=(t_iemnet_sender*)getbytes(sizeof(t_iemnet_sender)); - int res=0; - - if(NULL==result)return NULL; - - result->queue = queue_create(); - result->sockfd = sock; - result->cont =1; - - res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result); - - if(0==res) { - - } else { - // something went wrong - } - - return result; -} - -int iemnet__sender_getlasterror(t_iemnet_sender*x) { - x=NULL; -#ifdef _WIN32 - return WSAGetLastError(); -#endif - return errno; -} - - -int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname, void *optval, socklen_t*optlen) { - int result=getsockopt(s->sockfd, level, optname, optval, optlen); - if(result!=0) { - post("%s: getsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); - } - return result; -} -int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const void*optval, socklen_t optlen) { - int result=setsockopt(s->sockfd, level, optname, optval, optlen); - if(result!=0) { - post("%s: setsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); - } - return result; - -} - - - -struct _iemnet_receiver { - pthread_t thread; - int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */ - void*owner; - t_iemnet_chunk*data; - t_iemnet_receivecallback callback; - t_queue*queue; - int running; - t_clock *clock; - t_iemnet_floatlist*flist; -}; - - -/* the workhorse of the family */ -static void*iemnet__receiver_readthread(void*arg) { - int result = 0; - t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; - - int sockfd=receiver->sockfd; - t_queue*q=receiver->queue; - - unsigned char data[INBUFSIZE]; - unsigned int size=INBUFSIZE; - - unsigned int i=0; - for(i=0; irunning=1; - while(1) { - t_iemnet_chunk*c=NULL; - //fprintf(stderr, "reading %d bytes...\n", size); - result = recv(sockfd, data, size, 0); - //fprintf(stderr, "read %d bytes...\n", result); - - if(result<=0)break; - c= iemnet__chunk_create_data(result, data); - - queue_push(q, c); - - if(receiver->clock)clock_delay(receiver->clock, 0); - } - - - if(result>=0) - if(receiver->clock)clock_delay(receiver->clock, 0); - - receiver->running=0; - - //fprintf(stderr, "read thread terminated\n"); - return NULL; -} - -static void iemnet__receiver_tick(t_iemnet_receiver *x) -{ - // received data - t_iemnet_chunk*c=queue_pop_noblock(x->queue); - while(NULL!=c) { - x->flist = iemnet__chunk2list(c, x->flist); - (x->callback)(x->owner, x->sockfd, x->flist->argc, x->flist->argv); - iemnet__chunk_destroy(c); - c=queue_pop_noblock(x->queue); - } - if(!x->running) { - // read terminated - x->callback(x->owner, x->sockfd, 0, NULL); - } -} - - -t_iemnet_receiver*iemnet__receiver_create(int sock, void*owner, t_iemnet_receivecallback callback) { - t_iemnet_receiver*rec=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); - //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, owner, callback); - if(rec) { - t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE); - int res=0; - if(NULL==data) { - iemnet__receiver_destroy(rec); - return NULL; - } - rec->sockfd=sock; - rec->owner=owner; - rec->data=data; - rec->callback=callback; - rec->flist=iemnet__floatlist_create(1024); - - rec->queue = queue_create(); - rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); - rec->running=1; - res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec); - } - //fprintf(stderr, "new receiver created\n"); - - return rec; -} -void iemnet__receiver_destroy(t_iemnet_receiver*rec) { - if(NULL==rec)return; - if(rec->data)iemnet__chunk_destroy(rec->data); - if(rec->flist)iemnet__floatlist_destroy(rec->flist); - clock_free(rec->clock); - rec->clock=NULL; - - shutdown(rec->sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ - sys_closesocket(rec->sockfd); - - rec->sockfd=0; - pthread_join(rec->thread, NULL); - rec->owner=NULL; - rec->data=NULL; - rec->callback=NULL; - rec->clock=NULL; - rec->flist=NULL; - - freebytes(rec, sizeof(t_iemnet_receiver)); - rec=NULL; -} diff --git a/tcpclient.c b/tcpclient.c index 42fe2e0..23019d0 100644 --- a/tcpclient.c +++ b/tcpclient.c @@ -247,6 +247,8 @@ static void tcpclient_free(t_tcpclient *x) IEMNET_EXTERN void tcpclient_setup(void) { + static int again=0; if(again)return; again=1; + tcpclient_class = class_new(gensym(objName), (t_newmethod)tcpclient_new, (t_method)tcpclient_free, sizeof(t_tcpclient), 0, A_DEFFLOAT, 0); @@ -261,4 +263,10 @@ IEMNET_EXTERN void tcpclient_setup(void) post(" based on mrpeach/net, based on maxlib"); } + +IEMNET_INITIALIZER(tcpclient_setup); + + + + /* end of tcpclient.c */ diff --git a/tcpserver.c b/tcpserver.c index 74e8211..52067a6 100644 --- a/tcpserver.c +++ b/tcpserver.c @@ -414,11 +414,12 @@ static void tcpserver_free(t_tcpserver *x) sys_rmpollfn(x->x_connectsocket); sys_closesocket(x->x_connectsocket); } - } -IEMNET_EXTERN void tcpserver_setup(void) +IEMNET_EXTERN void tcpserver_setup(void) { + static int again=0; if(again)return; again=1; + tcpserver_class = class_new(gensym(objName),(t_newmethod)tcpserver_new, (t_method)tcpserver_free, sizeof(t_tcpserver), 0, A_DEFFLOAT, 0); class_addmethod(tcpserver_class, (t_method)tcpserver_disconnect_client, gensym("disconnectclient"), A_DEFFLOAT, 0); @@ -439,4 +440,7 @@ IEMNET_EXTERN void tcpserver_setup(void) post(" based on mrpeach/net, based on maxlib"); } +IEMNET_INITIALIZER(tcpserver_setup); + + /* end of tcpserver.c */ -- cgit v1.2.1