aboutsummaryrefslogtreecommitdiff
path: root/iemnet_sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'iemnet_sender.c')
-rw-r--r--iemnet_sender.c161
1 files changed, 103 insertions, 58 deletions
diff --git a/iemnet_sender.c b/iemnet_sender.c
index 0e4f866..8cf1d14 100644
--- a/iemnet_sender.c
+++ b/iemnet_sender.c
@@ -4,7 +4,7 @@
* sends data "chunks" to a socket
* possibly threaded
*
- * copyright (c) 2010 IOhannes m zmölnig, IEM
+ * copyright (c) 2010 IOhannes m zmölnig, IEM
*/
/* This program is free software; you can redistribute it and/or */
@@ -18,9 +18,8 @@
/* GNU General Public License for more details. */
/* */
/* You should have received a copy of the GNU General Public License */
-/* along with this program; if not, write to the Free Software */
-/* Foundation, Inc., */
-/* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+/* along with this program; if not, see */
+/* http://www.gnu.org/licenses/ */
/* */
#define DEBUGLEVEL 2
@@ -54,11 +53,11 @@ static int debug_lockcount=0;
# define UNLOCK(x) pthread_mutex_unlock(x)
#endif
- /* draft:
- * - there is a sender thread for each open connection
- * - the main thread just adds chunks to each sender threads processing queue
- * - the sender thread tries to send the queue as fast as possible
- */
+/* draft:
+ * - there is a sender thread for each open connection
+ * - the main thread just adds chunks to each sender threads processing queue
+ * - the sender thread tries to send the queue as fast as possible
+ */
struct _iemnet_sender {
pthread_t thread;
@@ -68,62 +67,92 @@ struct _iemnet_sender {
int keepsending; // indicates whether we want to thread to continue or to terminate
int isrunning;
+ const void*userdata; /* user provided data */
+ t_iemnet_sendfunction sendfun; /* user provided send function */
+
pthread_mutex_t mtx; /* mutex to protect isrunning,.. */
};
/* the workhorse of the family */
-static int iemnet__sender_dosend(int sockfd, t_iemnet_queue*q) {
+
+static int iemnet__sender_defaultsend(const void*x, int sockfd,
+ t_iemnet_chunk*c)
+{
+ int result=-1;
+
struct sockaddr_in to;
socklen_t tolen = sizeof(to);
- t_iemnet_chunk*c=queue_pop_block(q);
- if(c) {
- unsigned char*data=c->data;
- unsigned int size=c->size;
-
- int result=-1;
+ unsigned char*data=c->data;
+ unsigned int size=c->size;
- // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd);
- if(c->port) {
- DEBUG("sending %d bytes to %x:%d", size, c->addr, c->port);
+ int flags = 0;
+#ifdef __linux__
+ flags |= MSG_NOSIGNAL;
+#endif
- to.sin_addr.s_addr=htonl(c->addr);
- to.sin_port =htons(c->port);
- result = sendto(sockfd, data, size, 0, (struct sockaddr *)&to, tolen);
- } else {
- DEBUG("sending %d bytes", size);
- result = send(sockfd, data, size, 0);
- }
- if(result<0) {
- // broken pipe
- return 0;
- }
+ // fprintf(stderr, "sending %d bytes at %x to %d\n", size, data, sockfd);
+ if(c->port) {
+ DEBUG("sending %d bytes to %x:%d @%d", size, c->addr, c->port, c->family);
- // shouldn't we do something with the result here?
- DEBUG("sent %d bytes", result);
- iemnet__chunk_destroy(c);
+ to.sin_addr.s_addr=htonl(c->addr);
+ to.sin_port =htons(c->port);
+ to.sin_family =c->family;
+ result = sendto(sockfd,
+ data, size, /* DATA */
+ flags, /* FLAGS */
+ (struct sockaddr *)&to, tolen); /* DESTADDR */
} else {
+ DEBUG("sending %d bytes", size);
+ result = send(sockfd,
+ data, size, /* DATA */
+ flags); /* FLAGS */
+ }
+ if(result<0) {
+ // broken pipe
return 0;
}
+
+ // shouldn't we do something with the result here?
+ DEBUG("sent %d bytes", result);
return 1;
}
-static void*iemnet__sender_sendthread(void*arg) {
+static void*iemnet__sender_sendthread(void*arg)
+{
t_iemnet_sender*sender=(t_iemnet_sender*)arg;
int sockfd=-1;
t_iemnet_queue*q=NULL;
+ t_iemnet_chunk*c=NULL;
+ t_iemnet_sendfunction dosend=iemnet__sender_defaultsend;
+ const void*userdata=NULL;
LOCK(&sender->mtx);
- sockfd=sender->sockfd;
q=sender->queue;
+ userdata=sender->userdata;
+ if(NULL!=sender->sendfun) {
+ dosend=sender->sendfun;
+ }
+
+ sockfd=sender->sockfd;
+
+
while(sender->keepsending) {
UNLOCK(&sender->mtx);
- if(!iemnet__sender_dosend(sockfd, q)){
- LOCK(&sender->mtx);
- break;
+
+ c=queue_pop_block(q);
+ if(c) {
+ if(!dosend(userdata, sockfd, c)) {
+ iemnet__chunk_destroy(c);
+
+ LOCK(&sender->mtx);
+ break;
+ }
+ iemnet__chunk_destroy(c);
+ c=NULL;
}
LOCK(&sender->mtx);
}
@@ -133,7 +162,8 @@ static void*iemnet__sender_sendthread(void*arg) {
return NULL;
}
-int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) {
+int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c)
+{
t_iemnet_queue*q=0;
int size=-1;
LOCK (&s->mtx);
@@ -150,10 +180,11 @@ int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) {
return size;
}
-void iemnet__sender_destroy(t_iemnet_sender*s) {
+void iemnet__sender_destroy(t_iemnet_sender*s, int subthread)
+{
int sockfd=-1;
/* simple protection against recursive calls:
- * s->keepsending is only set to "0" in here,
+ * s->keepsending is only set to "0" in here,
* so if it is false, we know that we are already being called
*/
DEBUG("destroy sender %x with queue %x (%d)", s, s->queue, s->keepsending);
@@ -178,11 +209,7 @@ void iemnet__sender_destroy(t_iemnet_sender*s) {
queue_finish(s->queue);
DEBUG("queue finished");
-
- if(sockfd>=0) {
- int err=shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */
- sys_closesocket(sockfd);
- }
+ //iemnet__closesocket(sockfd);
pthread_join(s->thread, NULL);
DEBUG("thread joined");
@@ -191,18 +218,23 @@ void iemnet__sender_destroy(t_iemnet_sender*s) {
pthread_mutex_destroy (&s->mtx);
memset(s, 0, sizeof(t_iemnet_sender));
+ s->sockfd = -1;
free(s);
s=NULL;
DEBUG("destroyed sender");
}
-t_iemnet_sender*iemnet__sender_create(int sock) {
+t_iemnet_sender*iemnet__sender_create(int sock,
+ t_iemnet_sendfunction sendfun, const void*userdata,
+ int subthread)
+{
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
- t_iemnet_sender*result=(t_iemnet_sender*)malloc(sizeof(t_iemnet_sender));
+ t_iemnet_sender*result=(t_iemnet_sender*)calloc(1,
+ sizeof(t_iemnet_sender));
int res=0;
DEBUG("create sender %x", result);
- if(NULL==result){
+ if(NULL==result) {
DEBUG("create sender failed");
return NULL;
}
@@ -211,23 +243,28 @@ t_iemnet_sender*iemnet__sender_create(int sock) {
result->sockfd = sock;
result->keepsending =1;
result->isrunning=1;
+ result->sendfun=sendfun;
+ result->userdata=userdata;
DEBUG("create_sender queue=%x", result->queue);
memcpy(&result->mtx , &mtx, sizeof(pthread_mutex_t));
res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result);
if(0==res) {
-
} else {
// something went wrong
+ queue_destroy(result->queue);
+ free(result);
+ return NULL;
}
DEBUG("created sender");
return result;
}
-int iemnet__sender_getlasterror(t_iemnet_sender*x) {
- x=NULL;
+/* coverity[param_set_but_not_used]: as x is there for potentially more specific implentations in the future */
+int iemnet__sender_getlasterror(t_iemnet_sender*x)
+{
#ifdef _WIN32
return WSAGetLastError();
#endif
@@ -235,27 +272,35 @@ int iemnet__sender_getlasterror(t_iemnet_sender*x) {
}
-int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname, void *optval, socklen_t*optlen) {
+int iemnet__sender_getsockopt(t_iemnet_sender*s, int level, int optname,
+ void *optval, socklen_t*optlen)
+{
int result=getsockopt(s->sockfd, level, optname, optval, optlen);
if(result!=0) {
- post("%s: getsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s));
+ post("%s: getsockopt returned %d", __FUNCTION__,
+ iemnet__sender_getlasterror(s));
}
return result;
}
-int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname, const void*optval, socklen_t optlen) {
+int iemnet__sender_setsockopt(t_iemnet_sender*s, int level, int optname,
+ const void*optval, socklen_t optlen)
+{
int result=setsockopt(s->sockfd, level, optname, optval, optlen);
if(result!=0) {
- post("%s: setsockopt returned %d", __FUNCTION__, iemnet__sender_getlasterror(s));
+ post("%s: setsockopt returned %d", __FUNCTION__,
+ iemnet__sender_getlasterror(s));
}
return result;
}
-int iemnet__sender_getsize(t_iemnet_sender*x) {
+int iemnet__sender_getsize(t_iemnet_sender*x)
+{
int size=-1;
- if(x && x->queue)
+ if(x && x->queue) {
size=queue_getsize(x->queue);
+ }
return size;
}