aboutsummaryrefslogtreecommitdiff
path: root/puredata/pdp_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'puredata/pdp_queue.c')
-rw-r--r--puredata/pdp_queue.c386
1 files changed, 386 insertions, 0 deletions
diff --git a/puredata/pdp_queue.c b/puredata/pdp_queue.c
new file mode 100644
index 0000000..b66289a
--- /dev/null
+++ b/puredata/pdp_queue.c
@@ -0,0 +1,386 @@
+/*
+ * Pure Data Packet - processor queue module.
+ * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+
+
+/*
+ this is a the processor queue pdp system module
+ it receives tasks from objects that are schedules to
+ be computed in another thread. the object is signalled back
+ when the task is completed, using a polling mechanism
+ based on a pd clock.
+
+ the queue object can be reused. the pdp system however only
+ has one instance (one pdp queue. pdp remains a serial program, though
+ it can run in a separate thread)
+
+ */
+
+
+#include <string.h>
+
+#include "pdp_queue.h"
+#include "pdp_mem.h"
+
+
+#define D if (0)
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#define PDP_QUEUE_LOGSIZE 10
+#define PDP_QUEUE_DELTIME 10.0f
+
+
+/* there are 3 synchro methods, which can be used i.e. to ensure
+ all processing is done before shared resources are freed.
+
+ all 3 wait for the processing thread to finish, and
+
+ _wait: leaves callback queue untouched
+ _finish: clears the queue_id item in the callback queue
+ _flush: waits for thread and calls callbacks
+ and loops until callback list is empty
+
+*/
+
+
+
+/********************* general purpose pd process queue class *********************/
+
+void pdp_procqueue_wait(t_pdp_procqueue *q)
+{
+ D post("pdp_procqueue_wait(%x): waiting for pdp_queue_thread to finish processing", q);
+ pthread_mutex_lock(&q->mut);
+ while(((q->curr - q->head) & q->mask) != 0){
+
+ pthread_cond_wait(&q->cond_processingdone, &q->mut);
+ }
+ pthread_mutex_unlock(&q->mut);
+ D post("pdp_procqueue_wait(%x): pdp_procqueue_thread has finished processing", q);
+
+}
+void pdp_procqueue_finish(t_pdp_procqueue *q, int index)
+{
+
+ if (-1 == index) {
+ //post("pdp_pq_remove: index == -1");
+ return;
+ }
+ /* wait for processing thread to finish*/
+ pdp_procqueue_wait(q);
+
+ /* invalidate callback at index */
+ q->q[index & q->mask].x_callback = 0;
+ q->q[index & q->mask].x_queue_id = 0;
+
+}
+
+static void pdp_procqueue_callback (t_pdp_procqueue *q);
+
+void pdp_procqueue_flush(t_pdp_procqueue *q)
+{
+ /* wait once */
+ pdp_procqueue_wait(q);
+
+ do {
+
+ /* process callbacks and wait again
+ in case the callbacks introduced new tasks */
+ pdp_procqueue_callback(q);
+ pdp_procqueue_wait(q);
+
+ }
+ /* repeat if callback list is not empty */
+ while ((q->curr - q->head) & q->mask);
+
+ D post("pdp_procqueue_flush: done");
+}
+
+static void pdp_procqueue_signal_processor(t_pdp_procqueue *q)
+{
+
+ //NOTE: uncommenting these post statements causes a libc crash
+ //in mutex lock in putc
+ //D post("pdp_procqueue_signal_processor(%x): signalling process thread", q);
+ pthread_mutex_lock(&q->mut);
+ pthread_cond_signal(&q->cond_dataready);
+ pthread_mutex_unlock(&q->mut);
+ //D post("pdp_procqueue_signal_processor(%x): signalling done", q);
+
+
+}
+
+static void pdp_procqueue_wait_for_feeder(t_pdp_procqueue *q)
+{
+
+
+ /* only use locking when there is no data */
+ if(((q->curr - q->head) & q->mask) == 0){
+
+ /* signal processing done */
+ D post("pdp_procqueue_wait_for_feeder(%x): signalling processing is done", q);
+ pthread_mutex_lock(&q->mut);
+ pthread_cond_signal(&q->cond_processingdone);
+
+ /* wait until there is an item in the queue */
+ while(((q->curr - q->head) & q->mask) == 0){
+ pthread_cond_wait(&q->cond_dataready, &q->mut);
+ }
+
+ pthread_mutex_unlock(&q->mut);
+ D post("pdp_procqueue_wait_for_feeder(%x): waiting done", q);
+
+ }
+}
+
+
+int pdp_procqueue_full(t_pdp_procqueue *q)
+{
+ return (1 == ((q->tail - q->head) & q->mask));
+}
+
+
+void pdp_procqueue_add(t_pdp_procqueue *q, void *owner, void *process, void *callback, int *queue_id)
+{
+ int i;
+
+ /* if processing is in not in thread, just call the funcs */
+ if (!q->use_thread){
+ D post("pdp_procqueue_add(%q): calling processing routine directly", q);
+ if (queue_id) *queue_id = -1;
+ if (process) ((t_pdpmethod) process)(owner);
+ if (callback) ((t_pdpmethod) callback)(owner);
+ return;
+ }
+
+
+ /* if queue is full, print an error message and return */
+ if (pdp_procqueue_full(q)) {
+ post("pdp_procqueue_add: WARNING: processing queue (%x) is full.\n", q);
+ post("pdp_procqueue_add: WARNING: tail %08x, head %08x (%08x), mask %08x.\n", q->tail, q->head, q->head & q->mask, q->mask);
+ post("pdp_procqueue_add: WARNING: skipping process method, calling callback directly.\n");
+ if (queue_id) *queue_id = -1;
+ if (callback) ((t_pdpmethod) callback)(owner);
+ return;
+ //exit(1);
+ }
+
+ /* schedule method in thread queue */
+ i = q->head & q->mask;
+ q->q[i].x_owner = owner;
+ q->q[i].x_process = process;
+ q->q[i].x_callback = callback;
+ q->q[i].x_queue_id = queue_id;
+ if (queue_id) *queue_id = i;
+ //post("pdp_queue_add: added method to queue, index %d", i);
+
+
+ // increase the packet count
+ q->packets++;
+
+ // move head forward
+ q->head++;
+
+ pdp_procqueue_signal_processor(q);
+
+}
+
+
+/* processing thread */
+static void *pdp_procqueue_thread(void *vq)
+{
+ t_pdp_procqueue *q = (t_pdp_procqueue *)vq;
+
+ D post("pdp_procqueue_thread(%x): thread started", q);
+
+ while(1){
+ t_process_queue_item *p;
+
+
+ D post("pdp_procqueue_thread(%x): waiting for feeder", q);
+
+ /* wait until there is data available */
+ pdp_procqueue_wait_for_feeder(q);
+
+
+ D post("pdp_procqueue_thread(%x): processing %d", q, q->curr & q->mask);
+
+
+ /* call the process routine */
+ p = &q->q[q->curr & q->mask];
+ if (p->x_process)
+ (p->x_process)(p->x_owner);
+
+ /* advance */
+ q->curr++;
+
+
+ }
+ return 0;
+}
+
+
+/* call back all the callbacks */
+static void pdp_procqueue_callback (t_pdp_procqueue *q)
+{
+
+ /* call callbacks for finished packets */
+ while(0 != ((q->curr - q->tail) & q->mask))
+ {
+ int i = q->tail & q->mask;
+ /* invalidate queue id */
+ if(q->q[i].x_queue_id) *q->q[i].x_queue_id = -1;
+ /* call callback */
+ if(q->q[i].x_callback) (q->q[i].x_callback)(q->q[i].x_owner);
+ //else post("pdp_pq_tick: callback %d is disabled",i );
+ q->tail++;
+ }
+
+}
+
+/* the clock method */
+static void pdp_procqueue_tick (t_pdp_procqueue *q)
+{
+ /* do work */
+ //if (!(ticks % 1000)) post("pdp tick %d", ticks);
+
+ if (!q->use_thread) return;
+
+ /* call callbacks */
+ pdp_procqueue_callback(q);
+
+ /* increase counter */
+ q->ticks++;
+
+ /* set clock for next update */
+ clock_delay(q->pdp_clock, q->deltime);
+}
+
+
+
+void pdp_procqueue_use_thread(t_pdp_procqueue* q, int t)
+{
+ /* if thread usage is being disabled,
+ wait for thread to finish processing first */
+ if (t == 0) {
+ pdp_procqueue_wait(q);
+ q->use_thread = 0;
+ pdp_procqueue_callback(q);
+ clock_unset(q->pdp_clock);
+ }
+ else {
+ clock_unset(q->pdp_clock);
+ clock_delay(q->pdp_clock, q->deltime);
+ q->use_thread = 1;
+ }
+
+}
+
+void pdp_procqueue_init(t_pdp_procqueue *q, double milliseconds, int logsize)
+{
+ pthread_attr_t attr;
+ int size = 1 << logsize;
+
+ /* setup pdp queue processor object */
+ q->ticks = 0;
+ q->deltime = milliseconds;
+
+ /* setup queue data */
+ q->mask = size - 1;
+ q->head = 0;
+ q->tail = 0;
+ q->curr = 0;
+ q->q = pdp_alloc(size * sizeof(t_process_queue_item));
+ memset(q->q, 0, size * sizeof(t_process_queue_item));
+
+ /* enable threads */
+ q->use_thread = 1;
+
+ /* setup synchro stuff */
+ pthread_mutex_init(&q->mut, NULL);
+ pthread_cond_init(&q->cond_dataready, NULL);
+ pthread_cond_init(&q->cond_processingdone, NULL);
+
+
+ /* allocate the clock */
+ q->pdp_clock = clock_new(q, (t_method)pdp_procqueue_tick);
+
+ /* set the clock */
+ clock_delay(q->pdp_clock, 0);
+
+ /* start processing thread */
+
+ /* glibc doc says SCHED_OTHER is default,
+ but it seems not to be when initiated from a RT thread
+ so we explicitly set it here */
+ pthread_attr_init (&attr);
+ //pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
+ pthread_attr_setschedpolicy(&attr, SCHED_OTHER);
+
+ D post("pdp_procqueue_init(%x): starting thread", q);
+ pthread_create(&q->thread_id, &attr, pdp_procqueue_thread, (void *)q);
+ D post("pdp_procqueue_init(%x): back in pd thread", q);
+
+ /* wait for processing thread to finish */
+ //pdp_procqueue_wait(q);
+
+ /* set default disable/enable thread here */
+ //post("pdp_queue: THREAD PROCESSING ON BY DEFAULT!!");
+ pdp_procqueue_use_thread(q,0);
+
+}
+
+
+
+
+/* the (static) pdp queue object */
+static t_pdp_procqueue pdp_queue;
+
+
+/* get the default queue */
+t_pdp_procqueue *pdp_queue_get_queue(void){return &pdp_queue;}
+
+
+#if 1
+/* default pdp queue shortcut methods */
+void pdp_queue_wait() {pdp_procqueue_wait(&pdp_queue);}
+void pdp_queue_finish(int index) { pdp_procqueue_finish(&pdp_queue, index);}
+void pdp_queue_add(void *owner, void *process, void *callback, int *queue_id) {
+ pdp_procqueue_add(&pdp_queue, owner, process, callback, queue_id);
+}
+void pdp_queue_use_thread(int t) {pdp_procqueue_use_thread(&pdp_queue, t);}
+void pdp_queue_setup(void){
+ pdp_procqueue_init(&pdp_queue, PDP_QUEUE_DELTIME, PDP_QUEUE_LOGSIZE);
+ pdp_procqueue_use_thread(&pdp_queue,0);
+}
+#endif
+
+
+
+
+
+
+
+#ifdef __cplusplus
+}
+#endif