aboutsummaryrefslogtreecommitdiff
path: root/iemnet_data.c
diff options
context:
space:
mode:
authorIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-08-20 16:54:42 +0000
committerIOhannes m zmölnig <zmoelnig@users.sourceforge.net>2010-08-20 16:54:42 +0000
commite82deafc3a75cd51ff3d93069a60c37c334fef5c (patch)
tree1ea23e256b8e84cb8220c3b9369a50f6fccd132d /iemnet_data.c
parenta9421cbe6bad6c9578431873aaea718f90040cd5 (diff)
fixed bugs in multithreading code
svn path=/trunk/externals/iem/iemnet/; revision=13871
Diffstat (limited to 'iemnet_data.c')
-rw-r--r--iemnet_data.c166
1 files changed, 102 insertions, 64 deletions
diff --git a/iemnet_data.c b/iemnet_data.c
index 9969e9a..e671d7d 100644
--- a/iemnet_data.c
+++ b/iemnet_data.c
@@ -203,6 +203,7 @@ struct _iemnet_queue {
/* push a chunk into the queue
+ * this will return the current queue size
*/
int queue_push(
t_iemnet_queue* const _this,
@@ -211,37 +212,39 @@ int queue_push(
t_node* tail;
t_node* n=NULL;
int size=-1;
- if(_this) {
- size=_this->size;
+ if(NULL == _this)return size;
- if(NULL == data) return size;
- //fprintf(stderr, "pushing %d bytes\n", data->size);
+ pthread_mutex_lock(&_this->mtx);
+ size=_this->size;
+ pthread_mutex_unlock(&_this->mtx);
- n=(t_node*)getbytes(sizeof(t_node));
+ if(NULL == data) return size;
- n->next = 0;
- n->data = data;
- pthread_mutex_lock(&_this->mtx);
- if (! (tail = _this->tail)) {
- _this->head = n;
- } else {
- tail->next = n;
- }
- _this->tail = n;
-
- _this->size+=data->size;
- size=_this->size;
+ n=(t_node*)getbytes(sizeof(t_node));
- //fprintf(stderr, "pushed %d bytes\n", data->size);
+ n->next = 0;
+ n->data = data;
- pthread_mutex_unlock(&_this->mtx);
- pthread_cond_signal(&_this->cond);
+ pthread_mutex_lock(&_this->mtx);
+ if (! (tail = _this->tail)) {
+ _this->head = n;
+ } else {
+ tail->next = n;
}
+ _this->tail = n;
+
+ _this->size+=data->size;
+ size=_this->size;
+
+ // added new chunk, so tell waiting threads that they can pop the data
+ pthread_cond_signal(&_this->cond);
+ pthread_mutex_unlock(&_this->mtx);
+
return size;
}
-/* push a chunk from the queue
+/* pop a chunk from the queue
* if the queue is empty, this will block until
* something has been pushed
* OR the queue is "done" (in which case NULL is returned)
@@ -252,61 +255,93 @@ t_iemnet_chunk* queue_pop_block(
t_node* head=0;
t_iemnet_chunk*data=0;
- if(_this) {
- pthread_mutex_lock(&_this->mtx);
-
- /* wait until there is something in the queue or the "done" flag is set */
- while (! (head = _this->head)) {
- if(_this->done) {
- pthread_mutex_unlock(&_this->mtx);
- return NULL;
- } else {
- pthread_cond_wait(&_this->cond, &_this->mtx);
- }
+ if(NULL == _this)return NULL;
+
+ pthread_mutex_lock(&_this->mtx);
+#if 1
+
+ /* if the queue is empty, wait */
+ if(NULL == _this->head) {
+ pthread_cond_wait(&_this->cond, &_this->mtx);
+ /* somebody signaled us, that we should do some work
+ * either the queue has been filled, or we are done...
+ */
+ if(_this->done) {
+ pthread_mutex_unlock(&_this->mtx);
+ return NULL;
}
+ }
+ /* save the head, below we gonna work on this */
+ head = _this->head;
- if (! (_this->head = head->next)) {
- _this->tail = 0;
- }
- if(head && head->data) {
- _this->size-=head->data->size;
- }
+ /* update _this */
+ if (! (_this->head = head->next)) {
+ _this->tail = 0;
+ }
+ if(head && head->data) {
+ _this->size-=head->data->size;
+ }
- pthread_mutex_unlock(&_this->mtx);
- if(head) {
- data=head->data;
- freebytes(head, sizeof(t_node));
- head=NULL;
+ pthread_mutex_unlock(&_this->mtx);
+#else
+ /* wait until there is something in the queue or the "done" flag is set */
+ while (! (head = _this->head)) {
+ if(_this->done) {
+ pthread_mutex_unlock(&_this->mtx);
+ return NULL;
+ } else {
+ pthread_cond_wait(&_this->cond, &_this->mtx);
}
}
+
+ if (! (_this->head = head->next)) {
+ _this->tail = 0;
+ }
+ if(head && head->data) {
+ _this->size-=head->data->size;
+ }
+
+ pthread_mutex_unlock(&_this->mtx);
+#endif
+
+ if(head) {
+ data=head->data;
+ freebytes(head, sizeof(t_node));
+ head=NULL;
+ }
return data;
}
-
+/* pop a chunk from the queue
+ * if the queue is empty, this will immediately return NULL
+ * (note that despite of the name this does block for synchronization)
+ */
t_iemnet_chunk* queue_pop_noblock(
t_iemnet_queue* const _this
) {
t_node* head=0;
t_iemnet_chunk*data=0;
- if(_this) {
- pthread_mutex_lock(&_this->mtx);
- if (! (head = _this->head)) {
- // empty head
- pthread_mutex_unlock(&_this->mtx);
- return NULL;
- }
- if (! (_this->head = head->next)) {
- _this->tail = 0;
- }
- if(head && head->data) {
- _this->size-=head->data->size;
- }
+ if(NULL == _this)return NULL;
+ pthread_mutex_lock(&_this->mtx);
+ if (! (head = _this->head)) {
+ // empty head
pthread_mutex_unlock(&_this->mtx);
- if(head) {
- data=head->data;
- freebytes(head, sizeof(t_node));
- head=NULL;
- }
+ return NULL;
+ }
+
+ if (! (_this->head = head->next)) {
+ _this->tail = 0;
+ }
+ if(head && head->data) {
+ _this->size-=head->data->size;
+ }
+
+ pthread_mutex_unlock(&_this->mtx);
+
+ if(head) {
+ data=head->data;
+ freebytes(head, sizeof(t_node));
+ head=NULL;
}
return data;
}
@@ -326,12 +361,15 @@ int queue_getsize(t_iemnet_queue* const _this) {
}
void queue_finish(t_iemnet_queue* q) {
DEBUG("queue_finish: %x", q);
- if(NULL==q)
- return;
+ if(NULL==q) return;
+
+ pthread_mutex_lock(&q->mtx);
q->done=1;
DEBUG("queue signaling: %x", q);
pthread_cond_signal(&q->cond);
DEBUG("queue signaled: %x", q);
+ pthread_mutex_unlock(&q->mtx);
+ DEBUG("queue_finished: %x", q);
}
void queue_destroy(t_iemnet_queue* q) {