diff options
author | Tom Schouten <doelie@users.sourceforge.net> | 2003-01-21 10:27:33 +0000 |
---|---|---|
committer | Tom Schouten <doelie@users.sourceforge.net> | 2003-01-21 10:27:33 +0000 |
commit | 9b8745d5250c9d0b60c9aa5a77f58a3fcddf1076 (patch) | |
tree | 8372b6a414a7124cec57efc9c80845e2bc1b157d /system/pdp_queue.c |
This commit was generated by cvs2svn to compensate for changes in r352,svn2git-root
which included commits to RCS files with non-trunk default branches.
svn path=/trunk/externals/pdp/; revision=353
Diffstat (limited to 'system/pdp_queue.c')
-rw-r--r-- | system/pdp_queue.c | 337 |
1 files changed, 337 insertions, 0 deletions
diff --git a/system/pdp_queue.c b/system/pdp_queue.c new file mode 100644 index 0000000..2932728 --- /dev/null +++ b/system/pdp_queue.c @@ -0,0 +1,337 @@ +/* + * Pure Data Packet - processor queue module. + * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + + + +/* + this is a the processor queue pdp system module + it receives tasks from objects that are schedules to + be computed in another thread. the object is signalled back + when the task is completed. + + this is not a standard pd class. it is a sigleton class + using a standard pd clock to poll for compleded methods on + every scheduler run. this is a hack to do thread synchronization + in a thread unsafe pd. + + */ + +#include "pdp.h" +#include <pthread.h> +#include <unistd.h> +#include <stdio.h> + + +#ifdef __cplusplus +extern "C" +{ +#endif + +#define PDP_QUEUE_SIZE 1024 +#define PDP_QUEUE_DELTIME 1.0f; + + + + +/********************* pdp process queue data *********************/ + +typedef void (*t_pdpmethod)(void *client); + +/* the process queue data record */ +typedef struct process_queue_struct +{ + void *x_owner; /* the object we are dealing with */ + t_pdpmethod x_process; /* the process method */ + t_pdpmethod x_callback; /* the function to be called when finished */ + int *x_queue_id; /* place to store the queue id for task */ +} t_process_queue; + + + +/* clock members */ +static t_clock *pdp_clock; +static double deltime; + +/* some bookkeeping vars */ +static long long ticks; +static long long packets; + +/* queue members */ +static t_process_queue *q; /* queue */ +static int mask; +static int head; /* last entry in queue + 1 */ +static int tail; /* first entry in queque */ +static int curr; /* the object currently processed in other thread */ + +/* pthread vars */ +static pthread_mutex_t mut; +static pthread_cond_t cond_dataready; +static pthread_cond_t cond_processingdone; +static pthread_t thread_id; + +/* synchro pipes */ +static int pipe_fd[2]; + +/* toggle for thread usage */ +static int use_thread; + + + +/* the methods */ +void pdp_queue_wait() +{ + //post("pdp_pq_wait: waiting for pdp_queue_thread to finish processing"); + pthread_mutex_lock(&mut); + while(((curr - head) & mask) != 0){ + + pthread_cond_wait(&cond_processingdone, &mut); + } + pthread_mutex_unlock(&mut); + //post("pdp_pq_wait: pdp_queue_thread has finished processing"); + +} +void pdp_queue_finish(int index) +{ + + if (-1 == index) { + //post("pdp_pq_remove: index == -1"); + return; + } + /* wait for processing thread to finish*/ + pdp_queue_wait(); + + /* invalidate callback at index */ + q[index & mask].x_callback = 0; + q[index & mask].x_queue_id = 0; + +} + +static void pdp_queue_signal_processor(void) +{ + + pthread_mutex_lock(&mut); + //post("signalling process thread"); + pthread_cond_signal(&cond_dataready); + pthread_mutex_unlock(&mut); + //post("signalling process thread done"); + +} + +static void pdp_queue_wait_for_feeder(void) +{ + + + /* only use locking when there is no data */ + if(((curr - head) & mask) == 0){ + pthread_mutex_lock(&mut); + + /* signal processing done */ + //post("pdp_queue_thread: signalling processing is done"); + pthread_cond_signal(&cond_processingdone); + + /* wait until there is an item in the queue */ + while(((curr - head) & mask) == 0){ + //post("waiting for feeder"); + pthread_cond_wait(&cond_dataready, &mut); + //post("waiting for feeder done"); + } + + pthread_mutex_unlock(&mut); + + } +} + +void pdp_queue_add(void *owner, void *process, void *callback, int *queue_id) +{ + int i; + + /* if processing is in not in thread, just call the funcs */ + if (!use_thread){ + //post("pdp_queue_add: calling processing routine directly"); + *queue_id = -1; + ((t_pdpmethod) process)(owner); + ((t_pdpmethod) callback)(owner); + return; + } + + + + /* schedule method in thread queue */ + if (1 == ((tail - head) & mask)) { + post("pdp_queue_add: WARNING: processing queue is full.\n"); + post("pdp_queue_add: WARNING: skipping process method, calling callback directly.\n"); + *queue_id = -1; + ((t_pdpmethod) callback)(owner); + } + + + + i = head & mask; + q[i].x_owner = owner; + q[i].x_process = process; + q[i].x_callback = callback; + q[i].x_queue_id = queue_id; + *queue_id = i; + //post("pdp_queue_add: added method to queue, index %d", i); + + + // increase the packet count + packets++; + + // move head forward + head++; + + pdp_queue_signal_processor(); + +} + + +/* processing thread */ +static void *pdp_queue_thread(void *dummy) +{ + while(1){ + + + /* wait until there is data available */ + pdp_queue_wait_for_feeder(); + + + //post("pdp_queue_thread: processing %d", curr); + + + /* call the process routine */ + (q[curr & mask].x_process)(q[curr & mask].x_owner); + + /* advance */ + curr++; + + + } +} + + +/* call back all the callbacks */ +static void pdp_queue_callback (void) +{ + + /* call callbacks for finished packets */ + while(0 != ((curr - tail) & mask)) + { + int i = tail & mask; + /* invalidate queue id */ + if(q[i].x_queue_id) *q[i].x_queue_id = -1; + /* call callback */ + if(q[i].x_callback) (q[i].x_callback)(q[i].x_owner); + //else post("pdp_pq_tick: callback %d is disabled",i ); + tail++; + } + +} + +/* the clock method */ +static void pdp_queue_tick (void) +{ + /* do work */ + //if (!(ticks % 1000)) post("pdp tick %d", ticks); + + if (!use_thread) return; + + /* call callbacks */ + pdp_queue_callback(); + + /* increase counter */ + ticks++; + + /* set clock for next update */ + clock_delay(pdp_clock, deltime); +} + + +void pdp_queue_use_thread(int t) +{ + /* if thread usage is being disabled, + wait for thread to finish processing first */ + if (t == 0) { + pdp_queue_wait(); + use_thread = 0; + pdp_queue_callback(); + clock_unset(pdp_clock); + } + else { + clock_unset(pdp_clock); + clock_delay(pdp_clock, deltime); + use_thread = 1; + } + +} + +void pdp_queue_setup(void) +{ + pthread_attr_t attr; + + /* setup pdp queue processor object */ + ticks = 0; + deltime = PDP_QUEUE_DELTIME; + + /* setup queue data */ + mask = PDP_QUEUE_SIZE - 1; + head = 0; + tail = 0; + curr = 0; + q = getbytes(PDP_QUEUE_SIZE * sizeof(*q)); + + /* use threads by default */ + use_thread = 1; + + /* setup synchro stuff */ + pthread_mutex_init(&mut, NULL); + pthread_cond_init(&cond_dataready, NULL); + pthread_cond_init(&cond_processingdone, NULL); + + + /* allocate the clock */ + pdp_clock = clock_new(0, (t_method)pdp_queue_tick); + + /* set the clock */ + clock_delay(pdp_clock, 0); + + /* start processing thread */ + + /* glibc doc says SCHED_OTHER is default, + but it seems not to be when initiated from a RT thread + so we explicitly set it here */ + pthread_attr_init (&attr); + //pthread_attr_setschedpolicy(&attr, SCHED_FIFO); + pthread_attr_setschedpolicy(&attr, SCHED_OTHER); + pthread_create(&thread_id, &attr, pdp_queue_thread, (void *)0); + + + +} + + + + + + + +#ifdef __cplusplus +} +#endif |