From c5e0079ae1323ad4c35d28f83ce732bb3820c46a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Mon, 31 Aug 2015 22:11:51 +0000 Subject: un-threaded receive svn path=/trunk/externals/iem/iemnet/; revision=17543 --- iemnet_sender.c | 161 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 103 insertions(+), 58 deletions(-) (limited to 'iemnet_sender.c') diff --git a/iemnet_sender.c b/iemnet_sender.c index 0e4f866..8cf1d14 100644 --- a/iemnet_sender.c +++ b/iemnet_sender.c @@ -4,7 +4,7 @@ * sends data "chunks" to a socket * possibly threaded * - * copyright (c) 2010 IOhannes m zmölnig, IEM + * copyright (c) 2010 IOhannes m zmölnig, IEM */ /* This program is free software; you can redistribute it and/or */ @@ -18,9 +18,8 @@ /* GNU General Public License for more details. */ /* */ /* You should have received a copy of the GNU General Public License */ -/* along with this program; if not, write to the Free Software */ -/* Foundation, Inc., */ -/* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +/* along with this program; if not, see */ +/* http://www.gnu.org/licenses/ */ /* */ #define DEBUGLEVEL 2 @@ -54,11 +53,11 @@ static int debug_lockcount=0; # define UNLOCK(x) pthread_mutex_unlock(x) #endif - /* draft: - * - there is a sender thread for each open connection - * - the main thread just adds chunks to each sender threads processing queue - * - the sender thread tries to send the queue as fast as possible - */ +/* draft: + * - there is a sender thread for each open connection + * - the main thread just adds chunks to each sender threads processing queue + * - the sender thread tries to send the queue as fast as possible + */ struct _iemnet_sender { pthread_t thread; @@ -68,62 +67,92 @@ struct _iemnet_sender { int keepsending; // indicates whether we want to thread to continue or to terminate int isrunning; + const void*userdata; /* user provided data */ + t_iemnet_sendfunction sendfun; /* user provided send function */ + pthread_mutex_t mtx; /* mutex to protect isrunning,.. */ }; /* the workhorse of the family */ -static int iemnet__sender_dosend(int sockfd, t_iemnet_queue*q) { + +static int iemnet__sender_defaultsend(const void*x, int sockfd, + t_iemnet_chunk*c) +{ + int result=-1; + struct sockaddr_in to; socklen_t tolen = sizeof(to); - t_iemnet_chunk*c=queue_pop_block(q); - if(c) { - unsigned char*data=c->data; - unsigned int size=c->size; - - int result=-1; + unsigned char*data=c->data; + unsigned int size=c->size; - // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); - if(c->port) { - DEBUG("sending %d bytes to %x:%d", size, c->addr, c->port); + int flags = 0; +#ifdef __linux__ + flags |= MSG_NOSIGNAL; +#endif - to.sin_addr.s_addr=htonl(c->addr); - to.sin_port =htons(c->port); - result = sendto(sockfd, data, size, 0, (struct sockaddr *)&to, tolen); - } else { - DEBUG("sending %d bytes", size); - result = send(sockfd, data, size, 0); - } - if(result<0) { - // broken pipe - return 0; - } + // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); + if(c->port) { + DEBUG("sending %d bytes to %x:%d @%d", size, c->addr, c->port, c->family); - // shouldn't we do something with the result here? - DEBUG("sent %d bytes", result); - iemnet__chunk_destroy(c); + to.sin_addr.s_addr=htonl(c->addr); + to.sin_port =htons(c->port); + to.sin_family =c->family; + result = sendto(sockfd, + data, size, /* DATA */ + flags, /* FLAGS */ + (struct sockaddr *)&to, tolen); /* DESTADDR */ } else { + DEBUG("sending %d bytes", size); + result = send(sockfd, + data, size, /* DATA */ + flags); /* FLAGS */ + } + if(result<0) { + // broken pipe return 0; } + + // shouldn't we do something with the result here? + DEBUG("sent %d bytes", result); return 1; } -static void*iemnet__sender_sendthread(void*arg) { +static void*iemnet__sender_sendthread(void*arg) +{ t_iemnet_sender*sender=(t_iemnet_sender*)arg; int sockfd=-1; t_iemnet_queue*q=NULL; + t_iemnet_chunk*c=NULL; + t_iemnet_sendfunction dosend=iemnet__sender_defaultsend; + const void*userdata=NULL; LOCK(&sender->mtx); - sockfd=sender->sockfd; q=sender->queue; + userdata=sender->userdata; + if(NULL!=sender->sendfun) { + dosend=sender->sendfun; + } + + sockfd=sender->sockfd; + + while(sender->keepsending) { UNLOCK(&sender->mtx); - if(!iemnet__sender_dosend(sockfd, q)){ - LOCK(&sender->mtx); - break; + + c=queue_pop_block(q); + if(c) { + if(!dosend(userdata, sockfd, c)) { + iemnet__chunk_destroy(c); + + LOCK(&sender->mtx); + break; + } + iemnet__chunk_destroy(c); + c=NULL; } LOCK(&sender->mtx); } @@ -133,7 +162,8 @@ static void*iemnet__sender_sendthread(void*arg) { return NULL; } -int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { +int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) +{ t_iemnet_queue*q=0; int size=-1; LOCK (&s->mtx); @@ -150,10 +180,11 @@ int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { return size; } -void iemnet__sender_destroy(t_iemnet_sender*s) { +void iemnet__sender_destroy(t_iemnet_sender*s, int subthread) +{ int sockfd=-1; /* simple protection against recursive calls: - * s->keepsending is only set to "0" in here, + * s->keepsending is only set to "0" in here, * so if it is false, we know that we are already being called */ DEBUG("destroy sender %x with queue %x (%d)", s, s->queue, s->keepsending); @@ -178,11 +209,7 @@ void iemnet__sender_destroy(t_iemnet_sender*s) { queue_finish(s->queue); DEBUG("queue finished"); - - if(sockfd>=0) { - int err=shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */ - sys_closesocket(sockfd); - } + //iemnet__closesocket(sockfd); pthread_join(s->thread, NULL); DEBUG("thread joined"); @@ -191,18 +218,23 @@ void iemnet__sender_destroy(t_iemnet_sender*s) { pthread_mutex_destroy (&s->mtx); memset(s, 0, sizeof(t_iemnet_sender)); + s->sockfd = -1; free(s); s=NULL; DEBUG("destroyed sender"); } -t_iemnet_sender*iemnet__sender_create(int sock) { +t_iemnet_sender*iemnet__sender_create(int sock, + t_iemnet_sendfunction sendfun, const void*userdata, + int subthread) +{ static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; - t_iemnet_sender*result=(t_iemnet_sender*)malloc(sizeof(t_iemnet_sender)); + t_iemnet_sender*result=(t_iemnet_sender*)calloc(1, + sizeof(t_iemnet_sender)); int res=0; DEBUG("create sender %x", result); - if(NULL==result){ + if(NULL==result) { DEBUG("create sender failed"); return NULL; } @@ -211,23 +243,28 @@ t_iemnet_sender*iemnet__sender_create(int sock) { result->sockfd = sock; result->keepsending =1; result->isrunning=1; + result->sendfun=sendfun; + result->userdata=userdata; DEBUG("create_sender queue=%x", result->queue); memcpy(&result->mtx , &mtx, sizeof(pthread_mutex_t)); res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result); if(0==res) { - } else { // something went wrong + queue_destroy(result->queue); + free(result); + return NULL; } DEBUG("created sender"); return result; } -int iemnet__sender_getlasterror(t_iemnet_sender*x) { - x=NULL; +/* coverity[param_set_but_not_used]: as x is there for potentially more specific implentations in the future */ +int iemnet__sender_getlasterror(t_iemnet_sender*x) +{ #ifdef _WIN32 return WSAGetLastError(); #endif @@ -235,27 +272,35 @@ int iemnet__sender_getlasterror(t_iemnet_sender*x) { } -int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname, void *optval, socklen_t*optlen) { +int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname, + void *optval, socklen_t*optlen) +{ int result=getsockopt(s->sockfd, level, optname, optval, optlen); if(result!=0) { - post("%s: getsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); + post("%s: getsockopt returned %d", __FUNCTION__, + iemnet__sender_getlasterror(s)); } return result; } -int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const void*optval, socklen_t optlen) { +int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, + const void*optval, socklen_t optlen) +{ int result=setsockopt(s->sockfd, level, optname, optval, optlen); if(result!=0) { - post("%s: setsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); + post("%s: setsockopt returned %d", __FUNCTION__, + iemnet__sender_getlasterror(s)); } return result; } -int iemnet__sender_getsize(t_iemnet_sender*x) { +int iemnet__sender_getsize(t_iemnet_sender*x) +{ int size=-1; - if(x && x->queue) + if(x && x->queue) { size=queue_getsize(x->queue); + } return size; } -- cgit v1.2.1