diff options
Diffstat (limited to 'puredata/pdp_queue.c')
-rw-r--r-- | puredata/pdp_queue.c | 386 |
1 files changed, 0 insertions, 386 deletions
diff --git a/puredata/pdp_queue.c b/puredata/pdp_queue.c deleted file mode 100644 index b66289a..0000000 --- a/puredata/pdp_queue.c +++ /dev/null @@ -1,386 +0,0 @@ -/* - * Pure Data Packet - processor queue module. - * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - * - */ - - - -/* - this is a the processor queue pdp system module - it receives tasks from objects that are schedules to - be computed in another thread. the object is signalled back - when the task is completed, using a polling mechanism - based on a pd clock. - - the queue object can be reused. the pdp system however only - has one instance (one pdp queue. pdp remains a serial program, though - it can run in a separate thread) - - */ - - -#include <string.h> - -#include "pdp_queue.h" -#include "pdp_mem.h" - - -#define D if (0) - -#ifdef __cplusplus -extern "C" -{ -#endif - -#define PDP_QUEUE_LOGSIZE 10 -#define PDP_QUEUE_DELTIME 10.0f - - -/* there are 3 synchro methods, which can be used i.e. to ensure - all processing is done before shared resources are freed. - - all 3 wait for the processing thread to finish, and - - _wait: leaves callback queue untouched - _finish: clears the queue_id item in the callback queue - _flush: waits for thread and calls callbacks - and loops until callback list is empty - -*/ - - - -/********************* general purpose pd process queue class *********************/ - -void pdp_procqueue_wait(t_pdp_procqueue *q) -{ - D post("pdp_procqueue_wait(%x): waiting for pdp_queue_thread to finish processing", q); - pthread_mutex_lock(&q->mut); - while(((q->curr - q->head) & q->mask) != 0){ - - pthread_cond_wait(&q->cond_processingdone, &q->mut); - } - pthread_mutex_unlock(&q->mut); - D post("pdp_procqueue_wait(%x): pdp_procqueue_thread has finished processing", q); - -} -void pdp_procqueue_finish(t_pdp_procqueue *q, int index) -{ - - if (-1 == index) { - //post("pdp_pq_remove: index == -1"); - return; - } - /* wait for processing thread to finish*/ - pdp_procqueue_wait(q); - - /* invalidate callback at index */ - q->q[index & q->mask].x_callback = 0; - q->q[index & q->mask].x_queue_id = 0; - -} - -static void pdp_procqueue_callback (t_pdp_procqueue *q); - -void pdp_procqueue_flush(t_pdp_procqueue *q) -{ - /* wait once */ - pdp_procqueue_wait(q); - - do { - - /* process callbacks and wait again - in case the callbacks introduced new tasks */ - pdp_procqueue_callback(q); - pdp_procqueue_wait(q); - - } - /* repeat if callback list is not empty */ - while ((q->curr - q->head) & q->mask); - - D post("pdp_procqueue_flush: done"); -} - -static void pdp_procqueue_signal_processor(t_pdp_procqueue *q) -{ - - //NOTE: uncommenting these post statements causes a libc crash - //in mutex lock in putc - //D post("pdp_procqueue_signal_processor(%x): signalling process thread", q); - pthread_mutex_lock(&q->mut); - pthread_cond_signal(&q->cond_dataready); - pthread_mutex_unlock(&q->mut); - //D post("pdp_procqueue_signal_processor(%x): signalling done", q); - - -} - -static void pdp_procqueue_wait_for_feeder(t_pdp_procqueue *q) -{ - - - /* only use locking when there is no data */ - if(((q->curr - q->head) & q->mask) == 0){ - - /* signal processing done */ - D post("pdp_procqueue_wait_for_feeder(%x): signalling processing is done", q); - pthread_mutex_lock(&q->mut); - pthread_cond_signal(&q->cond_processingdone); - - /* wait until there is an item in the queue */ - while(((q->curr - q->head) & q->mask) == 0){ - pthread_cond_wait(&q->cond_dataready, &q->mut); - } - - pthread_mutex_unlock(&q->mut); - D post("pdp_procqueue_wait_for_feeder(%x): waiting done", q); - - } -} - - -int pdp_procqueue_full(t_pdp_procqueue *q) -{ - return (1 == ((q->tail - q->head) & q->mask)); -} - - -void pdp_procqueue_add(t_pdp_procqueue *q, void *owner, void *process, void *callback, int *queue_id) -{ - int i; - - /* if processing is in not in thread, just call the funcs */ - if (!q->use_thread){ - D post("pdp_procqueue_add(%q): calling processing routine directly", q); - if (queue_id) *queue_id = -1; - if (process) ((t_pdpmethod) process)(owner); - if (callback) ((t_pdpmethod) callback)(owner); - return; - } - - - /* if queue is full, print an error message and return */ - if (pdp_procqueue_full(q)) { - post("pdp_procqueue_add: WARNING: processing queue (%x) is full.\n", q); - post("pdp_procqueue_add: WARNING: tail %08x, head %08x (%08x), mask %08x.\n", q->tail, q->head, q->head & q->mask, q->mask); - post("pdp_procqueue_add: WARNING: skipping process method, calling callback directly.\n"); - if (queue_id) *queue_id = -1; - if (callback) ((t_pdpmethod) callback)(owner); - return; - //exit(1); - } - - /* schedule method in thread queue */ - i = q->head & q->mask; - q->q[i].x_owner = owner; - q->q[i].x_process = process; - q->q[i].x_callback = callback; - q->q[i].x_queue_id = queue_id; - if (queue_id) *queue_id = i; - //post("pdp_queue_add: added method to queue, index %d", i); - - - // increase the packet count - q->packets++; - - // move head forward - q->head++; - - pdp_procqueue_signal_processor(q); - -} - - -/* processing thread */ -static void *pdp_procqueue_thread(void *vq) -{ - t_pdp_procqueue *q = (t_pdp_procqueue *)vq; - - D post("pdp_procqueue_thread(%x): thread started", q); - - while(1){ - t_process_queue_item *p; - - - D post("pdp_procqueue_thread(%x): waiting for feeder", q); - - /* wait until there is data available */ - pdp_procqueue_wait_for_feeder(q); - - - D post("pdp_procqueue_thread(%x): processing %d", q, q->curr & q->mask); - - - /* call the process routine */ - p = &q->q[q->curr & q->mask]; - if (p->x_process) - (p->x_process)(p->x_owner); - - /* advance */ - q->curr++; - - - } - return 0; -} - - -/* call back all the callbacks */ -static void pdp_procqueue_callback (t_pdp_procqueue *q) -{ - - /* call callbacks for finished packets */ - while(0 != ((q->curr - q->tail) & q->mask)) - { - int i = q->tail & q->mask; - /* invalidate queue id */ - if(q->q[i].x_queue_id) *q->q[i].x_queue_id = -1; - /* call callback */ - if(q->q[i].x_callback) (q->q[i].x_callback)(q->q[i].x_owner); - //else post("pdp_pq_tick: callback %d is disabled",i ); - q->tail++; - } - -} - -/* the clock method */ -static void pdp_procqueue_tick (t_pdp_procqueue *q) -{ - /* do work */ - //if (!(ticks % 1000)) post("pdp tick %d", ticks); - - if (!q->use_thread) return; - - /* call callbacks */ - pdp_procqueue_callback(q); - - /* increase counter */ - q->ticks++; - - /* set clock for next update */ - clock_delay(q->pdp_clock, q->deltime); -} - - - -void pdp_procqueue_use_thread(t_pdp_procqueue* q, int t) -{ - /* if thread usage is being disabled, - wait for thread to finish processing first */ - if (t == 0) { - pdp_procqueue_wait(q); - q->use_thread = 0; - pdp_procqueue_callback(q); - clock_unset(q->pdp_clock); - } - else { - clock_unset(q->pdp_clock); - clock_delay(q->pdp_clock, q->deltime); - q->use_thread = 1; - } - -} - -void pdp_procqueue_init(t_pdp_procqueue *q, double milliseconds, int logsize) -{ - pthread_attr_t attr; - int size = 1 << logsize; - - /* setup pdp queue processor object */ - q->ticks = 0; - q->deltime = milliseconds; - - /* setup queue data */ - q->mask = size - 1; - q->head = 0; - q->tail = 0; - q->curr = 0; - q->q = pdp_alloc(size * sizeof(t_process_queue_item)); - memset(q->q, 0, size * sizeof(t_process_queue_item)); - - /* enable threads */ - q->use_thread = 1; - - /* setup synchro stuff */ - pthread_mutex_init(&q->mut, NULL); - pthread_cond_init(&q->cond_dataready, NULL); - pthread_cond_init(&q->cond_processingdone, NULL); - - - /* allocate the clock */ - q->pdp_clock = clock_new(q, (t_method)pdp_procqueue_tick); - - /* set the clock */ - clock_delay(q->pdp_clock, 0); - - /* start processing thread */ - - /* glibc doc says SCHED_OTHER is default, - but it seems not to be when initiated from a RT thread - so we explicitly set it here */ - pthread_attr_init (&attr); - //pthread_attr_setschedpolicy(&attr, SCHED_FIFO); - pthread_attr_setschedpolicy(&attr, SCHED_OTHER); - - D post("pdp_procqueue_init(%x): starting thread", q); - pthread_create(&q->thread_id, &attr, pdp_procqueue_thread, (void *)q); - D post("pdp_procqueue_init(%x): back in pd thread", q); - - /* wait for processing thread to finish */ - //pdp_procqueue_wait(q); - - /* set default disable/enable thread here */ - //post("pdp_queue: THREAD PROCESSING ON BY DEFAULT!!"); - pdp_procqueue_use_thread(q,0); - -} - - - - -/* the (static) pdp queue object */ -static t_pdp_procqueue pdp_queue; - - -/* get the default queue */ -t_pdp_procqueue *pdp_queue_get_queue(void){return &pdp_queue;} - - -#if 1 -/* default pdp queue shortcut methods */ -void pdp_queue_wait() {pdp_procqueue_wait(&pdp_queue);} -void pdp_queue_finish(int index) { pdp_procqueue_finish(&pdp_queue, index);} -void pdp_queue_add(void *owner, void *process, void *callback, int *queue_id) { - pdp_procqueue_add(&pdp_queue, owner, process, callback, queue_id); -} -void pdp_queue_use_thread(int t) {pdp_procqueue_use_thread(&pdp_queue, t);} -void pdp_queue_setup(void){ - pdp_procqueue_init(&pdp_queue, PDP_QUEUE_DELTIME, PDP_QUEUE_LOGSIZE); - pdp_procqueue_use_thread(&pdp_queue,0); -} -#endif - - - - - - - -#ifdef __cplusplus -} -#endif |