From 806df1c7645fdcfc945fb9e1467ea7aaada2b903 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Tue, 30 Mar 2010 07:36:07 +0000 Subject: split core library into separate files svn path=/trunk/externals/iem/iemnet/; revision=13310 --- iemnet.c | 681 --------------------------------------------------------------- 1 file changed, 681 deletions(-) (limited to 'iemnet.c') diff --git a/iemnet.c b/iemnet.c index 46b8332..16025cf 100644 --- a/iemnet.c +++ b/iemnet.c @@ -6,683 +6,6 @@ #include "iemnet.h" -#include -#include -#include - -#include - -#ifdef _WIN32 -# include -# include /* for socklen_t */ -#else -# include -#endif - -#include - - - -#define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */ - - -/* data handling */ - -typedef struct _iemnet_floatlist { - t_atom*argv; - size_t argc; - - size_t size; // real size (might be bigger than argc) -} t_iemnet_floatlist; - -t_iemnet_floatlist*iemnet__floatlist_init(t_iemnet_floatlist*cl) { - unsigned int i; - if(NULL==cl)return NULL; - for(i=0; isize; i++) - SETFLOAT((cl->argv+i), 0.f); - - return cl; -} - -void iemnet__floatlist_destroy(t_iemnet_floatlist*cl) { - if(NULL==cl)return; - if(cl->argv) freebytes(cl->argv, sizeof(t_atom)*cl->size); - cl->argv=NULL; - cl->argc=0; - cl->size=0; - - freebytes(cl, sizeof(t_iemnet_floatlist)); -} - -t_iemnet_floatlist*iemnet__floatlist_create(unsigned int size) { - t_iemnet_floatlist*result=(t_iemnet_floatlist*)getbytes(sizeof(t_iemnet_floatlist)); - if(NULL==result)return NULL; - - result->argv = (t_atom*)getbytes(size*sizeof(t_atom)); - if(NULL==result->argv) { - iemnet__floatlist_destroy(result); - return NULL; - } - - result->argc=size; - result->size=size; - - result=iemnet__floatlist_init(result); - - return result; -} - -t_iemnet_floatlist*iemnet__floatlist_resize(t_iemnet_floatlist*cl, unsigned int size) { - t_atom*tmp; - if(size<=cl->size) { - cl->argc=size; - return cl; - } - - tmp=(t_atom*)getbytes(size*sizeof(t_atom)); - if(NULL==tmp) return NULL; - - freebytes(cl->argv, sizeof(t_atom)*cl->size); - - cl->argv=tmp; - cl->argc=cl->size=size; - - cl=iemnet__floatlist_init(cl); - - return cl; -} - - - -void iemnet__chunk_destroy(t_iemnet_chunk*c) { - if(NULL==c)return; - - if(c->data)freebytes(c->data, c->size*sizeof(unsigned char)); - - c->data=NULL; - c->size=0; - - freebytes(c, sizeof(t_iemnet_chunk)); -} - -t_iemnet_chunk* iemnet__chunk_create_empty(int size) { - t_iemnet_chunk*result=(t_iemnet_chunk*)getbytes(sizeof(t_iemnet_chunk)); - if(result) { - result->size=size; - result->data=(unsigned char*)getbytes(sizeof(unsigned char)*size); - - if(NULL == result->data) { - result->size=0; - iemnet__chunk_destroy(result); - return NULL; - } - - memset(result->data, 0, result->size); - - result->addr=0L; - result->port=0; - - } - return result; -} - -t_iemnet_chunk* iemnet__chunk_create_data(int size, unsigned char*data) { - t_iemnet_chunk*result=iemnet__chunk_create_empty(size); - if(result) { - memcpy(result->data, data, result->size); - } - return result; -} - -t_iemnet_chunk* iemnet__chunk_create_dataaddr(int size, - unsigned char*data, - struct sockaddr_in*addr) { - t_iemnet_chunk*result=iemnet__chunk_create_data(size, data); - if(addr) { - result->addr = ntohl(addr->sin_addr.s_addr); - result->port = ntohs(addr->sin_port); - } - return result; -} - -t_iemnet_chunk* iemnet__chunk_create_list(int argc, t_atom*argv) { - int i; - t_iemnet_chunk*result=iemnet__chunk_create_empty(argc); - if(NULL==result)return NULL; - - for(i=0; idata[i]=c; - argv++; - } - - return result; -} - -t_iemnet_chunk*iemnet__chunk_create_chunk(t_iemnet_chunk*c) { - t_iemnet_chunk*result=NULL; - if(NULL==c)return NULL; - result=iemnet__chunk_create_data(c->size, c->data); - result->addr=c->addr; - result->port=c->port; - - return result; -} - - -t_iemnet_floatlist*iemnet__chunk2list(t_iemnet_chunk*c, t_iemnet_floatlist*dest) { - unsigned int i; - if(NULL==c)return NULL; - dest=iemnet__floatlist_resize(dest, c->size); - if(NULL==dest)return NULL; - - for(i=0; isize; i++) { - dest->argv[i].a_w.w_float = c->data[i]; - } - - return dest; -} - - -/* queue handling */ - -/* - * using code found at http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2008-02/msg00502.html - */ - - - -typedef struct _node { - struct _node* next; - t_iemnet_chunk*data; -} t_node; - -typedef struct _queue { - t_node* head; /* = 0 */ - t_node* tail; /* = 0 */ - pthread_mutex_t mtx; - pthread_cond_t cond; - - int done; // in cleanup state - int size; -} t_queue; - - -int queue_push( - t_queue* const _this, - t_iemnet_chunk* const data - ) { - t_node* tail; - t_node* n=NULL; - int size=_this->size; - - if(NULL == data) return size; - //fprintf(stderr, "pushing %d bytes\n", data->size); - - n=(t_node*)getbytes(sizeof(t_node)); - - n->next = 0; - n->data = data; - pthread_mutex_lock(&_this->mtx); - if (! (tail = _this->tail)) { - _this->head = n; - } else { - tail->next = n; - } - _this->tail = n; - - _this->size+=data->size; - size=_this->size; - - - //fprintf(stderr, "pushed %d bytes\n", data->size); - - pthread_mutex_unlock(&_this->mtx); - pthread_cond_signal(&_this->cond); - - return size; -} - -t_iemnet_chunk* queue_pop_block( - t_queue* const _this - ) { - t_node* head=0; - t_iemnet_chunk*data=0; - pthread_mutex_lock(&_this->mtx); - while (! (head = _this->head)) { - if(_this->done) { - pthread_mutex_unlock(&_this->mtx); - return NULL; - } - else { - pthread_cond_wait(&_this->cond, &_this->mtx); - } - } - - if (! (_this->head = head->next)) { - _this->tail = 0; - } - if(head && head->data) { - _this->size-=head->data->size; - } - - pthread_mutex_unlock(&_this->mtx); - if(head) { - data=head->data; - freebytes(head, sizeof(t_node)); - head=NULL; - } - return data; -} - -t_iemnet_chunk* queue_pop_noblock( - t_queue* const _this - ) { - t_node* head=0; - t_iemnet_chunk*data=0; - pthread_mutex_lock(&_this->mtx); - if (! (head = _this->head)) { - // empty head - pthread_mutex_unlock(&_this->mtx); - return NULL; - } - if (! (_this->head = head->next)) { - _this->tail = 0; - } - if(head && head->data) { - _this->size-=head->data->size; - } - - pthread_mutex_unlock(&_this->mtx); - if(head) { - data=head->data; - freebytes(head, sizeof(t_node)); - head=NULL; - } - return data; -} - -t_iemnet_chunk* queue_pop(t_queue* const _this) { - return queue_pop_block(_this); -} - - -void queue_finish(t_queue* q) { - DEBUG("queue_finish: %x", q); - if(NULL==q) - return; - q->done=1; - DEBUG("queue signaling: %x", q); - pthread_cond_signal(&q->cond); - DEBUG("queue signaled", q); -} - -void queue_destroy(t_queue* q) { - t_iemnet_chunk*c=NULL; - if(NULL==q) - return; - - queue_finish(q); - - /* remove all the chunks from the queue */ - while(NULL!=(c=queue_pop_noblock(q))) { - iemnet__chunk_destroy(c); - } - - q->head=NULL; - q->tail=NULL; - - pthread_mutex_destroy(&q->mtx); - pthread_cond_destroy(&q->cond); - - freebytes(q, sizeof(t_queue)); - q=NULL; -} - -t_queue* queue_create(void) { - static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; - static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - - t_queue*q=(t_queue*)getbytes(sizeof(t_queue)); - if(NULL==q)return NULL; - - q->head = NULL; - q->tail = NULL; - - memcpy(&q->cond, &cond, sizeof(pthread_cond_t)); - memcpy(&q->mtx , &mtx, sizeof(pthread_mutex_t)); - - q->done = 0; - q->size = 0; - - return q; -} - - - - - /* draft: - * - there is a sender thread for each open connection - * - the main thread just adds chunks to each sender threads processing queue - * - the sender thread tries to send the queue as fast as possible - */ - -struct _iemnet_sender { - pthread_t thread; - - int sockfd; /* owned outside; must call iemnet__sender_destroy() before freeing socket yourself */ - t_queue*queue; - int keepsending; // indicates whether we want to thread to continue or to terminate - int isrunning; -}; - -/* the workhorse of the family */ - -static int iemnet__sender_dosend(int sockfd, t_queue*q) { - t_iemnet_chunk*c=queue_pop_block(q); - if(c) { - unsigned char*data=c->data; - unsigned int size=c->size; - int result=-1; - // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); - DEBUG("sending %d bytes", size); - result = send(sockfd, data, size, 0); - if(result<0) { - // broken pipe - return 0; - } - - // shouldn't we do something with the result here? - DEBUG("sent %d bytes", result); - iemnet__chunk_destroy(c); - } else { - return 0; - } - return 1; -} - -static void*iemnet__sender_sendthread(void*arg) { - t_iemnet_sender*sender=(t_iemnet_sender*)arg; - - int sockfd=sender->sockfd; - t_queue*q=sender->queue; - - while(sender->keepsending) { - if(!iemnet__sender_dosend(sockfd, q))break; - } - sender->isrunning=0; - DEBUG("send thread terminated"); - return NULL; -} - -int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { - t_queue*q=s->queue; - int size=-1; - if(!s->isrunning)return -1; - if(q) { - t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); - size = queue_push(q, chunk); - } - return size; -} - -void iemnet__sender_destroy(t_iemnet_sender*s) { - /* simple protection against recursive calls: - * s->keepsending is only set to "0" in here, - * so if it is false, we know that we are already being called - */ - if(!s->keepsending)return; - - DEBUG("destroy sender %x", s); - s->keepsending=0; - queue_finish(s->queue); - DEBUG("queue finished"); - s->sockfd = -1; - pthread_join(s->thread, NULL); - DEBUG("thread joined"); - queue_destroy(s->queue); - freebytes(s, sizeof(t_iemnet_sender)); - s=NULL; - DEBUG("destroyed sender"); -} -t_iemnet_sender*iemnet__sender_create(int sock) { - t_iemnet_sender*result=(t_iemnet_sender*)getbytes(sizeof(t_iemnet_sender)); - int res=0; - DEBUG("create sender %x", result); - if(NULL==result){ - DEBUG("create sender failed"); - return NULL; - } - - result->queue = queue_create(); - result->sockfd = sock; - result->keepsending =1; - result->isrunning=1; - - res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result); - - if(0==res) { - - } else { - // something went wrong - } - - DEBUG("created sender"); - return result; -} - -int iemnet__sender_getlasterror(t_iemnet_sender*x) { - x=NULL; -#ifdef _WIN32 - return WSAGetLastError(); -#endif - return errno; -} - - -int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname, void *optval, socklen_t*optlen) { - int result=getsockopt(s->sockfd, level, optname, optval, optlen); - if(result!=0) { - post("%s: getsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); - } - return result; -} -int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const void*optval, socklen_t optlen) { - int result=setsockopt(s->sockfd, level, optname, optval, optlen); - if(result!=0) { - post("%s: setsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); - } - return result; - -} - - - -struct _iemnet_receiver { - pthread_t thread; - int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */ - void*userdata; - t_iemnet_chunk*data; - t_iemnet_receivecallback callback; - t_queue*queue; - int running; - t_clock *clock; - t_iemnet_floatlist*flist; - - int keepreceiving; - - int newdataflag; - pthread_mutex_t newdatamtx; -}; - -/* notifies Pd that there is new data to fetch */ -static void iemnet_signalNewData(t_iemnet_receiver*x) { - int already=0; - pthread_mutex_lock(&x->newdatamtx); - already=x->newdataflag; - x->newdataflag=1; - - /* don't schedule ticks at the end of life */ - if(x->sockfd<0)already=1; - - pthread_mutex_unlock(&x->newdatamtx); - - if(already)return; - sys_lock(); - if(x->clock)clock_delay(x->clock, 0); - sys_unlock(); -} - - -/* the workhorse of the family */ -static void*iemnet__receiver_readthread(void*arg) { - int result = 0; - t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; - - int sockfd=receiver->sockfd; - t_queue*q=receiver->queue; - - unsigned char data[INBUFSIZE]; - unsigned int size=INBUFSIZE; - - struct sockaddr_in from; - socklen_t fromlen = sizeof(from); - - unsigned int i=0; - for(i=0; irunning=1; - while(1) { - t_iemnet_chunk*c=NULL; - fromlen = sizeof(from); - //fprintf(stderr, "reading %d bytes...\n", size); - //result = recv(sockfd, data, size, 0); - result = recvfrom(sockfd, data, size, 0, (struct sockaddr *)&from, &fromlen); - //fprintf(stderr, "read %d bytes...\n", result); - - if(result<=0)break; - c= iemnet__chunk_create_dataaddr(result, data, &from); - - queue_push(q, c); - - iemnet_signalNewData(receiver); - - } - if(result>=0)iemnet_signalNewData(receiver); - - receiver->running=0; - - //fprintf(stderr, "read thread terminated\n"); - return NULL; -} - -/* callback from Pd's main thread to fetch queued data */ -static void iemnet__receiver_tick(t_iemnet_receiver *x) -{ - // received data - t_iemnet_chunk*c=queue_pop_noblock(x->queue); - while(NULL!=c) { - x->flist = iemnet__chunk2list(c, x->flist); - (x->callback)(x->userdata, c, x->flist->argc, x->flist->argv); - iemnet__chunk_destroy(c); - c=queue_pop_noblock(x->queue); - } - pthread_mutex_lock(&x->newdatamtx); - x->newdataflag=0; - pthread_mutex_unlock(&x->newdatamtx); - - if(!x->running) { - // read terminated - - /* keepreceiving is set, if receiver is not yet in shutdown mode */ - if(x->keepreceiving) - x->callback(x->userdata, NULL, 0, NULL); - } -} - - -t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_receivecallback callback) { - static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; - t_iemnet_receiver*rec=(t_iemnet_receiver*)getbytes(sizeof(t_iemnet_receiver)); - DEBUG("create new receiver for 0x%X:%d", userdata, sock); - //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, userdata, callback); - if(rec) { - t_iemnet_chunk*data=iemnet__chunk_create_empty(INBUFSIZE); - int res=0; - if(NULL==data) { - iemnet__receiver_destroy(rec); - DEBUG("create receiver failed"); - return NULL; - } - rec->keepreceiving=1; - rec->sockfd=sock; - rec->userdata=userdata; - rec->data=data; - rec->callback=callback; - rec->flist=iemnet__floatlist_create(1024); - - memcpy(&rec->newdatamtx , &mtx, sizeof(pthread_mutex_t)); - rec->newdataflag=0; - - rec->queue = queue_create(); - rec->clock = clock_new(rec, (t_method)iemnet__receiver_tick); - rec->running=1; - res=pthread_create(&rec->thread, 0, iemnet__receiver_readthread, rec); - } - //fprintf(stderr, "new receiver created\n"); - - return rec; -} -void iemnet__receiver_destroy(t_iemnet_receiver*rec) { - static int instance=0; - int inst=instance++; - - int sockfd; - DEBUG("[%d] destroy receiver %x", inst, rec); - if(NULL==rec)return; - if(!rec->keepreceiving)return; - rec->keepreceiving=0; - - - sockfd=rec->sockfd; - rec->sockfd=-1; - - DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd); - - if(sockfd>=0) { - shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ - sys_closesocket(sockfd); - } - DEBUG("[%d] closed socket %d", inst, sockfd); - - pthread_join(rec->thread, NULL); - - - // empty the queue - DEBUG("[%d] tick %d", inst, rec->running); - iemnet__receiver_tick(rec); - DEBUG("[%d] tack", inst); - - if(rec->data)iemnet__chunk_destroy(rec->data); - if(rec->flist)iemnet__floatlist_destroy(rec->flist); - - pthread_mutex_destroy(&rec->newdatamtx); - - clock_free(rec->clock); - rec->clock=NULL; - - rec->userdata=NULL; - rec->data=NULL; - rec->callback=NULL; - rec->flist=NULL; - - freebytes(rec, sizeof(t_iemnet_receiver)); - rec=NULL; - DEBUG("[%d] destroyed receiver", inst); -} - - - - void iemnet__addrout(t_outlet*status_outlet, t_outlet*address_outlet, long address, unsigned short port) { @@ -707,10 +30,6 @@ void iemnet__addrout(t_outlet*status_outlet, t_outlet*address_outlet, - - - - #ifdef _MSC_VER void tcpclient_setup(void); void tcpserver_setup(void); -- cgit v1.2.1