From 7591a024f184bd385d35583d19d86c1d5f2531ba Mon Sep 17 00:00:00 2001 From: Tom Schouten Date: Fri, 1 Sep 2006 13:45:31 +0000 Subject: pdp current darcs merge svn path=/trunk/externals/pdp/; revision=5816 --- modules/generic/pdp_rawout.c | 376 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 297 insertions(+), 79 deletions(-) (limited to 'modules/generic/pdp_rawout.c') diff --git a/modules/generic/pdp_rawout.c b/modules/generic/pdp_rawout.c index e1e9edf..01d7ca0 100644 --- a/modules/generic/pdp_rawout.c +++ b/modules/generic/pdp_rawout.c @@ -1,5 +1,5 @@ /* - * Pure Data Packet module. packet forth console + * Pure Data Packet module. Raw packet output. * Copyright (c) by Tom Schouten * * This program is free software; you can redistribute it and/or modify @@ -19,6 +19,31 @@ */ +/* + This is the most straightforward way to get data out of pdp. + The internals follow the simple reader/writer pattern + * writer: runs in pd thread, accepts packets from inlet and stores in queue + * reader: runs in own thread, reads packets from queue and writes to pipe + + Since there is no communication from reader to writer, we need a watchdog + timer to check the status of the writer. Mainly to close when necessary. + + To enable audio recording, pdp_rawout will also produce interleaved 16bit + audio. You will need to instantiate it with [pdp_rawout~ nbchans] + + + +*/ + +/* TODO: + + make audio buffer smaller (128 bytes writes is too heavy) + +*/ + + + +#include #include #include #include @@ -39,10 +64,9 @@ #define D if (1) -#define MAX_QUEUESIZE 4 +#define DEF_QUEUESIZE 100 #define PIPE_BLOCKSIZE 4096 - - +#define POLLTIME 20 /* raw input from a unix pipe */ @@ -51,11 +75,19 @@ typedef struct rawout_struct { /* pd */ t_object x_obj; + t_float x_f; + //t_outlet *x_outlet; t_outlet *x_sync_outlet; /* comm */ - t_pdp_list *x_queue; // packet queue + t_pdp_list *x_queue; // packet queue + t_clock *x_clock; // watchdog timer + t_float x_deltime; // watchdog period + int x_verbose; + int x_packet; // dsp fillup state + int x_countdown; // amount of packet filled up + short int * x_needle; // start writing here /* thread */ pthread_mutex_t x_mut; @@ -68,8 +100,10 @@ typedef struct rawout_struct int x_done; // 1-> writer thread has exited /* config */ - t_symbol *x_pipe; - t_pdp_symbol *x_type; + t_symbol *x_pipe; + int x_chans; // nb audio channels + t_pdp_symbol *x_tmptype; + int x_max_queuesize; // buffer size ( < 0 = infty ) } t_rawout; @@ -78,55 +112,41 @@ static inline void lock(t_rawout *x){pthread_mutex_lock(&x->x_mut);} static inline void unlock(t_rawout *x){pthread_mutex_unlock(&x->x_mut);} static void rawout_close(t_rawout *x); -static void pdp_in(t_rawout *x, t_symbol *s, t_float f) -{ - /* save packet to pdp queue, if size is smaller than maxsize */ - if (s == S_REGISTER_RO){ - if (x->x_queue->elements < MAX_QUEUESIZE){ - int p = (int)f; - p = pdp_packet_copy_ro(p); - if (p != -1){ - lock(x); - pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p); - unlock(x); - } - } - else { - pdp_post("pdp_rawout: dropping packet: (queue full)", MAX_QUEUESIZE); - } - - } - - /* check if thread is done */ - if (x->x_done) rawout_close(x); - -} - +/* READER THREAD: reads from queue, writes to pipe */ static void *rawout_thread(void *y) { - int pipe; + int pipefd; int packet = -1; t_rawout *x = (t_rawout *)y; int period_sec; int period_usec; - sigset_t sigvec; /* signal handling */ + sigset_t _sigvec; /* signal handling */ + + char me[1024]; + sprintf(me, "pdp_rawout: %s", x->x_pipe->s_name); /* ignore pipe signal */ - sigemptyset(&sigvec); - sigaddset(&sigvec,SIGPIPE); - pthread_sigmask(SIG_BLOCK, &sigvec, 0); + sigemptyset(&_sigvec); + sigaddset(&_sigvec,SIGPIPE); + pthread_sigmask(SIG_BLOCK, &_sigvec, 0); //D pdp_post("pipe: %s", x->x_pipe->s_name); //D pdp_post("type: %s", x->x_type->s_name); + /* open pipe */ - if (-1 == (pipe = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK))){ - perror(x->x_pipe->s_name); - goto exit; + if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK|O_APPEND))){ + if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_CREAT))){ + perror(me); + goto exit; + } } + pdp_post("pdp_rawout: opened %s", x->x_pipe->s_name); + + /* main loop (packets) */ while(1){ void *data = 0; @@ -147,10 +167,15 @@ static void *rawout_thread(void *y) lock(x); packet = pdp_list_pop(x->x_queue).w_packet; unlock(x); - + /* send packet */ + //t_pdp *h = pdp_packet_header(packet); + //fprintf(stderr, "users %d\n", h->users); + data = pdp_packet_data(packet); left = pdp_packet_data_size(packet); + //int i; for (i = 0; ix_pipe->s_name); - if (bytes != EAGAIN) goto close; + perror(me); + if (errno != EAGAIN) goto close; } /* or update pointers */ else{ @@ -200,7 +226,7 @@ static void *rawout_thread(void *y) close: /* close pipe */ - close(pipe); + close(pipefd); exit: @@ -209,16 +235,121 @@ static void *rawout_thread(void *y) } +/* DSP INPUT */ +#define DSP_ARG(type, name, source) type name = (type)source -static void rawout_type(t_rawout *x, t_symbol *type) +static t_int *rawout_perform(t_int *w); +static void rawout_dsp(t_rawout *x, t_signal **sp){ + int nargs = 2 + x->x_chans; + t_int args[nargs]; + args[0] = (int)x; + args[1] = (int)sp[0]->s_n; + float **in = (float **)(args+2); + int i; + for (i=0; ix_chans; i++) in[i] = sp[i]->s_vec; + dsp_addv(rawout_perform, nargs, args); +} + +static t_int *rawout_perform(t_int *w) { - x->x_type = pdp_gensym(type->s_name); + DSP_ARG(t_rawout*, x, w[1]); + DSP_ARG(t_int, n, w[2]); + DSP_ARG(t_float**, in, &w[3]); + + short int *out; + int i,c,k; + + + if (x->x_queue->elements >= x->x_max_queuesize){ + // drop + if (x->x_verbose && x->x_active) pdp_post_n("."); + } + else { + + // create packet + if (x->x_countdown) { + out = x->x_needle; + } + else { + int p = pdp_factory_newpacket(x->x_tmptype); + pdp_packet_mark_unused(x->x_packet); + // if (-1 == p) pdp_post("pdp_rawout~: can't create packet"); + x->x_needle = out = (short int *)pdp_packet_data(p); + x->x_packet = p; + x->x_countdown = pdp_packet_data_size(p) / 2; + } + + //pdp_post("data size = %d bytes", pdp_packet_data_size(p)); + + //memset(out, 0, pdp_packet_data_size(p)); + + + // convert & interleave + for (k=0,i=0; ix_chans; c++,k++){ + float val = (in[c])[i]; + val *= (float)((1<<15)-1); + out[k] = (short int)(val); + //out[k] = 0x1234; + //fprintf(stderr, "(%d,%d,%d) %d\n", c, i, k, (int)out[k]); + } + } + + x->x_needle += k; + x->x_countdown -= k; + if (!x->x_countdown){ + // transfer + lock(x); + pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)x->x_packet); + x->x_packet = -1; + unlock(x); + } + } + + return w+3+x->x_chans; } -static void rawout_open(t_rawout *x, t_symbol *pipe) + +/* PACKET INPUT */ + + +static void pdp_in(t_rawout *x, t_symbol *s, t_float f) +{ + /* save packet to pdp queue, if size is smaller than maxsize */ + if (s == S_REGISTER_RO){ + if (x->x_queue->elements < x->x_max_queuesize){ + int p = (int)f; + p = pdp_packet_copy_ro(p); + if (p != -1){ + lock(x); + pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p); + unlock(x); + } + } + else { + //pdp_post("pdp_rawout: dropping packet: (queue full)", x->x_max_queuesize); + if (x->x_active && x->x_verbose) pdp_post_n("."); + } + + } + + /* check if thread is done */ + if (x->x_done) rawout_close(x); + +} + +/* CONTROL */ + +//static void rawout_type(t_rawout *x, t_symbol *type) +//{ + //x->x_type = pdp_gensym(type->s_name); +//} + +static void clear_queue(t_rawout *x); +static void rawout_open(t_rawout *x, t_symbol *spipe) { /* save pipe name if not empty */ - if (pipe->s_name[0]) {x->x_pipe = pipe;} + if (spipe->s_name[0]) {x->x_pipe = spipe;} if (x->x_active) { pdp_post("already open"); @@ -227,67 +358,133 @@ static void rawout_open(t_rawout *x, t_symbol *pipe) /* start thread */ x->x_giveup = 0; x->x_done = 0; + clear_queue(x); pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x); x->x_active = 1; } -static void rawout_close(t_rawout *x) -{ - +static void rawout_stopthread(t_rawout *x){ if (!x->x_active) return; /* stop thread: set giveup + wait */ x->x_giveup = 1; pthread_join(x->x_thread, NULL); x->x_active = 0; + clear_queue(x); +} + +static void rawout_close(t_rawout *x) +{ + if (!x->x_active) return; + rawout_stopthread(x); /* notify */ outlet_bang(x->x_sync_outlet); - pdp_post("connection to %s closed", x->x_pipe->s_name); - + pdp_post("pdp_rawout: closed %s", x->x_pipe->s_name); +} +static void rawout_verbose(t_rawout *x, t_float fverbose){ + x->x_verbose = (int)fverbose; +} +static void rawout_tick(t_rawout *x){ + if (x->x_done) rawout_close(x); + clock_delay(x->x_clock, x->x_deltime); +} - +static void clear_queue(t_rawout *x){ + lock(x); + while(x->x_queue->elements){ + pdp_packet_mark_unused(pdp_list_pop(x->x_queue).w_packet); + } + unlock(x); +} + +#define MAXINT (0x80 << ((sizeof(int)-1)*8)) +static void rawout_queuesize(t_rawout *x, t_float fbufsize){ + int bufsize = (int)fbufsize; + if (fbufsize < 1) fbufsize = MAXINT; + x->x_max_queuesize = bufsize; } static void rawout_free(t_rawout *x) { - rawout_close(x); - pdp_tree_strip_packets(x->x_queue); + // terminate thread + rawout_stopthread(x); + + // cleanup + clock_unset(x->x_clock); + clear_queue(x); pdp_tree_free(x->x_queue); + pdp_packet_mark_unused(x->x_packet); } t_class *rawout_class; +t_class *rawout_dsp_class; - -static void *rawout_new(t_symbol *pipe, t_symbol *type) -{ - t_rawout *x; - - pdp_post("%s %s", pipe->s_name, type->s_name); - - /* allocate & init */ - x = (t_rawout *)pd_new(rawout_class); +// shared stuff +static void rawout_init(t_rawout *x, t_symbol *spipe){ //x->x_outlet = outlet_new(&x->x_obj, &s_anything); x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything); x->x_queue = pdp_list_new(0); x->x_active = 0; x->x_giveup = 0; x->x_done = 0; - x->x_type = pdp_gensym("image/YCrCb/320x240"); //default + x->x_packet = -1; + x->x_countdown = 0; + x->x_max_queuesize = DEF_QUEUESIZE; + //x->x_type = pdp_gensym("image/YCrCb/320x240"); //default x->x_pipe = gensym("/tmp/pdpraw"); // default + x->x_deltime = POLLTIME; + x->x_clock = clock_new(x, (t_method)rawout_tick); pthread_attr_init(&x->x_attr); pthread_mutex_init(&x->x_mut, NULL); /* args */ - rawout_type(x, type); - if (pipe->s_name[0]) x->x_pipe = pipe; + //rawout_type(x, type); + if (spipe && spipe->s_name[0]) x->x_pipe = spipe; + rawout_tick(x); + rawout_verbose(x,0); +} +// [pdp_rawout] +static void *rawout_new(t_symbol *spipe /* , t_symbol *type */) +{ + t_rawout *x; + /* allocate & init */ + x = (t_rawout *)pd_new(rawout_class); + rawout_init(x, spipe); return (void *)x; } +// [pdp_rawout~] + + +// HUH??? why do i get the symbol first, then the float???? +// see http://lists.puredata.info/pipermail/pd-dev/2003-09/001618.html +//static void *rawout_dsp_new(t_float fchans, t_symbol *spipe){ +static void *rawout_dsp_new(t_symbol *spipe, t_float fchans){ + int chans = (int)fchans; + if (chans < 1) chans = 1; + if (chans > 64) return 0; // this is just a safety measure + + t_rawout *x = (t_rawout *)pd_new(rawout_dsp_class); + rawout_init(x, spipe); + + // hack: temp packet + char temp_packet[1024]; + sprintf(temp_packet, "image/grey/256x%d", 8 * chans); + pdp_post("pdp_rawout: using fake packet %s", temp_packet); + x->x_tmptype = pdp_gensym(temp_packet); + + // create audio inlets + x->x_chans = chans; + while (--chans) inlet_new(&x->x_obj, &x->x_obj.ob_pd, gensym("signal"), gensym("signal")); + + return (void *)x; +} + #ifdef __cplusplus @@ -296,25 +493,46 @@ extern "C" #endif -void pdp_rawout_setup(void) -{ + +#define COMMON(base_class)\ + class_addmethod(base_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);\ + class_addmethod(base_class, (t_method)rawout_close, gensym("close"), A_NULL);\ + class_addmethod(base_class, (t_method)rawout_verbose, gensym("verbose"), A_FLOAT, A_NULL);\ + class_addmethod(base_class, (t_method)rawout_queuesize, gensym("bufsize"), A_FLOAT, A_NULL); + +void pdp_rawout_setup(void){ + int i; + /* PACKETS */ + /* create a standard pd class: [pdp_rawout pipe type] */ rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new, - (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL); + (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_NULL); /* add global message handler */ - class_addmethod(rawout_class, (t_method)pdp_in, - gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL); + class_addmethod(rawout_class, (t_method)pdp_in, gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL); + COMMON(rawout_class); + + /* DSP */ - class_addmethod(rawout_class, (t_method)rawout_type, gensym("type"), A_SYMBOL, A_NULL); - class_addmethod(rawout_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL); - class_addmethod(rawout_class, (t_method)rawout_close, gensym("close"), A_NULL); + /* create a standard pd class: [pdp_rawout pipe type] */ + rawout_dsp_class = class_new(gensym("pdp_rawout~"), (t_newmethod)rawout_dsp_new, + (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFFLOAT, A_DEFSYMBOL, A_NULL); + /* add signal input */ + CLASS_MAINSIGNALIN(rawout_dsp_class, t_rawout, x_f); + class_addmethod(rawout_dsp_class, (t_method)rawout_dsp, gensym("dsp"), 0); + COMMON(rawout_dsp_class); } + + + + + + #ifdef __cplusplus } #endif -- cgit v1.2.1