/* iemnet * * sender * sends data "chunks" to a socket * possibly threaded * * copyright © 2010-2015 IOhannes m zmölnig, IEM */ /* This program is free software; you can redistribute it and/or */ /* modify it under the terms of the GNU General Public License */ /* as published by the Free Software Foundation; either version 2 */ /* of the License, or (at your option) any later version. */ /* */ /* This program is distributed in the hope that it will be useful, */ /* but WITHOUT ANY WARRANTY; without even the implied warranty of */ /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ /* GNU General Public License for more details. */ /* */ /* You should have received a copy of the GNU General Public License */ /* along with this program; if not, see */ /* http://www.gnu.org/licenses/ */ /* */ #define DEBUGLEVEL 2 #include "iemnet.h" #include "iemnet_data.h" #include #include #include #include #include #ifdef _WIN32 # include # include /* for socklen_t */ #else # include #endif #include #if IEMNET_HAVE_DEBUG static int debug_lockcount=0; # define LOCK(x) do {if(iemnet_debug(DEBUGLEVEL, __FILE__, __LINE__, __FUNCTION__))post(" LOCKing %p", x); pthread_mutex_lock(x);debug_lockcount++; if(iemnet_debug(DEBUGLEVEL, __FILE__, __LINE__, __FUNCTION__))post(" LOCKed %p[%d]", x, debug_lockcount); } while(0) # define UNLOCK(x) do {debug_lockcount--;if(iemnet_debug(DEBUGLEVEL, __FILE__, __LINE__, __FUNCTION__))post(" UNLOCK %p [%d]", x, debug_lockcount); pthread_mutex_unlock(x);}while(0) #else # define LOCK(x) pthread_mutex_lock(x) # define UNLOCK(x) pthread_mutex_unlock(x) #endif /* draft: * - there is a sender thread for each open connection * - the main thread just adds chunks to each sender threads processing queue * - the sender thread tries to send the queue as fast as possible */ struct _iemnet_sender { pthread_t thread; int sockfd; /* owned outside; must call iemnet__sender_destroy() before freeing socket yourself */ t_iemnet_queue*queue; int keepsending; // indicates whether we want to thread to continue or to terminate int isrunning; const void*userdata; /* user provided data */ t_iemnet_sendfunction sendfun; /* user provided send function */ pthread_mutex_t mtx; /* mutex to protect isrunning,.. */ }; /* the workhorse of the family */ static int iemnet__sender_defaultsend(const void*x, int sockfd, t_iemnet_chunk*c) { int result=-1; struct sockaddr_in to; socklen_t tolen = sizeof(to); unsigned char*data=c->data; unsigned int size=c->size; int flags = 0; #ifdef __linux__ flags |= MSG_NOSIGNAL; #endif // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); if(c->port) { DEBUG("sending %d bytes to %x:%d @%d", size, c->addr, c->port, c->family); to.sin_addr.s_addr=htonl(c->addr); to.sin_port =htons(c->port); to.sin_family =c->family; result = sendto(sockfd, data, size, /* DATA */ flags, /* FLAGS */ (struct sockaddr *)&to, tolen); /* DESTADDR */ } else { DEBUG("sending %d bytes", size); result = send(sockfd, data, size, /* DATA */ flags); /* FLAGS */ } if(result<0) { // broken pipe return 0; } // shouldn't we do something with the result here? DEBUG("sent %d bytes", result); return 1; } static void*iemnet__sender_sendthread(void*arg) { t_iemnet_sender*sender=(t_iemnet_sender*)arg; int sockfd=-1; t_iemnet_queue*q=NULL; t_iemnet_chunk*c=NULL; t_iemnet_sendfunction dosend=iemnet__sender_defaultsend; const void*userdata=NULL; LOCK(&sender->mtx); q=sender->queue; userdata=sender->userdata; if(NULL!=sender->sendfun) { dosend=sender->sendfun; } sockfd=sender->sockfd; while(sender->keepsending) { UNLOCK(&sender->mtx); c=queue_pop_block(q); if(c) { if(!dosend(userdata, sockfd, c)) { iemnet__chunk_destroy(c); LOCK(&sender->mtx); break; } iemnet__chunk_destroy(c); c=NULL; } LOCK(&sender->mtx); } sender->isrunning=0; UNLOCK (&sender->mtx); DEBUG("send thread terminated"); return NULL; } int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { t_iemnet_queue*q=0; int size=-1; LOCK (&s->mtx); q=s->queue; if(!s->isrunning) { UNLOCK (&s->mtx); return -1; } UNLOCK (&s->mtx); if(q) { t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); size = queue_push(q, chunk); } return size; } void iemnet__sender_destroy(t_iemnet_sender*s, int subthread) { int sockfd=-1; /* simple protection against recursive calls: * s->keepsending is only set to "0" in here, * so if it is false, we know that we are already being called */ DEBUG("destroy sender %x with queue %x (%d)", s, s->queue, s->keepsending); LOCK (&s->mtx); sockfd=s->sockfd; // check s->isrunning DEBUG("keepsending %d\tisrunning %d", s->keepsending, s->isrunning); if(!s->keepsending) { UNLOCK (&s->mtx); return; } s->keepsending=0; while(s->isrunning) { s->keepsending=0; queue_finish(s->queue); UNLOCK (&s->mtx); LOCK (&s->mtx); } UNLOCK (&s->mtx); queue_finish(s->queue); DEBUG("queue finished"); pthread_join(s->thread, NULL); DEBUG("thread joined"); queue_destroy(s->queue); pthread_mutex_destroy (&s->mtx); memset(s, 0, sizeof(t_iemnet_sender)); s->sockfd = -1; free(s); s=NULL; DEBUG("destroyed sender"); } t_iemnet_sender*iemnet__sender_create(int sock, t_iemnet_sendfunction sendfun, const void*userdata, int subthread) { static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; t_iemnet_sender*result=(t_iemnet_sender*)calloc(1, sizeof(t_iemnet_sender)); int res=0; DEBUG("create sender %x", result); if(NULL==result) { DEBUG("create sender failed"); return NULL; } result->queue = queue_create(); result->sockfd = sock; result->keepsending =1; result->isrunning=1; result->sendfun=sendfun; result->userdata=userdata; DEBUG("create_sender queue=%x", result->queue); memcpy(&result->mtx , &mtx, sizeof(pthread_mutex_t)); res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result); if(0==res) { } else { // something went wrong queue_destroy(result->queue); free(result); return NULL; } DEBUG("created sender"); return result; } /* coverity[param_set_but_not_used]: as x is there for potentially more specific implentations in the future */ int iemnet__sender_getlasterror(t_iemnet_sender*x) { #ifdef _WIN32 return WSAGetLastError(); #endif return errno; } int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname, void *optval, socklen_t*optlen) { int result=getsockopt(s->sockfd, level, optname, optval, optlen); if(result!=0) { error("iemnet::sender: getsockopt returned %d", iemnet__sender_getlasterror(s)); } return result; } int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const void*optval, socklen_t optlen) { int result=setsockopt(s->sockfd, level, optname, optval, optlen); if(result!=0) { error("iemnet::sender: setsockopt returned %d", iemnet__sender_getlasterror(s)); } return result; } int iemnet__sender_getsize(t_iemnet_sender*x) { int size=-1; if(x && x->queue) { size=queue_getsize(x->queue); } return size; }