aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--iemnet_data.c166
-rw-r--r--iemnet_sender.c43
2 files changed, 139 insertions, 70 deletions
diff --git a/iemnet_data.c b/iemnet_data.c
index 9969e9a..e671d7d 100644
--- a/iemnet_data.c
+++ b/iemnet_data.c
@@ -203,6 +203,7 @@ struct _iemnet_queue {
/* push a chunk into the queue
+ * this will return the current queue size
*/
int queue_push(
t_iemnet_queue* const _this,
@@ -211,37 +212,39 @@ int queue_push(
t_node* tail;
t_node* n=NULL;
int size=-1;
- if(_this) {
- size=_this->size;
+ if(NULL == _this)return size;
- if(NULL == data) return size;
- //fprintf(stderr, "pushing %d bytes\n", data->size);
+ pthread_mutex_lock(&_this->mtx);
+ size=_this->size;
+ pthread_mutex_unlock(&_this->mtx);
- n=(t_node*)getbytes(sizeof(t_node));
+ if(NULL == data) return size;
- n->next = 0;
- n->data = data;
- pthread_mutex_lock(&_this->mtx);
- if (! (tail = _this->tail)) {
- _this->head = n;
- } else {
- tail->next = n;
- }
- _this->tail = n;
-
- _this->size+=data->size;
- size=_this->size;
+ n=(t_node*)getbytes(sizeof(t_node));
- //fprintf(stderr, "pushed %d bytes\n", data->size);
+ n->next = 0;
+ n->data = data;
- pthread_mutex_unlock(&_this->mtx);
- pthread_cond_signal(&_this->cond);
+ pthread_mutex_lock(&_this->mtx);
+ if (! (tail = _this->tail)) {
+ _this->head = n;
+ } else {
+ tail->next = n;
}
+ _this->tail = n;
+
+ _this->size+=data->size;
+ size=_this->size;
+
+ // added new chunk, so tell waiting threads that they can pop the data
+ pthread_cond_signal(&_this->cond);
+ pthread_mutex_unlock(&_this->mtx);
+
return size;
}
-/* push a chunk from the queue
+/* pop a chunk from the queue
* if the queue is empty, this will block until
* something has been pushed
* OR the queue is "done" (in which case NULL is returned)
@@ -252,61 +255,93 @@ t_iemnet_chunk* queue_pop_block(
t_node* head=0;
t_iemnet_chunk*data=0;
- if(_this) {
- pthread_mutex_lock(&_this->mtx);
-
- /* wait until there is something in the queue or the "done" flag is set */
- while (! (head = _this->head)) {
- if(_this->done) {
- pthread_mutex_unlock(&_this->mtx);
- return NULL;
- } else {
- pthread_cond_wait(&_this->cond, &_this->mtx);
- }
+ if(NULL == _this)return NULL;
+
+ pthread_mutex_lock(&_this->mtx);
+#if 1
+
+ /* if the queue is empty, wait */
+ if(NULL == _this->head) {
+ pthread_cond_wait(&_this->cond, &_this->mtx);
+ /* somebody signaled us, that we should do some work
+ * either the queue has been filled, or we are done...
+ */
+ if(_this->done) {
+ pthread_mutex_unlock(&_this->mtx);
+ return NULL;
}
+ }
+ /* save the head, below we gonna work on this */
+ head = _this->head;
- if (! (_this->head = head->next)) {
- _this->tail = 0;
- }
- if(head && head->data) {
- _this->size-=head->data->size;
- }
+ /* update _this */
+ if (! (_this->head = head->next)) {
+ _this->tail = 0;
+ }
+ if(head && head->data) {
+ _this->size-=head->data->size;
+ }
- pthread_mutex_unlock(&_this->mtx);
- if(head) {
- data=head->data;
- freebytes(head, sizeof(t_node));
- head=NULL;
+ pthread_mutex_unlock(&_this->mtx);
+#else
+ /* wait until there is something in the queue or the "done" flag is set */
+ while (! (head = _this->head)) {
+ if(_this->done) {
+ pthread_mutex_unlock(&_this->mtx);
+ return NULL;
+ } else {
+ pthread_cond_wait(&_this->cond, &_this->mtx);
}
}
+
+ if (! (_this->head = head->next)) {
+ _this->tail = 0;
+ }
+ if(head && head->data) {
+ _this->size-=head->data->size;
+ }
+
+ pthread_mutex_unlock(&_this->mtx);
+#endif
+
+ if(head) {
+ data=head->data;
+ freebytes(head, sizeof(t_node));
+ head=NULL;
+ }
return data;
}
-
+/* pop a chunk from the queue
+ * if the queue is empty, this will immediately return NULL
+ * (note that despite of the name this does block for synchronization)
+ */
t_iemnet_chunk* queue_pop_noblock(
t_iemnet_queue* const _this
) {
t_node* head=0;
t_iemnet_chunk*data=0;
- if(_this) {
- pthread_mutex_lock(&_this->mtx);
- if (! (head = _this->head)) {
- // empty head
- pthread_mutex_unlock(&_this->mtx);
- return NULL;
- }
- if (! (_this->head = head->next)) {
- _this->tail = 0;
- }
- if(head && head->data) {
- _this->size-=head->data->size;
- }
+ if(NULL == _this)return NULL;
+ pthread_mutex_lock(&_this->mtx);
+ if (! (head = _this->head)) {
+ // empty head
pthread_mutex_unlock(&_this->mtx);
- if(head) {
- data=head->data;
- freebytes(head, sizeof(t_node));
- head=NULL;
- }
+ return NULL;
+ }
+
+ if (! (_this->head = head->next)) {
+ _this->tail = 0;
+ }
+ if(head && head->data) {
+ _this->size-=head->data->size;
+ }
+
+ pthread_mutex_unlock(&_this->mtx);
+
+ if(head) {
+ data=head->data;
+ freebytes(head, sizeof(t_node));
+ head=NULL;
}
return data;
}
@@ -326,12 +361,15 @@ int queue_getsize(t_iemnet_queue* const _this) {
}
void queue_finish(t_iemnet_queue* q) {
DEBUG("queue_finish: %x", q);
- if(NULL==q)
- return;
+ if(NULL==q) return;
+
+ pthread_mutex_lock(&q->mtx);
q->done=1;
DEBUG("queue signaling: %x", q);
pthread_cond_signal(&q->cond);
DEBUG("queue signaled: %x", q);
+ pthread_mutex_unlock(&q->mtx);
+ DEBUG("queue_finished: %x", q);
}
void queue_destroy(t_iemnet_queue* q) {
diff --git a/iemnet_sender.c b/iemnet_sender.c
index 20fb93b..bdbc26d 100644
--- a/iemnet_sender.c
+++ b/iemnet_sender.c
@@ -28,6 +28,14 @@
#include <pthread.h>
+#if IEMNET_HAVE_DEBUG
+static int debug_lockcount=0;
+# define LOCK(x) do {pthread_mutex_lock(x); debug_lockcount++; post(" LOCK %d (@%d)", debug_lockcount, __LINE__); } while(0)
+# define UNLOCK(x) do {debug_lockcount--;post("UNLOCK %d (@%d)", debug_lockcount, __LINE__);pthread_mutex_unlock(x);}while(0)
+#else
+# define LOCK(x) pthread_mutex_lock(x)
+# define UNLOCK(x) pthread_mutex_unlock(x)
+#endif
/* draft:
* - there is a sender thread for each open connection
@@ -42,6 +50,8 @@ struct _iemnet_sender {
t_iemnet_queue*queue;
int keepsending; // indicates whether we want to thread to continue or to terminate
int isrunning;
+
+ pthread_mutex_t mtx; /* mutex to protect isrunning,.. */
};
/* the workhorse of the family */
@@ -92,15 +102,23 @@ static void*iemnet__sender_sendthread(void*arg) {
while(sender->keepsending) {
if(!iemnet__sender_dosend(sockfd, q))break;
}
+ LOCK (&sender->mtx);
sender->isrunning=0;
+ UNLOCK (&sender->mtx);
DEBUG("send thread terminated");
return NULL;
}
int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) {
- t_iemnet_queue*q=s->queue;
+ t_iemnet_queue*q=0;
int size=-1;
- if(!s->isrunning)return -1;
+ LOCK (&s->mtx);
+ q=s->queue;
+ if(!s->isrunning) {
+ UNLOCK (&s->mtx);
+ return -1;
+ }
+ UNLOCK (&s->mtx);
if(q) {
t_iemnet_chunk*chunk=iemnet__chunk_create_chunk(c);
size = queue_push(q, chunk);
@@ -109,27 +127,37 @@ int iemnet__sender_send(t_iemnet_sender*s, t_iemnet_chunk*c) {
}
void iemnet__sender_destroy(t_iemnet_sender*s) {
- int sockfd=s->sockfd;
+ int sockfd=-1;
/* simple protection against recursive calls:
* s->keepsending is only set to "0" in here,
* so if it is false, we know that we are already being called
*/
DEBUG("destroy sender %x with queue %x (%d)", s, s->queue, s->keepsending);
- if(!s->keepsending)return;
+ LOCK (&s->mtx);
+ sockfd=s->sockfd;
+ // check s->isrunning
+ DEBUG("keepsending %d\tisrunning %d", s->keepsending, s->isrunning);
+ if(!s->keepsending) {
+ UNLOCK (&s->mtx);
+ return;
+ }
s->keepsending=0;
+ UNLOCK (&s->mtx);
queue_finish(s->queue);
DEBUG("queue finished");
- s->sockfd = -1;
if(sockfd>=0) {
- shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */
+ int err=shutdown(sockfd, 2); /* needed on linux, since the recv won't shutdown on sys_closesocket() alone */
sys_closesocket(sockfd);
}
+
pthread_join(s->thread, NULL);
DEBUG("thread joined");
queue_destroy(s->queue);
+ pthread_mutex_destroy (&s->mtx);
+
memset(s, 0, sizeof(t_iemnet_sender));
freebytes(s, sizeof(t_iemnet_sender));
s=NULL;
@@ -138,6 +166,7 @@ void iemnet__sender_destroy(t_iemnet_sender*s) {
t_iemnet_sender*iemnet__sender_create(int sock) {
+ static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
t_iemnet_sender*result=(t_iemnet_sender*)getbytes(sizeof(t_iemnet_sender));
int res=0;
DEBUG("create sender %x", result);
@@ -151,6 +180,8 @@ t_iemnet_sender*iemnet__sender_create(int sock) {
result->keepsending =1;
result->isrunning=1;
DEBUG("create_sender queue=%x", result->queue);
+
+ memcpy(&result->mtx , &mtx, sizeof(pthread_mutex_t));
res=pthread_create(&result->thread, 0, iemnet__sender_sendthread, result);
if(0==res) {