diff options
Diffstat (limited to 'system/kernel/pdp_queue.c')
-rw-r--r-- | system/kernel/pdp_queue.c | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/system/kernel/pdp_queue.c b/system/kernel/pdp_queue.c new file mode 100644 index 0000000..665a236 --- /dev/null +++ b/system/kernel/pdp_queue.c @@ -0,0 +1,347 @@ +/* + * Pure Data Packet - processor queue module. + * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + + +/* + this is a the processor queue pdp system module + it receives tasks from objects that are schedules to + be computed in another thread. the object is signalled back + when the task is completed. + + this is not a standard pd class. it is a sigleton class + using a standard pd clock to poll for compleded methods on + every scheduler run. this is a hack to do thread synchronization + in a thread unsafe pd. + + the queue object can be reused. the pdp system however only + has one instance (one pdp queue. pdp remains a serial program.) + + */ + +#include "pdp.h" +#include <pthread.h> +#include <unistd.h> +#include <stdio.h> + + +#define D if (0) + +#ifdef __cplusplus +extern "C" +{ +#endif + +#define PDP_QUEUE_LOGSIZE 10 +#define PDP_QUEUE_DELTIME 10.0f + + + + +/********************* general purpose pd process queue class *********************/ + +void pdp_procqueue_wait(t_pdp_procqueue *q) +{ + D post("pdp_procqueue_wait(%x): waiting for pdp_queue_thread to finish processing", q); + pthread_mutex_lock(&q->mut); + while(((q->curr - q->head) & q->mask) != 0){ + + pthread_cond_wait(&q->cond_processingdone, &q->mut); + } + pthread_mutex_unlock(&q->mut); + D post("pdp_procqueue_wait(%x): pdp_procqueue_thread has finished processing", q); + +} +void pdp_procqueue_finish(t_pdp_procqueue *q, int index) +{ + + if (-1 == index) { + //post("pdp_pq_remove: index == -1"); + return; + } + /* wait for processing thread to finish*/ + pdp_procqueue_wait(q); + + /* invalidate callback at index */ + q->q[index & q->mask].x_callback = 0; + q->q[index & q->mask].x_queue_id = 0; + +} + +static void pdp_procqueue_signal_processor(t_pdp_procqueue *q) +{ + + //NOTE: uncommenting these post statements causes a libc crash + //in mutex lock in putc + //D post("pdp_procqueue_signal_processor(%x): signalling process thread", q); + pthread_mutex_lock(&q->mut); + pthread_cond_signal(&q->cond_dataready); + pthread_mutex_unlock(&q->mut); + //D post("pdp_procqueue_signal_processor(%x): signalling done", q); + + +} + +static void pdp_procqueue_wait_for_feeder(t_pdp_procqueue *q) +{ + + + /* only use locking when there is no data */ + if(((q->curr - q->head) & q->mask) == 0){ + + /* signal processing done */ + D post("pdp_procqueue_wait_for_feeder(%x): signalling processing is done", q); + pthread_mutex_lock(&q->mut); + pthread_cond_signal(&q->cond_processingdone); + + /* wait until there is an item in the queue */ + while(((q->curr - q->head) & q->mask) == 0){ + pthread_cond_wait(&q->cond_dataready, &q->mut); + } + + pthread_mutex_unlock(&q->mut); + D post("pdp_procqueue_wait_for_feeder(%x): waiting done", q); + + } +} + +void pdp_procqueue_add(t_pdp_procqueue *q, void *owner, void *process, void *callback, int *queue_id) +{ + int i; + + /* if processing is in not in thread, just call the funcs */ + if (!q->use_thread){ + D post("pdp_procqueue_add(%q): calling processing routine directly", q); + if (queue_id) *queue_id = -1; + if (process) ((t_pdpmethod) process)(owner); + if (callback) ((t_pdpmethod) callback)(owner); + return; + } + + + /* if queue is full, call directly */ + if (1 == ((q->tail - q->head) & q->mask)) { + //post("pdp_procqueue_add: WARNING: processing queue (%x) is full.\n", q); + //post("pdp_procqueue_add: WARNING: tail %x, head %x, mask %x.\n", q->tail, q->head, q->mask); + //post("pdp_procqueue_add: WARNING: skipping process method, calling callback directly.\n"); + if (queue_id) *queue_id = -1; + if (callback) ((t_pdpmethod) callback)(owner); + return; + //exit(1); + } + + /* schedule method in thread queue */ + i = q->head & q->mask; + q->q[i].x_owner = owner; + q->q[i].x_process = process; + q->q[i].x_callback = callback; + q->q[i].x_queue_id = queue_id; + if (queue_id) *queue_id = i; + //post("pdp_queue_add: added method to queue, index %d", i); + + + // increase the packet count + q->packets++; + + // move head forward + q->head++; + + pdp_procqueue_signal_processor(q); + +} + + +/* processing thread */ +static void *pdp_procqueue_thread(void *vq) +{ + t_pdp_procqueue *q = (t_pdp_procqueue *)vq; + + D post("pdp_procqueue_thread(%x): thread started", q); + + while(1){ + t_process_queue_item *p; + + + D post("pdp_procqueue_thread(%x): waiting for feeder", q); + + /* wait until there is data available */ + pdp_procqueue_wait_for_feeder(q); + + + D post("pdp_procqueue_thread(%x): processing %d", q, q->curr & q->mask); + + + /* call the process routine */ + p = &q->q[q->curr & q->mask]; + if (p->x_process) + (p->x_process)(p->x_owner); + + /* advance */ + q->curr++; + + + } +} + + +/* call back all the callbacks */ +static void pdp_procqueue_callback (t_pdp_procqueue *q) +{ + + /* call callbacks for finished packets */ + while(0 != ((q->curr - q->tail) & q->mask)) + { + int i = q->tail & q->mask; + /* invalidate queue id */ + if(q->q[i].x_queue_id) *q->q[i].x_queue_id = -1; + /* call callback */ + if(q->q[i].x_callback) (q->q[i].x_callback)(q->q[i].x_owner); + //else post("pdp_pq_tick: callback %d is disabled",i ); + q->tail++; + } + +} + +/* the clock method */ +static void pdp_procqueue_tick (t_pdp_procqueue *q) +{ + /* do work */ + //if (!(ticks % 1000)) post("pdp tick %d", ticks); + + if (!q->use_thread) return; + + /* call callbacks */ + pdp_procqueue_callback(q); + + /* increase counter */ + q->ticks++; + + /* set clock for next update */ + clock_delay(q->pdp_clock, q->deltime); +} + + + +void pdp_procqueue_use_thread(t_pdp_procqueue* q, int t) +{ + /* if thread usage is being disabled, + wait for thread to finish processing first */ + if (t == 0) { + pdp_procqueue_wait(q); + q->use_thread = 0; + pdp_procqueue_callback(q); + clock_unset(q->pdp_clock); + } + else { + clock_unset(q->pdp_clock); + clock_delay(q->pdp_clock, q->deltime); + q->use_thread = 1; + } + +} + +void pdp_procqueue_init(t_pdp_procqueue *q, double milliseconds, int logsize) +{ + pthread_attr_t attr; + int size = 1 << logsize; + + /* setup pdp queue processor object */ + q->ticks = 0; + q->deltime = milliseconds; + + /* setup queue data */ + q->mask = size - 1; + q->head = 0; + q->tail = 0; + q->curr = 0; + q->q = pdp_alloc(size * sizeof(t_process_queue_item)); + memset(q->q, 0, size * sizeof(t_process_queue_item)); + + /* enable threads */ + q->use_thread = 1; + + /* setup synchro stuff */ + pthread_mutex_init(&q->mut, NULL); + pthread_cond_init(&q->cond_dataready, NULL); + pthread_cond_init(&q->cond_processingdone, NULL); + + + /* allocate the clock */ + q->pdp_clock = clock_new(q, (t_method)pdp_procqueue_tick); + + /* set the clock */ + clock_delay(q->pdp_clock, 0); + + /* start processing thread */ + + /* glibc doc says SCHED_OTHER is default, + but it seems not to be when initiated from a RT thread + so we explicitly set it here */ + pthread_attr_init (&attr); + //pthread_attr_setschedpolicy(&attr, SCHED_FIFO); + pthread_attr_setschedpolicy(&attr, SCHED_OTHER); + + D post("pdp_procqueue_init(%x): starting thread", q); + pthread_create(&q->thread_id, &attr, pdp_procqueue_thread, (void *)q); + D post("pdp_procqueue_init(%x): back in pd thread", q); + + /* wait for processing thread to finish */ + //pdp_procqueue_wait(q); + + /* set default disable/enable thread here */ + //post("pdp_queue: THREAD PROCESSING ON BY DEFAULT!!"); + pdp_procqueue_use_thread(q,0); + +} + + + + +/* the (static) pdp queue object */ +static t_pdp_procqueue pdp_queue; + + +/* get the default queue */ +t_pdp_procqueue *pdp_queue_get_queue(void){return &pdp_queue;} + + +#if 1 +/* default pdp queue shortcut methods */ +void pdp_queue_wait() {pdp_procqueue_wait(&pdp_queue);} +void pdp_queue_finish(int index) { pdp_procqueue_finish(&pdp_queue, index);} +void pdp_queue_add(void *owner, void *process, void *callback, int *queue_id) { + pdp_procqueue_add(&pdp_queue, owner, process, callback, queue_id); +} +void pdp_queue_use_thread(int t) {pdp_procqueue_use_thread(&pdp_queue, t);} +void pdp_queue_setup(void){ + pdp_procqueue_init(&pdp_queue, PDP_QUEUE_DELTIME, PDP_QUEUE_LOGSIZE); + pdp_procqueue_use_thread(&pdp_queue,0); +} +#endif + + + + + + + +#ifdef __cplusplus +} +#endif |