From c5e0079ae1323ad4c35d28f83ce732bb3820c46a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Mon, 31 Aug 2015 22:11:51 +0000 Subject: un-threaded receive svn path=/trunk/externals/iem/iemnet/; revision=17543 --- iemnet_receiver.c | 265 +++++++++++++----------------------------------------- 1 file changed, 60 insertions(+), 205 deletions(-) (limited to 'iemnet_receiver.c') diff --git a/iemnet_receiver.c b/iemnet_receiver.c index f9f3fa8..898bac8 100644 --- a/iemnet_receiver.c +++ b/iemnet_receiver.c @@ -2,9 +2,8 @@ * * receiver * receives data "chunks" from a socket - * possibly threaded * - * copyright (c) 2010 IOhannes m zmölnig, IEM + * copyright (c) 2010-2015 IOhannes m zmölnig, IEM */ /* This program is free software; you can redistribute it and/or */ @@ -18,9 +17,8 @@ /* GNU General Public License for more details. */ /* */ /* You should have received a copy of the GNU General Public License */ -/* along with this program; if not, write to the Free Software */ -/* Foundation, Inc., */ -/* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +/* along with this program; if not, see */ +/* http://www.gnu.org/licenses/ */ /* */ #define DEBUGLEVEL 4 @@ -28,258 +26,115 @@ #include "iemnet.h" #include "iemnet_data.h" -#include -#include #include #include -#include - #define INBUFSIZE 65536L /* was 4096: size of receiving data buffer */ struct _iemnet_receiver { - pthread_t recthread; int sockfd; /* owned outside; you must call iemnet__receiver_destroy() before freeing socket yourself */ void*userdata; - t_iemnet_chunk*data; t_iemnet_receivecallback callback; - t_iemnet_queue*queue; - - int running; - int keepreceiving; - - pthread_mutex_t running_mtx, keeprec_mtx; - pthread_cond_t running_cond; - t_iemnet_notify*notifier; }; -static t_iemnet_notifier*receivenotifier=NULL; - -/* the workhorse of the family */ -static void*iemnet__receiver_readthread(void*arg) { - unsigned int i=0; - int result = 0; - t_iemnet_receiver*receiver=(t_iemnet_receiver*)arg; - int sockfd=receiver->sockfd; - t_iemnet_queue*q=receiver->queue; +static void pollfun(void*z, int fd) +{ + // read data from socket and call callback + t_iemnet_receiver*rec=(t_iemnet_receiver*)z; unsigned char data[INBUFSIZE]; unsigned int size=INBUFSIZE; + t_iemnet_chunk*chunk=NULL; + int result = 0; + int local_errno = 0; struct sockaddr_in from; socklen_t fromlen = sizeof(from); int recv_flags=0; - - struct timeval timout; - fd_set readset; - FD_ZERO(&readset); - FD_SET(sockfd, &readset); - - for(i=0; irunning=1; - - pthread_mutex_lock (&receiver->running_mtx); - pthread_cond_signal (&receiver->running_cond); - pthread_mutex_unlock(&receiver->running_mtx); - - while(1) { - t_iemnet_chunk*c=NULL; - fd_set rs; - - pthread_mutex_lock(&receiver->keeprec_mtx); - if(!receiver->keepreceiving) { - pthread_mutex_unlock(&receiver->keeprec_mtx); - break; - } - pthread_mutex_unlock(&receiver->keeprec_mtx); - - fromlen = sizeof(from); - rs=readset; - timout.tv_sec=0; - timout.tv_usec=1000; - #ifdef MSG_DONTWAIT - recv_flags|=MSG_DONTWAIT; + recv_flags|=MSG_DONTWAIT; #endif - select(sockfd+1, &rs, NULL, NULL, - &timout); - if (!FD_ISSET(sockfd, &rs))continue; - - DEBUG("select can read"); - - //fprintf(stderr, "reading %d bytes...\n", size); - //result = recv(sockfd, data, size, 0); - - result = recvfrom(sockfd, data, size, recv_flags, (struct sockaddr *)&from, &fromlen); - //fprintf(stderr, "read %d bytes...\n", result); - DEBUG("recfrom %d bytes: %p", result, data); - c= iemnet__chunk_create_dataaddr(result, (result>0)?data:NULL, &from); - DEBUG("pushing"); - queue_push(q, c); - DEBUG("signalling"); - iemnet__notify(receiver->notifier); - - - if(result<=0) break; - - DEBUG("rereceive"); - } - // oha - DEBUG("readthread loop termination: %d", result); - //if(result>=0)iemnet_signalNewData(receiver); - - pthread_mutex_lock(&receiver->running_mtx); - receiver->running=0; - pthread_mutex_unlock(&receiver->running_mtx); - - DEBUG("read thread terminated"); - return NULL; + errno=0; + result = recvfrom(rec->sockfd, data, size, recv_flags, + (struct sockaddr *)&from, &fromlen); + local_errno=errno; + //fprintf(stderr, "read %d bytes...\n", result); + DEBUG("recvfrom %d bytes: %d %p %d", result, rec->sockfd, data, size); + DEBUG("errno=%d", local_errno); + chunk = iemnet__chunk_create_dataaddr(result, (result>0)?data:NULL, &from); + + // call the callback with a NULL-chunk to signal a disconnect event. + (rec->callback)(rec->userdata, chunk); + + iemnet__chunk_destroy(chunk); } -/* callback from Pd's main thread to fetch queued data */ -static void iemnet__receiver_tick(t_iemnet_receiver *x) +t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, + t_iemnet_receivecallback callback, int subthread) { - int running=0, keepreceiving=0; - // received data - t_iemnet_chunk*c=queue_pop_noblock(x->queue); - DEBUG("tick got chunk %p", c); - - while(NULL!=c) { - (x->callback)(x->userdata, c); - iemnet__chunk_destroy(c); - c=queue_pop_noblock(x->queue); - } - DEBUG("tick cleanup"); - - pthread_mutex_lock(&x->running_mtx); - running = x->running; - pthread_mutex_unlock(&x->running_mtx); - - DEBUG("tick running %d", running); - if(!running) { - // read terminated - pthread_mutex_lock(&x->keeprec_mtx); - keepreceiving=x->keepreceiving; - pthread_mutex_unlock(&x->keeprec_mtx); - - /* keepreceiving is set, if receiver is not yet in shutdown mode */ - if(keepreceiving) - x->callback(x->userdata, NULL); - } - DEBUG("tick DONE"); -} - -int iemnet__receiver_getsize(t_iemnet_receiver*x) { - int size=-1; - if(x && x->queue) - size=queue_getsize(x->queue); - - return size; -} - - -t_iemnet_receiver*iemnet__receiver_create(int sock, void*userdata, t_iemnet_receivecallback callback) { - static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; - t_iemnet_receiver*rec=(t_iemnet_receiver*)malloc(sizeof(t_iemnet_receiver)); - - if(NULL==receivenotifier) - receivenotifier=iemnet__notify_create(); + t_iemnet_receiver*rec=(t_iemnet_receiver*)malloc(sizeof( + t_iemnet_receiver)); DEBUG("create new receiver for 0x%X:%d", userdata, sock); //fprintf(stderr, "new receiver for %d\t%x\t%x\n", sock, userdata, callback); if(rec) { - t_iemnet_chunk*data=NULL; - int res=0; - - data=iemnet__chunk_create_empty(INBUFSIZE); - if(NULL==data) { - iemnet__receiver_destroy(rec); - DEBUG("create receiver failed"); - return NULL; - } - - rec->keepreceiving=1; rec->sockfd=sock; rec->userdata=userdata; - rec->data=data; rec->callback=callback; - pthread_mutex_init(&rec->running_mtx , 0); - pthread_mutex_init(&rec->keeprec_mtx , 0); - - pthread_cond_init(&rec->running_cond, 0); - - rec->running=1; - - rec->queue = queue_create(); - rec->notifier = iemnet__notify_add(receivenotifier, (t_iemnet_notifun)iemnet__receiver_tick, rec); + if(subthread) { + sys_lock(); + } + sys_addpollfn(sock, pollfun, rec); + if(subthread) { + sys_unlock(); + } - /* start the recv thread */ - pthread_mutex_lock(&rec->running_mtx); - res=pthread_create(&rec->recthread, 0, iemnet__receiver_readthread, rec); - if(!res)pthread_cond_wait(&rec->running_cond, &rec->running_mtx); - pthread_mutex_unlock(&rec->running_mtx); } //fprintf(stderr, "new receiver created\n"); return rec; } - -void iemnet__receiver_destroy(t_iemnet_receiver*rec) { - static int instance=0; - int inst=instance++; - +void iemnet__receiver_destroy(t_iemnet_receiver*rec, int subthread) +{ int sockfd; - DEBUG("[%d] destroy receiver %x", inst, rec); - if(NULL==rec)return; - pthread_mutex_lock(&rec->keeprec_mtx); - if(!rec->keepreceiving) { - pthread_mutex_unlock(&rec->keeprec_mtx); + if(NULL==rec) { return; - } - rec->keepreceiving=0; - pthread_mutex_unlock(&rec->keeprec_mtx); + } sockfd=rec->sockfd; - pthread_join(rec->recthread, NULL); - iemnet__notify_remove(rec->notifier); - - DEBUG("[%d] really destroying receiver %x -> %d", inst, rec, sockfd); - - if(sockfd>=0) { - /* this doesn't alway make recvfrom() return! - * - try polling - * - try sending a signal with pthread_kill() ? - */ - - shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ - sys_closesocket(sockfd); + if(subthread) { + sys_lock(); } - DEBUG("[%d] closed socket %d", inst, sockfd); - - rec->sockfd=-1; + sys_rmpollfn(rec->sockfd); - // empty the queue - DEBUG("[%d] tick %d", inst, rec->running); - iemnet__receiver_tick(rec); - queue_destroy(rec->queue); - DEBUG("[%d] tack", inst); + // FIXXME: read any remaining bytes from the socket - if(rec->data)iemnet__chunk_destroy(rec->data); + if(subthread) { + sys_unlock(); + } - pthread_mutex_destroy(&rec->running_mtx); - pthread_mutex_destroy(&rec->keeprec_mtx); - pthread_cond_destroy(&rec->running_cond); + DEBUG("[%p] really destroying receiver %d", sockfd); + //iemnet__closesocket(sockfd); + DEBUG("[%p] closed socket %d", rec, sockfd); + rec->sockfd=-1; rec->userdata=NULL; - rec->data=NULL; rec->callback=NULL; - rec->queue=NULL; free(rec); rec=NULL; - DEBUG("[%d] destroyed receiver", inst); +} + + +/* just dummy, since we don't maintain a queue any more */ +int iemnet__receiver_getsize(t_iemnet_receiver*x) +{ + if(x) { + return 0; + } + return -1; } -- cgit v1.2.1