aboutsummaryrefslogtreecommitdiff
path: root/iemnet_sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'iemnet_sender.c')
-rw-r--r--iemnet_sender.c43
1 files changed, 37 insertions, 6 deletions
diff --git a/iemnet_sender.c b/iemnet_sender.c
index 20fb93b..bdbc26d 100644
--- a/iemnet_sender.c
+++ b/iemnet_sender.c
@@ -28,6 +28,14 @@
#include <pthread.h>
+#if IEMNET_HAVE_DEBUG
+static int debug_lockcount=0;
+# define LOCK(x) do {pthread_mutex_lock(x); debug_lockcount++; post(" LOCK %d (@%d)", debug_lockcount, __LINE__); } while(0)
+# define UNLOCK(x) do {debug_lockcount--;post("UNLOCK %d (@%d)", debug_lockcount, __LINE__);pthread_mutex_unlock(x);}while(0)
+#else
+# define LOCK(x) pthread_mutex_lock(x)
+# define UNLOCK(x) pthread_mutex_unlock(x)
+#endif
/* draft:
* - there is a sender thread for each open connection
@@ -42,6 +50,8 @@ struct _iemnet_sender {
t_iemnet_queue*queue;
int keepsending; // indicates whether we want to thread to continue or to terminate
int isrunning;
+
+ pthread_mutex_t mtx; /* mutex to protect isrunning,.. */
};
/* the workhorse of the family */
@@ -92,15 +102,23 @@ static void*iemnet__sender_sendthread(void*arg) {
while(sender->keepsending) {
if(!iemnet__sender_dosend(sockfd, q))break;
}
+ LOCK (&sender->mtx);
sender->isrunning=0;
+ UNLOCK (&sender->mtx);
DEBUG("send thread terminated");
return NULL;
}
int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) {
- t_iemnet_queue*q=s->queue;
+ t_iemnet_queue*q=0;
int size=-1;
- if(!s->isrunning)return -1;
+ LOCK (&s->mtx);
+ q=s->queue;
+ if(!s->isrunning) {
+ UNLOCK (&s->mtx);
+ return -1;
+ }
+ UNLOCK (&s->mtx);
if(q) {
t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c);
size = queue_push(q, chunk);
@@ -109,27 +127,37 @@ int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) {
}
void iemnet__sender_destroy(t_iemnet_sender*s) {
- int sockfd=s->sockfd;
+ int sockfd=-1;
/* simple protection against recursive calls:
* s->keepsending is only set to "0" in here,
* so if it is false, we know that we are already being called
*/
DEBUG("destroy sender %x with queue %x (%d)", s, s->queue, s->keepsending);
- if(!s->keepsending)return;
+ LOCK (&s->mtx);
+ sockfd=s->sockfd;
+ // check s->isrunning
+ DEBUG("keepsending %d\tisrunning %d", s->keepsending, s->isrunning);
+ if(!s->keepsending) {
+ UNLOCK (&s->mtx);
+ return;
+ }
s->keepsending=0;
+ UNLOCK (&s->mtx);
queue_finish(s->queue);
DEBUG("queue finished");
- s->sockfd = -1;
if(sockfd>=0) {
- shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */
+ int err=shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */
sys_closesocket(sockfd);
}
+
pthread_join(s->thread, NULL);
DEBUG("thread joined");
queue_destroy(s->queue);
+ pthread_mutex_destroy (&s->mtx);
+
memset(s, 0, sizeof(t_iemnet_sender));
freebytes(s, sizeof(t_iemnet_sender));
s=NULL;
@@ -138,6 +166,7 @@ void iemnet__sender_destroy(t_iemnet_sender*s) {
t_iemnet_sender*iemnet__sender_create(int sock) {
+ static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
t_iemnet_sender*result=(t_iemnet_sender*)getbytes(sizeof(t_iemnet_sender));
int res=0;
DEBUG("create sender %x", result);
@@ -151,6 +180,8 @@ t_iemnet_sender*iemnet__sender_create(int sock) {
result->keepsending =1;
result->isrunning=1;
DEBUG("create_sender queue=%x", result->queue);
+
+ memcpy(&result->mtx , &mtx, sizeof(pthread_mutex_t));
res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result);
if(0==res) {