From 806df1c7645fdcfc945fb9e1467ea7aaada2b903 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?IOhannes=20m=20zm=C3=B6lnig?= Date: Tue, 30 Mar 2010 07:36:07 +0000 Subject: split core library into separate files svn path=/trunk/externals/iem/iemnet/; revision=13310 --- iemnet_sender.c | 164 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 iemnet_sender.c (limited to 'iemnet_sender.c') diff --git a/iemnet_sender.c b/iemnet_sender.c new file mode 100644 index 0000000..eb1ec3b --- /dev/null +++ b/iemnet_sender.c @@ -0,0 +1,164 @@ +/* iemnet + * + * sender + * sends data "chunks" to a socket + * possibly threaded + * + * copyright (c) 2010 IOhannes m zmölnig, IEM + */ + +//#define DEBUG + +#include "iemnet.h" +#include "iemnet_data.h" + + +#include +#include +#include + +#include + +#ifdef _WIN32 +# include +# include /* for socklen_t */ +#else +# include +#endif + +#include + + + /* draft: + * - there is a sender thread for each open connection + * - the main thread just adds chunks to each sender threads processing queue + * - the sender thread tries to send the queue as fast as possible + */ + +struct _iemnet_sender { + pthread_t thread; + + int sockfd; /* owned outside; must call iemnet__sender_destroy() before freeing socket yourself */ + t_iemnet_queue*queue; + int keepsending; // indicates whether we want to thread to continue or to terminate + int isrunning; +}; + +/* the workhorse of the family */ + +static int iemnet__sender_dosend(int sockfd, t_iemnet_queue*q) { + t_iemnet_chunk*c=queue_pop_block(q); + if(c) { + unsigned char*data=c->data; + unsigned int size=c->size; + int result=-1; + // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd); + DEBUG("sending %d bytes", size); + result = send(sockfd, data, size, 0); + if(result<0) { + // broken pipe + return 0; + } + + // shouldn't we do something with the result here? + DEBUG("sent %d bytes", result); + iemnet__chunk_destroy(c); + } else { + return 0; + } + return 1; +} + +static void*iemnet__sender_sendthread(void*arg) { + t_iemnet_sender*sender=(t_iemnet_sender*)arg; + + int sockfd=sender->sockfd; + t_iemnet_queue*q=sender->queue; + + while(sender->keepsending) { + if(!iemnet__sender_dosend(sockfd, q))break; + } + sender->isrunning=0; + DEBUG("send thread terminated"); + return NULL; +} + +int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) { + t_iemnet_queue*q=s->queue; + int size=-1; + if(!s->isrunning)return -1; + if(q) { + t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c); + size = queue_push(q, chunk); + } + return size; +} + +void iemnet__sender_destroy(t_iemnet_sender*s) { + /* simple protection against recursive calls: + * s->keepsending is only set to "0" in here, + * so if it is false, we know that we are already being called + */ + if(!s->keepsending)return; + + DEBUG("destroy sender %x", s); + s->keepsending=0; + queue_finish(s->queue); + DEBUG("queue finished"); + s->sockfd = -1; + pthread_join(s->thread, NULL); + DEBUG("thread joined"); + queue_destroy(s->queue); + freebytes(s, sizeof(t_iemnet_sender)); + s=NULL; + DEBUG("destroyed sender"); +} +t_iemnet_sender*iemnet__sender_create(int sock) { + t_iemnet_sender*result=(t_iemnet_sender*)getbytes(sizeof(t_iemnet_sender)); + int res=0; + DEBUG("create sender %x", result); + if(NULL==result){ + DEBUG("create sender failed"); + return NULL; + } + + result->queue = queue_create(); + result->sockfd = sock; + result->keepsending =1; + result->isrunning=1; + + res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result); + + if(0==res) { + + } else { + // something went wrong + } + + DEBUG("created sender"); + return result; +} + +int iemnet__sender_getlasterror(t_iemnet_sender*x) { + x=NULL; +#ifdef _WIN32 + return WSAGetLastError(); +#endif + return errno; +} + + +int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname, void *optval, socklen_t*optlen) { + int result=getsockopt(s->sockfd, level, optname, optval, optlen); + if(result!=0) { + post("%s: getsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); + } + return result; +} +int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const void*optval, socklen_t optlen) { + int result=setsockopt(s->sockfd, level, optname, optval, optlen); + if(result!=0) { + post("%s: setsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s)); + } + return result; +} -- cgit v1.2.1