aboutsummaryrefslogtreecommitdiff
path: root/modules/generic
diff options
context:
space:
mode:
authorTom Schouten <doelie@users.sourceforge.net>2006-09-01 13:45:31 +0000
committerTom Schouten <doelie@users.sourceforge.net>2006-09-01 13:45:31 +0000
commit7591a024f184bd385d35583d19d86c1d5f2531ba (patch)
tree77aa0c44ccb700eb9a2b16e1b246e3c8026c40ed /modules/generic
parent91dd6b68f0f209ad015a303095bb1df018dca71e (diff)
pdp current darcs merge
svn path=/trunk/externals/pdp/; revision=5816
Diffstat (limited to 'modules/generic')
-rw-r--r--modules/generic/Makefile2
-rw-r--r--modules/generic/pdp_del.c18
-rw-r--r--modules/generic/pdp_rawin.c302
-rw-r--r--modules/generic/pdp_rawout.c376
4 files changed, 503 insertions, 195 deletions
diff --git a/modules/generic/Makefile b/modules/generic/Makefile
index 035f970..e264828 100644
--- a/modules/generic/Makefile
+++ b/modules/generic/Makefile
@@ -4,7 +4,7 @@ include ../../Makefile.config
PDP_MOD = pdp_reg.o pdp_del.o pdp_snap.o pdp_trigger.o \
pdp_route.o pdp_inspect.o pdp_loop.o pdp_description.o pdp_convert.o \
- pdp_udp_send.o pdp_udp_receive.o pdp_rawin.o pdp_rawout.o
+ pdp_udp_send.o pdp_udp_receive.o pdp_rawin.o pdp_rawout.o pdp_metro.o
# build generic modules
all_modules: $(PDP_MOD)
diff --git a/modules/generic/pdp_del.c b/modules/generic/pdp_del.c
index 4b51023..6b7aeb2 100644
--- a/modules/generic/pdp_del.c
+++ b/modules/generic/pdp_del.c
@@ -71,7 +71,14 @@ static void pdp_del_input_0(t_pdp_del *x, t_symbol *s, t_floatarg f)
else if (s == gensym("process")){
out = (((x->x_head + x->x_delay)) % x->x_order);
packet = x->x_packet[out];
- pdp_packet_pass_if_valid(x->x_outlet0, &x->x_packet[out]);
+
+ // originally, we wouldn't keep the packet in the delay line to save memory
+ // however, this behaviour is very annoying, and doesn't allow ''scratching''
+ // so we send out a copy instead.
+ // pdp_packet_pass_if_valid(x->x_outlet0, &x->x_packet[out]);
+ int p = pdp_packet_copy_ro(packet);
+ pdp_packet_pass_if_valid(x->x_outlet0, &p);
+
/*
if (-1 != packet){
@@ -88,6 +95,15 @@ static void pdp_del_input_0(t_pdp_del *x, t_symbol *s, t_floatarg f)
*/
x->x_head = (x->x_head + x->x_order - 1) % x->x_order;
+
+/*
+ int i;
+ for (i=0; i<x->x_order; i++){
+ fprintf(stderr, " %d", x->x_packet[i]);
+ }
+ fprintf(stderr, "\n");
+*/
+
}
diff --git a/modules/generic/pdp_rawin.c b/modules/generic/pdp_rawin.c
index 28ef8fb..04d8775 100644
--- a/modules/generic/pdp_rawin.c
+++ b/modules/generic/pdp_rawin.c
@@ -1,5 +1,5 @@
/*
- * Pure Data Packet module. packet forth console
+ * Pure Data Packet module. Raw packet input
* Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org>
*
* This program is free software; you can redistribute it and/or modify
@@ -19,6 +19,7 @@
*/
+#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
@@ -40,6 +41,55 @@
#define D if (1)
+/* TODO: add flow control
+
+reminder: pthread condition values for synchro
+
+writing to a queue goes like this:
+* pthread_mutex_lock
+* atomic op (write)
+* pthread_cond_signal
+* pthread_mutex_unlock
+
+reading from a queue goes like this:
+* pthread_mutex_lock
+* while (!CONDITION) pthread_cond_wait
+* atomic op (read)
+* pthread_mutex_unlock
+
+
+in this case, there is a reader and a writer, AND a maximum
+size of the buffer between them (1 atom) so both reader and writer
+might block. compare this to the 'command queue' in libpf, where
+only the reader blocks.
+
+so the course of events is:
+
+READER
+* wait for data ready (COND_READ) this blocks pd, it is assumed data is always ready in normal operation
+* consume
+* signal writer (COND_WRITE)
+
+
+WRITER
+* wait for space ready (COND_WRITE)
+* write
+* signal reader
+
+
+one remark though:
+is this machinery really necessary?
+-----------------------------------
+
+it might be easier to just read in the pd thread.
+the only problem this gives is when data really arrives not in a single chunk, but as a stream.
+to me that seems a really awkward special case which can be solved using another PROCESS to
+do the buffering / dropping.
+
+so, for now, it's just synchronous, no threads for sync reading.
+
+*/
+
/* raw input from a unix pipe */
@@ -54,16 +104,18 @@ typedef struct rawin_struct
/* comm */
t_pdp_list *x_queue; // packet queue
+ int x_pipefd;
/* thread */
pthread_mutex_t x_mut;
pthread_attr_t x_attr;
pthread_t x_thread;
+
/* sync */
+ int x_mode; // 1-> sync to input
int x_giveup; // 1-> terminate reader thread
int x_active; // 1-> reader thread is launched
- int x_done; // 1-> reader thread has exited
/* config */
t_symbol *x_pipe;
@@ -76,138 +128,125 @@ static inline void lock(t_rawin *x){pthread_mutex_lock(&x->x_mut);}
static inline void unlock(t_rawin *x){pthread_mutex_unlock(&x->x_mut);}
static void rawin_close(t_rawin *x);
-static void tick(t_rawin *x)
-{
- /* send all packets in queue to outlet */
- lock(x);
- while (x->x_queue->elements){
- outlet_pdp_atom(x->x_outlet, x->x_queue->first);
- pdp_list_pop(x->x_queue); // pop stale reference
- }
- unlock(x);
- clock_delay(x->x_clock, PERIOD);
- /* check if thread is done */
- if (x->x_done) rawin_close(x);
-}
-static void move_current_to_queue(t_rawin *x, int packet)
-{
- lock(x);
- pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet);
- unlock(x);
-}
-static void *rawin_thread(void *y)
-{
- int pipe;
+static int read_packet(t_rawin *x){
+
int packet = -1;
- t_rawin *x = (t_rawin *)y;
+ void *data = 0;
+ int left = -1;
int period_sec;
int period_usec;
-
- //D pdp_post("pipe: %s", x->x_pipe->s_name);
- //D pdp_post("type: %s", x->x_type->s_name);
-
- /* open pipe */
- if (-1 == (pipe = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){
- perror(x->x_pipe->s_name);
- goto exit;
+ /* create packet */
+ if (-1 != packet){
+ pdp_post("WARNING: deleting stale packet");
+ pdp_packet_mark_unused(packet);
+ }
+ packet = pdp_factory_newpacket(x->x_type);
+ if (-1 == packet){
+ pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name);
+ goto close;
}
+
+ /* fill packet */
+ data = pdp_packet_data(packet);
+ left = pdp_packet_data_size(packet);
+ // D pdp_post("packet %d, data %x, size %d", packet, data, left);
+
+ /* inner loop: pipe reads */
+ while(left){
- /* main loop (packets) */
- while(1){
- void *data = 0;
- int left = -1;
+ fd_set inset;
+ struct timeval tv = {0,10000};
- /* create packet */
- if (-1 != packet){
- pdp_post("WARNING: deleting stale packet");
+ /* check if we need to stop */
+ if (x->x_giveup){
pdp_packet_mark_unused(packet);
+ goto close;
}
- packet = pdp_factory_newpacket(x->x_type);
- if (-1 == packet){
- pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name);
- goto exit;
+ /* select, with timeout */
+ FD_ZERO(&inset);
+ FD_SET(x->x_pipefd, &inset);
+ if (-1 == select(x->x_pipefd+1, &inset, NULL,NULL, &tv)){
+ pdp_post("select error");
+ goto close;
}
-
- /* fill packet */
- data = pdp_packet_data(packet);
- left = pdp_packet_data_size(packet);
- // D pdp_post("packet %d, data %x, size %d", packet, data, left);
-
- /* inner loop: pipe reads */
- while(left){
-
- fd_set inset;
- struct timeval tv = {0,10000};
-
- /* check if we need to stop */
- if (x->x_giveup){
- pdp_packet_mark_unused(packet);
- goto close;
- }
- /* select, with timeout */
- FD_ZERO(&inset);
- FD_SET(pipe, &inset);
- if (-1 == select(pipe+1, &inset, NULL,NULL, &tv)){
- pdp_post("select error");
- goto close;
- }
- /* if ready, read, else retry */
- if (FD_ISSET(pipe, &inset)){
- int bytes = read(pipe, data, left);
- if (!bytes){
- /* if no bytes are read, pipe is closed */
- goto close;
- }
- data += bytes;
- left -= bytes;
+ /* if ready, read, else retry */
+ if (FD_ISSET(x->x_pipefd, &inset)){
+ int bytes = read(x->x_pipefd, data, left);
+ if (!bytes){
+ /* if no bytes are read, pipe is closed */
+ goto close;
}
+ data += bytes;
+ left -= bytes;
}
-
- /* move to queue */
- move_current_to_queue(x, packet);
- packet = -1;
+ }
+ return packet;
+ close:
+ return -1;
+}
-
+/* reader thread syncs to pipe */
+static void *rawin_thread(void *y)
+{
+ t_rawin *x = (t_rawin *)y;
+ int packet = -1;
+
+ /* loop until error or close */
+ while (-1 != (packet = read_packet(x))) {
+ lock(x);
+ pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet);
+ unlock(x);
}
- close:
- /* close pipe */
- close(pipe);
-
-
- exit:
- x->x_done = 1;
return 0;
}
+/* sync to stream:
+ tick polls the receive queue */
+static void rawin_tick(t_rawin *x)
+{
+ int p = -1;
+
+ /* send all packets in queue to outlet */
+ while (x->x_queue->elements){
+ lock(x);
+ //pdp_post("%d", x->x_queue->elements);
+ p = pdp_list_pop(x->x_queue).w_packet;
+ unlock(x);
+ //pdp_post_n("%d ", p);
+ pdp_packet_pass_if_valid(x->x_outlet, &p);
+ //pdp_post("%d",p);
+ }
+ clock_delay(x->x_clock, PERIOD);
+}
-static void rawin_type(t_rawin *x, t_symbol *type)
-{
- x->x_type = pdp_gensym(type->s_name);
+/* sync to bang:
+ this runs the reader in the pd thread
+ */
+static void rawin_bang(t_rawin *x){
+ if (!x->x_active) return;
+ if (x->x_mode) return;
+ int packet = read_packet(x);
+ if (-1 == packet) rawin_close(x); // close on error
+ pdp_packet_pass_if_valid(x->x_outlet, &packet);
+
}
-static void rawin_open(t_rawin *x, t_symbol *pipe)
-{
- /* save pipe name if not empty */
- if (pipe->s_name[0]) {x->x_pipe = pipe;}
- if (x->x_active) {
- pdp_post("already open");
- return;
- }
- /* start thread */
- x->x_giveup = 0;
- x->x_done = 0;
- pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x);
- x->x_active = 1;
+
+
+
+static void rawin_type(t_rawin *x, t_symbol *type)
+{
+ x->x_type = pdp_gensym(type->s_name);
}
static void rawin_close(t_rawin *x)
@@ -217,19 +256,52 @@ static void rawin_close(t_rawin *x)
/* stop thread: set giveup + wait */
x->x_giveup = 1;
- pthread_join(x->x_thread, NULL);
+ if (x->x_mode) pthread_join(x->x_thread, NULL);
x->x_active = 0;
+ /* close pipe */
+ close(x->x_pipefd);
+
/* notify */
outlet_bang(x->x_sync_outlet);
- pdp_post("connection to %s closed", x->x_pipe->s_name);
-
+ pdp_post("pdp_rawin: connection to %s closed", x->x_pipe->s_name);
+}
+
+
+static void rawin_open(t_rawin *x, t_symbol *spipe)
+{
+ /* save pipe name if not empty */
+ if (spipe->s_name[0]) {x->x_pipe = spipe;}
+ if (x->x_active) {
+ pdp_post("pdp_rawin: already open");
+ return;
+ }
+ /* open pipe */
+ if (-1 == (x->x_pipefd = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){
+ perror(x->x_pipe->s_name);
+ return;
+ }
+
+ /* thread control vars */
+ x->x_giveup = 0;
+ x->x_active = 1;
+
+ /* start thread if sync mode */
+ if (x->x_mode)
+ pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x);
}
+
+static void rawin_sync(t_rawin *x, t_float fmode){
+ rawin_close(x);
+ x->x_mode = (int)fmode;
+}
+
+
static void rawin_free(t_rawin *x)
{
rawin_close(x);
@@ -241,21 +313,21 @@ static void rawin_free(t_rawin *x)
t_class *rawin_class;
-static void *rawin_new(t_symbol *pipe, t_symbol *type)
+static void *rawin_new(t_symbol *spipe, t_symbol *type)
{
t_rawin *x;
- pdp_post("%s %s", pipe->s_name, type->s_name);
+ pdp_post("%s %s", spipe->s_name, type->s_name);
/* allocate & init */
x = (t_rawin *)pd_new(rawin_class);
x->x_outlet = outlet_new(&x->x_obj, &s_anything);
x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
- x->x_clock = clock_new(x, (t_method)tick);
+ x->x_clock = clock_new(x, (t_method)rawin_tick);
x->x_queue = pdp_list_new(0);
x->x_active = 0;
x->x_giveup = 0;
- x->x_done = 0;
+ x->x_mode = 0;
x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
x->x_pipe = gensym("/tmp/pdpraw"); // default
pthread_attr_init(&x->x_attr);
@@ -264,7 +336,7 @@ static void *rawin_new(t_symbol *pipe, t_symbol *type)
/* args */
rawin_type(x, type);
- if (pipe->s_name[0]) x->x_pipe = pipe;
+ if (spipe->s_name[0]) x->x_pipe = spipe;
return (void *)x;
@@ -286,10 +358,12 @@ void pdp_rawin_setup(void)
rawin_class = class_new(gensym("pdp_rawin"), (t_newmethod)rawin_new,
(t_method)rawin_free, sizeof(t_rawin), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
- /* add global message handler */
+ /* add global message handlers */
class_addmethod(rawin_class, (t_method)rawin_type, gensym("type"), A_SYMBOL, A_NULL);
class_addmethod(rawin_class, (t_method)rawin_open, gensym("open"), A_DEFSYMBOL, A_NULL);
class_addmethod(rawin_class, (t_method)rawin_close, gensym("close"), A_NULL);
+ class_addmethod(rawin_class, (t_method)rawin_sync, gensym("sync"), A_FLOAT, A_NULL);
+ class_addmethod(rawin_class, (t_method)rawin_bang, gensym("bang"), A_NULL);
}
diff --git a/modules/generic/pdp_rawout.c b/modules/generic/pdp_rawout.c
index e1e9edf..01d7ca0 100644
--- a/modules/generic/pdp_rawout.c
+++ b/modules/generic/pdp_rawout.c
@@ -1,5 +1,5 @@
/*
- * Pure Data Packet module. packet forth console
+ * Pure Data Packet module. Raw packet output.
* Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org>
*
* This program is free software; you can redistribute it and/or modify
@@ -19,6 +19,31 @@
*/
+/*
+ This is the most straightforward way to get data out of pdp.
+ The internals follow the simple reader/writer pattern
+ * writer: runs in pd thread, accepts packets from inlet and stores in queue
+ * reader: runs in own thread, reads packets from queue and writes to pipe
+
+ Since there is no communication from reader to writer, we need a watchdog
+ timer to check the status of the writer. Mainly to close when necessary.
+
+ To enable audio recording, pdp_rawout will also produce interleaved 16bit
+ audio. You will need to instantiate it with [pdp_rawout~ nbchans]
+
+
+
+*/
+
+/* TODO:
+
+ make audio buffer smaller (128 bytes writes is too heavy)
+
+*/
+
+
+
+#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
@@ -39,10 +64,9 @@
#define D if (1)
-#define MAX_QUEUESIZE 4
+#define DEF_QUEUESIZE 100
#define PIPE_BLOCKSIZE 4096
-
-
+#define POLLTIME 20
/* raw input from a unix pipe */
@@ -51,11 +75,19 @@ typedef struct rawout_struct
{
/* pd */
t_object x_obj;
+ t_float x_f;
+
//t_outlet *x_outlet;
t_outlet *x_sync_outlet;
/* comm */
- t_pdp_list *x_queue; // packet queue
+ t_pdp_list *x_queue; // packet queue
+ t_clock *x_clock; // watchdog timer
+ t_float x_deltime; // watchdog period
+ int x_verbose;
+ int x_packet; // dsp fillup state
+ int x_countdown; // amount of packet filled up
+ short int * x_needle; // start writing here
/* thread */
pthread_mutex_t x_mut;
@@ -68,8 +100,10 @@ typedef struct rawout_struct
int x_done; // 1-> writer thread has exited
/* config */
- t_symbol *x_pipe;
- t_pdp_symbol *x_type;
+ t_symbol *x_pipe;
+ int x_chans; // nb audio channels
+ t_pdp_symbol *x_tmptype;
+ int x_max_queuesize; // buffer size ( < 0 = infty )
} t_rawout;
@@ -78,55 +112,41 @@ static inline void lock(t_rawout *x){pthread_mutex_lock(&x->x_mut);}
static inline void unlock(t_rawout *x){pthread_mutex_unlock(&x->x_mut);}
static void rawout_close(t_rawout *x);
-static void pdp_in(t_rawout *x, t_symbol *s, t_float f)
-{
- /* save packet to pdp queue, if size is smaller than maxsize */
- if (s == S_REGISTER_RO){
- if (x->x_queue->elements < MAX_QUEUESIZE){
- int p = (int)f;
- p = pdp_packet_copy_ro(p);
- if (p != -1){
- lock(x);
- pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p);
- unlock(x);
- }
- }
- else {
- pdp_post("pdp_rawout: dropping packet: (queue full)", MAX_QUEUESIZE);
- }
-
- }
-
- /* check if thread is done */
- if (x->x_done) rawout_close(x);
-
-}
-
+/* READER THREAD: reads from queue, writes to pipe */
static void *rawout_thread(void *y)
{
- int pipe;
+ int pipefd;
int packet = -1;
t_rawout *x = (t_rawout *)y;
int period_sec;
int period_usec;
- sigset_t sigvec; /* signal handling */
+ sigset_t _sigvec; /* signal handling */
+
+ char me[1024];
+ sprintf(me, "pdp_rawout: %s", x->x_pipe->s_name);
/* ignore pipe signal */
- sigemptyset(&sigvec);
- sigaddset(&sigvec,SIGPIPE);
- pthread_sigmask(SIG_BLOCK, &sigvec, 0);
+ sigemptyset(&_sigvec);
+ sigaddset(&_sigvec,SIGPIPE);
+ pthread_sigmask(SIG_BLOCK, &_sigvec, 0);
//D pdp_post("pipe: %s", x->x_pipe->s_name);
//D pdp_post("type: %s", x->x_type->s_name);
+
/* open pipe */
- if (-1 == (pipe = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK))){
- perror(x->x_pipe->s_name);
- goto exit;
+ if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK|O_APPEND))){
+ if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_CREAT))){
+ perror(me);
+ goto exit;
+ }
}
+ pdp_post("pdp_rawout: opened %s", x->x_pipe->s_name);
+
+
/* main loop (packets) */
while(1){
void *data = 0;
@@ -147,10 +167,15 @@ static void *rawout_thread(void *y)
lock(x);
packet = pdp_list_pop(x->x_queue).w_packet;
unlock(x);
-
+
/* send packet */
+ //t_pdp *h = pdp_packet_header(packet);
+ //fprintf(stderr, "users %d\n", h->users);
+
data = pdp_packet_data(packet);
left = pdp_packet_data_size(packet);
+ //int i; for (i = 0; i<left/2; i++) fprintf(stderr, "%06x ", ((short int *)data)[i]); fprintf(stderr, "\n");
+
/* inner loop: pipe reads */
while(left){
@@ -166,19 +191,20 @@ static void *rawout_thread(void *y)
/* select, with timeout */
FD_ZERO(&outset);
- FD_SET(pipe, &outset);
- if (-1 == select(pipe+1, NULL, &outset, NULL, &tv)){
+ FD_SET(pipefd, &outset);
+ if (-1 == select(pipefd+1, NULL, &outset, NULL, &tv)){
pdp_post("select error");
goto close;
}
/* if ready, read, else retry */
- if (FD_ISSET(pipe, &outset)){
- int bytes = write(pipe, data, left);
+ if (FD_ISSET(pipefd, &outset)){
+
+ int bytes = write(pipefd, data, left);
/* handle errors */
if (bytes <= 0){
- perror(x->x_pipe->s_name);
- if (bytes != EAGAIN) goto close;
+ perror(me);
+ if (errno != EAGAIN) goto close;
}
/* or update pointers */
else{
@@ -200,7 +226,7 @@ static void *rawout_thread(void *y)
close:
/* close pipe */
- close(pipe);
+ close(pipefd);
exit:
@@ -209,16 +235,121 @@ static void *rawout_thread(void *y)
}
+/* DSP INPUT */
+#define DSP_ARG(type, name, source) type name = (type)source
-static void rawout_type(t_rawout *x, t_symbol *type)
+static t_int *rawout_perform(t_int *w);
+static void rawout_dsp(t_rawout *x, t_signal **sp){
+ int nargs = 2 + x->x_chans;
+ t_int args[nargs];
+ args[0] = (int)x;
+ args[1] = (int)sp[0]->s_n;
+ float **in = (float **)(args+2);
+ int i;
+ for (i=0; i<x->x_chans; i++) in[i] = sp[i]->s_vec;
+ dsp_addv(rawout_perform, nargs, args);
+}
+
+static t_int *rawout_perform(t_int *w)
{
- x->x_type = pdp_gensym(type->s_name);
+ DSP_ARG(t_rawout*, x, w[1]);
+ DSP_ARG(t_int, n, w[2]);
+ DSP_ARG(t_float**, in, &w[3]);
+
+ short int *out;
+ int i,c,k;
+
+
+ if (x->x_queue->elements >= x->x_max_queuesize){
+ // drop
+ if (x->x_verbose && x->x_active) pdp_post_n(".");
+ }
+ else {
+
+ // create packet
+ if (x->x_countdown) {
+ out = x->x_needle;
+ }
+ else {
+ int p = pdp_factory_newpacket(x->x_tmptype);
+ pdp_packet_mark_unused(x->x_packet);
+ // if (-1 == p) pdp_post("pdp_rawout~: can't create packet");
+ x->x_needle = out = (short int *)pdp_packet_data(p);
+ x->x_packet = p;
+ x->x_countdown = pdp_packet_data_size(p) / 2;
+ }
+
+ //pdp_post("data size = %d bytes", pdp_packet_data_size(p));
+
+ //memset(out, 0, pdp_packet_data_size(p));
+
+
+ // convert & interleave
+ for (k=0,i=0; i<n; i++){
+ for (c=0; c<x->x_chans; c++,k++){
+ float val = (in[c])[i];
+ val *= (float)((1<<15)-1);
+ out[k] = (short int)(val);
+ //out[k] = 0x1234;
+ //fprintf(stderr, "(%d,%d,%d) %d\n", c, i, k, (int)out[k]);
+ }
+ }
+
+ x->x_needle += k;
+ x->x_countdown -= k;
+ if (!x->x_countdown){
+ // transfer
+ lock(x);
+ pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)x->x_packet);
+ x->x_packet = -1;
+ unlock(x);
+ }
+ }
+
+ return w+3+x->x_chans;
}
-static void rawout_open(t_rawout *x, t_symbol *pipe)
+
+/* PACKET INPUT */
+
+
+static void pdp_in(t_rawout *x, t_symbol *s, t_float f)
+{
+ /* save packet to pdp queue, if size is smaller than maxsize */
+ if (s == S_REGISTER_RO){
+ if (x->x_queue->elements < x->x_max_queuesize){
+ int p = (int)f;
+ p = pdp_packet_copy_ro(p);
+ if (p != -1){
+ lock(x);
+ pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p);
+ unlock(x);
+ }
+ }
+ else {
+ //pdp_post("pdp_rawout: dropping packet: (queue full)", x->x_max_queuesize);
+ if (x->x_active && x->x_verbose) pdp_post_n(".");
+ }
+
+ }
+
+ /* check if thread is done */
+ if (x->x_done) rawout_close(x);
+
+}
+
+/* CONTROL */
+
+//static void rawout_type(t_rawout *x, t_symbol *type)
+//{
+ //x->x_type = pdp_gensym(type->s_name);
+//}
+
+static void clear_queue(t_rawout *x);
+static void rawout_open(t_rawout *x, t_symbol *spipe)
{
/* save pipe name if not empty */
- if (pipe->s_name[0]) {x->x_pipe = pipe;}
+ if (spipe->s_name[0]) {x->x_pipe = spipe;}
if (x->x_active) {
pdp_post("already open");
@@ -227,67 +358,133 @@ static void rawout_open(t_rawout *x, t_symbol *pipe)
/* start thread */
x->x_giveup = 0;
x->x_done = 0;
+ clear_queue(x);
pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x);
x->x_active = 1;
}
-static void rawout_close(t_rawout *x)
-{
-
+static void rawout_stopthread(t_rawout *x){
if (!x->x_active) return;
/* stop thread: set giveup + wait */
x->x_giveup = 1;
pthread_join(x->x_thread, NULL);
x->x_active = 0;
+ clear_queue(x);
+}
+
+static void rawout_close(t_rawout *x)
+{
+ if (!x->x_active) return;
+ rawout_stopthread(x);
/* notify */
outlet_bang(x->x_sync_outlet);
- pdp_post("connection to %s closed", x->x_pipe->s_name);
-
+ pdp_post("pdp_rawout: closed %s", x->x_pipe->s_name);
+}
+static void rawout_verbose(t_rawout *x, t_float fverbose){
+ x->x_verbose = (int)fverbose;
+}
+static void rawout_tick(t_rawout *x){
+ if (x->x_done) rawout_close(x);
+ clock_delay(x->x_clock, x->x_deltime);
+}
-
+static void clear_queue(t_rawout *x){
+ lock(x);
+ while(x->x_queue->elements){
+ pdp_packet_mark_unused(pdp_list_pop(x->x_queue).w_packet);
+ }
+ unlock(x);
+}
+
+#define MAXINT (0x80 << ((sizeof(int)-1)*8))
+static void rawout_queuesize(t_rawout *x, t_float fbufsize){
+ int bufsize = (int)fbufsize;
+ if (fbufsize < 1) fbufsize = MAXINT;
+ x->x_max_queuesize = bufsize;
}
static void rawout_free(t_rawout *x)
{
- rawout_close(x);
- pdp_tree_strip_packets(x->x_queue);
+ // terminate thread
+ rawout_stopthread(x);
+
+ // cleanup
+ clock_unset(x->x_clock);
+ clear_queue(x);
pdp_tree_free(x->x_queue);
+ pdp_packet_mark_unused(x->x_packet);
}
t_class *rawout_class;
+t_class *rawout_dsp_class;
-
-static void *rawout_new(t_symbol *pipe, t_symbol *type)
-{
- t_rawout *x;
-
- pdp_post("%s %s", pipe->s_name, type->s_name);
-
- /* allocate & init */
- x = (t_rawout *)pd_new(rawout_class);
+// shared stuff
+static void rawout_init(t_rawout *x, t_symbol *spipe){
//x->x_outlet = outlet_new(&x->x_obj, &s_anything);
x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
x->x_queue = pdp_list_new(0);
x->x_active = 0;
x->x_giveup = 0;
x->x_done = 0;
- x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
+ x->x_packet = -1;
+ x->x_countdown = 0;
+ x->x_max_queuesize = DEF_QUEUESIZE;
+ //x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
x->x_pipe = gensym("/tmp/pdpraw"); // default
+ x->x_deltime = POLLTIME;
+ x->x_clock = clock_new(x, (t_method)rawout_tick);
pthread_attr_init(&x->x_attr);
pthread_mutex_init(&x->x_mut, NULL);
/* args */
- rawout_type(x, type);
- if (pipe->s_name[0]) x->x_pipe = pipe;
+ //rawout_type(x, type);
+ if (spipe && spipe->s_name[0]) x->x_pipe = spipe;
+ rawout_tick(x);
+ rawout_verbose(x,0);
+}
+// [pdp_rawout]
+static void *rawout_new(t_symbol *spipe /* , t_symbol *type */)
+{
+ t_rawout *x;
+ /* allocate & init */
+ x = (t_rawout *)pd_new(rawout_class);
+ rawout_init(x, spipe);
return (void *)x;
}
+// [pdp_rawout~]
+
+
+// HUH??? why do i get the symbol first, then the float????
+// see http://lists.puredata.info/pipermail/pd-dev/2003-09/001618.html
+//static void *rawout_dsp_new(t_float fchans, t_symbol *spipe){
+static void *rawout_dsp_new(t_symbol *spipe, t_float fchans){
+ int chans = (int)fchans;
+ if (chans < 1) chans = 1;
+ if (chans > 64) return 0; // this is just a safety measure
+
+ t_rawout *x = (t_rawout *)pd_new(rawout_dsp_class);
+ rawout_init(x, spipe);
+
+ // hack: temp packet
+ char temp_packet[1024];
+ sprintf(temp_packet, "image/grey/256x%d", 8 * chans);
+ pdp_post("pdp_rawout: using fake packet %s", temp_packet);
+ x->x_tmptype = pdp_gensym(temp_packet);
+
+ // create audio inlets
+ x->x_chans = chans;
+ while (--chans) inlet_new(&x->x_obj, &x->x_obj.ob_pd, gensym("signal"), gensym("signal"));
+
+ return (void *)x;
+}
+
#ifdef __cplusplus
@@ -296,25 +493,46 @@ extern "C"
#endif
-void pdp_rawout_setup(void)
-{
+
+#define COMMON(base_class)\
+ class_addmethod(base_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);\
+ class_addmethod(base_class, (t_method)rawout_close, gensym("close"), A_NULL);\
+ class_addmethod(base_class, (t_method)rawout_verbose, gensym("verbose"), A_FLOAT, A_NULL);\
+ class_addmethod(base_class, (t_method)rawout_queuesize, gensym("bufsize"), A_FLOAT, A_NULL);
+
+void pdp_rawout_setup(void){
+
int i;
+ /* PACKETS */
+
/* create a standard pd class: [pdp_rawout pipe type] */
rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new,
- (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
+ (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_NULL);
/* add global message handler */
- class_addmethod(rawout_class, (t_method)pdp_in,
- gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL);
+ class_addmethod(rawout_class, (t_method)pdp_in, gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL);
+ COMMON(rawout_class);
+
+ /* DSP */
- class_addmethod(rawout_class, (t_method)rawout_type, gensym("type"), A_SYMBOL, A_NULL);
- class_addmethod(rawout_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);
- class_addmethod(rawout_class, (t_method)rawout_close, gensym("close"), A_NULL);
+ /* create a standard pd class: [pdp_rawout pipe type] */
+ rawout_dsp_class = class_new(gensym("pdp_rawout~"), (t_newmethod)rawout_dsp_new,
+ (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFFLOAT, A_DEFSYMBOL, A_NULL);
+ /* add signal input */
+ CLASS_MAINSIGNALIN(rawout_dsp_class, t_rawout, x_f);
+ class_addmethod(rawout_dsp_class, (t_method)rawout_dsp, gensym("dsp"), 0);
+ COMMON(rawout_dsp_class);
}
+
+
+
+
+
+
#ifdef __cplusplus
}
#endif