diff options
author | Tom Schouten <doelie@users.sourceforge.net> | 2006-09-01 13:45:31 +0000 |
---|---|---|
committer | Tom Schouten <doelie@users.sourceforge.net> | 2006-09-01 13:45:31 +0000 |
commit | 7591a024f184bd385d35583d19d86c1d5f2531ba (patch) | |
tree | 77aa0c44ccb700eb9a2b16e1b246e3c8026c40ed /modules/generic | |
parent | 91dd6b68f0f209ad015a303095bb1df018dca71e (diff) |
pdp current darcs merge
svn path=/trunk/externals/pdp/; revision=5816
Diffstat (limited to 'modules/generic')
-rw-r--r-- | modules/generic/Makefile | 2 | ||||
-rw-r--r-- | modules/generic/pdp_del.c | 18 | ||||
-rw-r--r-- | modules/generic/pdp_rawin.c | 302 | ||||
-rw-r--r-- | modules/generic/pdp_rawout.c | 376 |
4 files changed, 503 insertions, 195 deletions
diff --git a/modules/generic/Makefile b/modules/generic/Makefile index 035f970..e264828 100644 --- a/modules/generic/Makefile +++ b/modules/generic/Makefile @@ -4,7 +4,7 @@ include ../../Makefile.config PDP_MOD = pdp_reg.o pdp_del.o pdp_snap.o pdp_trigger.o \ pdp_route.o pdp_inspect.o pdp_loop.o pdp_description.o pdp_convert.o \ - pdp_udp_send.o pdp_udp_receive.o pdp_rawin.o pdp_rawout.o + pdp_udp_send.o pdp_udp_receive.o pdp_rawin.o pdp_rawout.o pdp_metro.o # build generic modules all_modules: $(PDP_MOD) diff --git a/modules/generic/pdp_del.c b/modules/generic/pdp_del.c index 4b51023..6b7aeb2 100644 --- a/modules/generic/pdp_del.c +++ b/modules/generic/pdp_del.c @@ -71,7 +71,14 @@ static void pdp_del_input_0(t_pdp_del *x, t_symbol *s, t_floatarg f) else if (s == gensym("process")){ out = (((x->x_head + x->x_delay)) % x->x_order); packet = x->x_packet[out]; - pdp_packet_pass_if_valid(x->x_outlet0, &x->x_packet[out]); + + // originally, we wouldn't keep the packet in the delay line to save memory + // however, this behaviour is very annoying, and doesn't allow ''scratching'' + // so we send out a copy instead. + // pdp_packet_pass_if_valid(x->x_outlet0, &x->x_packet[out]); + int p = pdp_packet_copy_ro(packet); + pdp_packet_pass_if_valid(x->x_outlet0, &p); + /* if (-1 != packet){ @@ -88,6 +95,15 @@ static void pdp_del_input_0(t_pdp_del *x, t_symbol *s, t_floatarg f) */ x->x_head = (x->x_head + x->x_order - 1) % x->x_order; + +/* + int i; + for (i=0; i<x->x_order; i++){ + fprintf(stderr, " %d", x->x_packet[i]); + } + fprintf(stderr, "\n"); +*/ + } diff --git a/modules/generic/pdp_rawin.c b/modules/generic/pdp_rawin.c index 28ef8fb..04d8775 100644 --- a/modules/generic/pdp_rawin.c +++ b/modules/generic/pdp_rawin.c @@ -1,5 +1,5 @@ /* - * Pure Data Packet module. packet forth console + * Pure Data Packet module. Raw packet input * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> * * This program is free software; you can redistribute it and/or modify @@ -19,6 +19,7 @@ */ +#include <string.h> #include <pthread.h> #include <stdio.h> #include <unistd.h> @@ -40,6 +41,55 @@ #define D if (1) +/* TODO: add flow control + +reminder: pthread condition values for synchro + +writing to a queue goes like this: +* pthread_mutex_lock +* atomic op (write) +* pthread_cond_signal +* pthread_mutex_unlock + +reading from a queue goes like this: +* pthread_mutex_lock +* while (!CONDITION) pthread_cond_wait +* atomic op (read) +* pthread_mutex_unlock + + +in this case, there is a reader and a writer, AND a maximum +size of the buffer between them (1 atom) so both reader and writer +might block. compare this to the 'command queue' in libpf, where +only the reader blocks. + +so the course of events is: + +READER +* wait for data ready (COND_READ) this blocks pd, it is assumed data is always ready in normal operation +* consume +* signal writer (COND_WRITE) + + +WRITER +* wait for space ready (COND_WRITE) +* write +* signal reader + + +one remark though: +is this machinery really necessary? +----------------------------------- + +it might be easier to just read in the pd thread. +the only problem this gives is when data really arrives not in a single chunk, but as a stream. +to me that seems a really awkward special case which can be solved using another PROCESS to +do the buffering / dropping. + +so, for now, it's just synchronous, no threads for sync reading. + +*/ + /* raw input from a unix pipe */ @@ -54,16 +104,18 @@ typedef struct rawin_struct /* comm */ t_pdp_list *x_queue; // packet queue + int x_pipefd; /* thread */ pthread_mutex_t x_mut; pthread_attr_t x_attr; pthread_t x_thread; + /* sync */ + int x_mode; // 1-> sync to input int x_giveup; // 1-> terminate reader thread int x_active; // 1-> reader thread is launched - int x_done; // 1-> reader thread has exited /* config */ t_symbol *x_pipe; @@ -76,138 +128,125 @@ static inline void lock(t_rawin *x){pthread_mutex_lock(&x->x_mut);} static inline void unlock(t_rawin *x){pthread_mutex_unlock(&x->x_mut);} static void rawin_close(t_rawin *x); -static void tick(t_rawin *x) -{ - /* send all packets in queue to outlet */ - lock(x); - while (x->x_queue->elements){ - outlet_pdp_atom(x->x_outlet, x->x_queue->first); - pdp_list_pop(x->x_queue); // pop stale reference - } - unlock(x); - clock_delay(x->x_clock, PERIOD); - /* check if thread is done */ - if (x->x_done) rawin_close(x); -} -static void move_current_to_queue(t_rawin *x, int packet) -{ - lock(x); - pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet); - unlock(x); -} -static void *rawin_thread(void *y) -{ - int pipe; +static int read_packet(t_rawin *x){ + int packet = -1; - t_rawin *x = (t_rawin *)y; + void *data = 0; + int left = -1; int period_sec; int period_usec; - - //D pdp_post("pipe: %s", x->x_pipe->s_name); - //D pdp_post("type: %s", x->x_type->s_name); - - /* open pipe */ - if (-1 == (pipe = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){ - perror(x->x_pipe->s_name); - goto exit; + /* create packet */ + if (-1 != packet){ + pdp_post("WARNING: deleting stale packet"); + pdp_packet_mark_unused(packet); + } + packet = pdp_factory_newpacket(x->x_type); + if (-1 == packet){ + pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name); + goto close; } + + /* fill packet */ + data = pdp_packet_data(packet); + left = pdp_packet_data_size(packet); + // D pdp_post("packet %d, data %x, size %d", packet, data, left); + + /* inner loop: pipe reads */ + while(left){ - /* main loop (packets) */ - while(1){ - void *data = 0; - int left = -1; + fd_set inset; + struct timeval tv = {0,10000}; - /* create packet */ - if (-1 != packet){ - pdp_post("WARNING: deleting stale packet"); + /* check if we need to stop */ + if (x->x_giveup){ pdp_packet_mark_unused(packet); + goto close; } - packet = pdp_factory_newpacket(x->x_type); - if (-1 == packet){ - pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name); - goto exit; + /* select, with timeout */ + FD_ZERO(&inset); + FD_SET(x->x_pipefd, &inset); + if (-1 == select(x->x_pipefd+1, &inset, NULL,NULL, &tv)){ + pdp_post("select error"); + goto close; } - - /* fill packet */ - data = pdp_packet_data(packet); - left = pdp_packet_data_size(packet); - // D pdp_post("packet %d, data %x, size %d", packet, data, left); - - /* inner loop: pipe reads */ - while(left){ - - fd_set inset; - struct timeval tv = {0,10000}; - - /* check if we need to stop */ - if (x->x_giveup){ - pdp_packet_mark_unused(packet); - goto close; - } - /* select, with timeout */ - FD_ZERO(&inset); - FD_SET(pipe, &inset); - if (-1 == select(pipe+1, &inset, NULL,NULL, &tv)){ - pdp_post("select error"); - goto close; - } - /* if ready, read, else retry */ - if (FD_ISSET(pipe, &inset)){ - int bytes = read(pipe, data, left); - if (!bytes){ - /* if no bytes are read, pipe is closed */ - goto close; - } - data += bytes; - left -= bytes; + /* if ready, read, else retry */ + if (FD_ISSET(x->x_pipefd, &inset)){ + int bytes = read(x->x_pipefd, data, left); + if (!bytes){ + /* if no bytes are read, pipe is closed */ + goto close; } + data += bytes; + left -= bytes; } - - /* move to queue */ - move_current_to_queue(x, packet); - packet = -1; + } + return packet; + close: + return -1; +} - +/* reader thread syncs to pipe */ +static void *rawin_thread(void *y) +{ + t_rawin *x = (t_rawin *)y; + int packet = -1; + + /* loop until error or close */ + while (-1 != (packet = read_packet(x))) { + lock(x); + pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet); + unlock(x); } - close: - /* close pipe */ - close(pipe); - - - exit: - x->x_done = 1; return 0; } +/* sync to stream: + tick polls the receive queue */ +static void rawin_tick(t_rawin *x) +{ + int p = -1; + + /* send all packets in queue to outlet */ + while (x->x_queue->elements){ + lock(x); + //pdp_post("%d", x->x_queue->elements); + p = pdp_list_pop(x->x_queue).w_packet; + unlock(x); + //pdp_post_n("%d ", p); + pdp_packet_pass_if_valid(x->x_outlet, &p); + //pdp_post("%d",p); + } + clock_delay(x->x_clock, PERIOD); +} -static void rawin_type(t_rawin *x, t_symbol *type) -{ - x->x_type = pdp_gensym(type->s_name); +/* sync to bang: + this runs the reader in the pd thread + */ +static void rawin_bang(t_rawin *x){ + if (!x->x_active) return; + if (x->x_mode) return; + int packet = read_packet(x); + if (-1 == packet) rawin_close(x); // close on error + pdp_packet_pass_if_valid(x->x_outlet, &packet); + } -static void rawin_open(t_rawin *x, t_symbol *pipe) -{ - /* save pipe name if not empty */ - if (pipe->s_name[0]) {x->x_pipe = pipe;} - if (x->x_active) { - pdp_post("already open"); - return; - } - /* start thread */ - x->x_giveup = 0; - x->x_done = 0; - pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x); - x->x_active = 1; + + + +static void rawin_type(t_rawin *x, t_symbol *type) +{ + x->x_type = pdp_gensym(type->s_name); } static void rawin_close(t_rawin *x) @@ -217,19 +256,52 @@ static void rawin_close(t_rawin *x) /* stop thread: set giveup + wait */ x->x_giveup = 1; - pthread_join(x->x_thread, NULL); + if (x->x_mode) pthread_join(x->x_thread, NULL); x->x_active = 0; + /* close pipe */ + close(x->x_pipefd); + /* notify */ outlet_bang(x->x_sync_outlet); - pdp_post("connection to %s closed", x->x_pipe->s_name); - + pdp_post("pdp_rawin: connection to %s closed", x->x_pipe->s_name); +} + + +static void rawin_open(t_rawin *x, t_symbol *spipe) +{ + /* save pipe name if not empty */ + if (spipe->s_name[0]) {x->x_pipe = spipe;} + if (x->x_active) { + pdp_post("pdp_rawin: already open"); + return; + } + /* open pipe */ + if (-1 == (x->x_pipefd = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){ + perror(x->x_pipe->s_name); + return; + } + + /* thread control vars */ + x->x_giveup = 0; + x->x_active = 1; + + /* start thread if sync mode */ + if (x->x_mode) + pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x); } + +static void rawin_sync(t_rawin *x, t_float fmode){ + rawin_close(x); + x->x_mode = (int)fmode; +} + + static void rawin_free(t_rawin *x) { rawin_close(x); @@ -241,21 +313,21 @@ static void rawin_free(t_rawin *x) t_class *rawin_class; -static void *rawin_new(t_symbol *pipe, t_symbol *type) +static void *rawin_new(t_symbol *spipe, t_symbol *type) { t_rawin *x; - pdp_post("%s %s", pipe->s_name, type->s_name); + pdp_post("%s %s", spipe->s_name, type->s_name); /* allocate & init */ x = (t_rawin *)pd_new(rawin_class); x->x_outlet = outlet_new(&x->x_obj, &s_anything); x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything); - x->x_clock = clock_new(x, (t_method)tick); + x->x_clock = clock_new(x, (t_method)rawin_tick); x->x_queue = pdp_list_new(0); x->x_active = 0; x->x_giveup = 0; - x->x_done = 0; + x->x_mode = 0; x->x_type = pdp_gensym("image/YCrCb/320x240"); //default x->x_pipe = gensym("/tmp/pdpraw"); // default pthread_attr_init(&x->x_attr); @@ -264,7 +336,7 @@ static void *rawin_new(t_symbol *pipe, t_symbol *type) /* args */ rawin_type(x, type); - if (pipe->s_name[0]) x->x_pipe = pipe; + if (spipe->s_name[0]) x->x_pipe = spipe; return (void *)x; @@ -286,10 +358,12 @@ void pdp_rawin_setup(void) rawin_class = class_new(gensym("pdp_rawin"), (t_newmethod)rawin_new, (t_method)rawin_free, sizeof(t_rawin), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL); - /* add global message handler */ + /* add global message handlers */ class_addmethod(rawin_class, (t_method)rawin_type, gensym("type"), A_SYMBOL, A_NULL); class_addmethod(rawin_class, (t_method)rawin_open, gensym("open"), A_DEFSYMBOL, A_NULL); class_addmethod(rawin_class, (t_method)rawin_close, gensym("close"), A_NULL); + class_addmethod(rawin_class, (t_method)rawin_sync, gensym("sync"), A_FLOAT, A_NULL); + class_addmethod(rawin_class, (t_method)rawin_bang, gensym("bang"), A_NULL); } diff --git a/modules/generic/pdp_rawout.c b/modules/generic/pdp_rawout.c index e1e9edf..01d7ca0 100644 --- a/modules/generic/pdp_rawout.c +++ b/modules/generic/pdp_rawout.c @@ -1,5 +1,5 @@ /* - * Pure Data Packet module. packet forth console + * Pure Data Packet module. Raw packet output. * Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org> * * This program is free software; you can redistribute it and/or modify @@ -19,6 +19,31 @@ */ +/* + This is the most straightforward way to get data out of pdp. + The internals follow the simple reader/writer pattern + * writer: runs in pd thread, accepts packets from inlet and stores in queue + * reader: runs in own thread, reads packets from queue and writes to pipe + + Since there is no communication from reader to writer, we need a watchdog + timer to check the status of the writer. Mainly to close when necessary. + + To enable audio recording, pdp_rawout will also produce interleaved 16bit + audio. You will need to instantiate it with [pdp_rawout~ nbchans] + + + +*/ + +/* TODO: + + make audio buffer smaller (128 bytes writes is too heavy) + +*/ + + + +#include <string.h> #include <pthread.h> #include <stdio.h> #include <unistd.h> @@ -39,10 +64,9 @@ #define D if (1) -#define MAX_QUEUESIZE 4 +#define DEF_QUEUESIZE 100 #define PIPE_BLOCKSIZE 4096 - - +#define POLLTIME 20 /* raw input from a unix pipe */ @@ -51,11 +75,19 @@ typedef struct rawout_struct { /* pd */ t_object x_obj; + t_float x_f; + //t_outlet *x_outlet; t_outlet *x_sync_outlet; /* comm */ - t_pdp_list *x_queue; // packet queue + t_pdp_list *x_queue; // packet queue + t_clock *x_clock; // watchdog timer + t_float x_deltime; // watchdog period + int x_verbose; + int x_packet; // dsp fillup state + int x_countdown; // amount of packet filled up + short int * x_needle; // start writing here /* thread */ pthread_mutex_t x_mut; @@ -68,8 +100,10 @@ typedef struct rawout_struct int x_done; // 1-> writer thread has exited /* config */ - t_symbol *x_pipe; - t_pdp_symbol *x_type; + t_symbol *x_pipe; + int x_chans; // nb audio channels + t_pdp_symbol *x_tmptype; + int x_max_queuesize; // buffer size ( < 0 = infty ) } t_rawout; @@ -78,55 +112,41 @@ static inline void lock(t_rawout *x){pthread_mutex_lock(&x->x_mut);} static inline void unlock(t_rawout *x){pthread_mutex_unlock(&x->x_mut);} static void rawout_close(t_rawout *x); -static void pdp_in(t_rawout *x, t_symbol *s, t_float f) -{ - /* save packet to pdp queue, if size is smaller than maxsize */ - if (s == S_REGISTER_RO){ - if (x->x_queue->elements < MAX_QUEUESIZE){ - int p = (int)f; - p = pdp_packet_copy_ro(p); - if (p != -1){ - lock(x); - pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p); - unlock(x); - } - } - else { - pdp_post("pdp_rawout: dropping packet: (queue full)", MAX_QUEUESIZE); - } - - } - - /* check if thread is done */ - if (x->x_done) rawout_close(x); - -} - +/* READER THREAD: reads from queue, writes to pipe */ static void *rawout_thread(void *y) { - int pipe; + int pipefd; int packet = -1; t_rawout *x = (t_rawout *)y; int period_sec; int period_usec; - sigset_t sigvec; /* signal handling */ + sigset_t _sigvec; /* signal handling */ + + char me[1024]; + sprintf(me, "pdp_rawout: %s", x->x_pipe->s_name); /* ignore pipe signal */ - sigemptyset(&sigvec); - sigaddset(&sigvec,SIGPIPE); - pthread_sigmask(SIG_BLOCK, &sigvec, 0); + sigemptyset(&_sigvec); + sigaddset(&_sigvec,SIGPIPE); + pthread_sigmask(SIG_BLOCK, &_sigvec, 0); //D pdp_post("pipe: %s", x->x_pipe->s_name); //D pdp_post("type: %s", x->x_type->s_name); + /* open pipe */ - if (-1 == (pipe = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK))){ - perror(x->x_pipe->s_name); - goto exit; + if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK|O_APPEND))){ + if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_CREAT))){ + perror(me); + goto exit; + } } + pdp_post("pdp_rawout: opened %s", x->x_pipe->s_name); + + /* main loop (packets) */ while(1){ void *data = 0; @@ -147,10 +167,15 @@ static void *rawout_thread(void *y) lock(x); packet = pdp_list_pop(x->x_queue).w_packet; unlock(x); - + /* send packet */ + //t_pdp *h = pdp_packet_header(packet); + //fprintf(stderr, "users %d\n", h->users); + data = pdp_packet_data(packet); left = pdp_packet_data_size(packet); + //int i; for (i = 0; i<left/2; i++) fprintf(stderr, "%06x ", ((short int *)data)[i]); fprintf(stderr, "\n"); + /* inner loop: pipe reads */ while(left){ @@ -166,19 +191,20 @@ static void *rawout_thread(void *y) /* select, with timeout */ FD_ZERO(&outset); - FD_SET(pipe, &outset); - if (-1 == select(pipe+1, NULL, &outset, NULL, &tv)){ + FD_SET(pipefd, &outset); + if (-1 == select(pipefd+1, NULL, &outset, NULL, &tv)){ pdp_post("select error"); goto close; } /* if ready, read, else retry */ - if (FD_ISSET(pipe, &outset)){ - int bytes = write(pipe, data, left); + if (FD_ISSET(pipefd, &outset)){ + + int bytes = write(pipefd, data, left); /* handle errors */ if (bytes <= 0){ - perror(x->x_pipe->s_name); - if (bytes != EAGAIN) goto close; + perror(me); + if (errno != EAGAIN) goto close; } /* or update pointers */ else{ @@ -200,7 +226,7 @@ static void *rawout_thread(void *y) close: /* close pipe */ - close(pipe); + close(pipefd); exit: @@ -209,16 +235,121 @@ static void *rawout_thread(void *y) } +/* DSP INPUT */ +#define DSP_ARG(type, name, source) type name = (type)source -static void rawout_type(t_rawout *x, t_symbol *type) +static t_int *rawout_perform(t_int *w); +static void rawout_dsp(t_rawout *x, t_signal **sp){ + int nargs = 2 + x->x_chans; + t_int args[nargs]; + args[0] = (int)x; + args[1] = (int)sp[0]->s_n; + float **in = (float **)(args+2); + int i; + for (i=0; i<x->x_chans; i++) in[i] = sp[i]->s_vec; + dsp_addv(rawout_perform, nargs, args); +} + +static t_int *rawout_perform(t_int *w) { - x->x_type = pdp_gensym(type->s_name); + DSP_ARG(t_rawout*, x, w[1]); + DSP_ARG(t_int, n, w[2]); + DSP_ARG(t_float**, in, &w[3]); + + short int *out; + int i,c,k; + + + if (x->x_queue->elements >= x->x_max_queuesize){ + // drop + if (x->x_verbose && x->x_active) pdp_post_n("."); + } + else { + + // create packet + if (x->x_countdown) { + out = x->x_needle; + } + else { + int p = pdp_factory_newpacket(x->x_tmptype); + pdp_packet_mark_unused(x->x_packet); + // if (-1 == p) pdp_post("pdp_rawout~: can't create packet"); + x->x_needle = out = (short int *)pdp_packet_data(p); + x->x_packet = p; + x->x_countdown = pdp_packet_data_size(p) / 2; + } + + //pdp_post("data size = %d bytes", pdp_packet_data_size(p)); + + //memset(out, 0, pdp_packet_data_size(p)); + + + // convert & interleave + for (k=0,i=0; i<n; i++){ + for (c=0; c<x->x_chans; c++,k++){ + float val = (in[c])[i]; + val *= (float)((1<<15)-1); + out[k] = (short int)(val); + //out[k] = 0x1234; + //fprintf(stderr, "(%d,%d,%d) %d\n", c, i, k, (int)out[k]); + } + } + + x->x_needle += k; + x->x_countdown -= k; + if (!x->x_countdown){ + // transfer + lock(x); + pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)x->x_packet); + x->x_packet = -1; + unlock(x); + } + } + + return w+3+x->x_chans; } -static void rawout_open(t_rawout *x, t_symbol *pipe) + +/* PACKET INPUT */ + + +static void pdp_in(t_rawout *x, t_symbol *s, t_float f) +{ + /* save packet to pdp queue, if size is smaller than maxsize */ + if (s == S_REGISTER_RO){ + if (x->x_queue->elements < x->x_max_queuesize){ + int p = (int)f; + p = pdp_packet_copy_ro(p); + if (p != -1){ + lock(x); + pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p); + unlock(x); + } + } + else { + //pdp_post("pdp_rawout: dropping packet: (queue full)", x->x_max_queuesize); + if (x->x_active && x->x_verbose) pdp_post_n("."); + } + + } + + /* check if thread is done */ + if (x->x_done) rawout_close(x); + +} + +/* CONTROL */ + +//static void rawout_type(t_rawout *x, t_symbol *type) +//{ + //x->x_type = pdp_gensym(type->s_name); +//} + +static void clear_queue(t_rawout *x); +static void rawout_open(t_rawout *x, t_symbol *spipe) { /* save pipe name if not empty */ - if (pipe->s_name[0]) {x->x_pipe = pipe;} + if (spipe->s_name[0]) {x->x_pipe = spipe;} if (x->x_active) { pdp_post("already open"); @@ -227,67 +358,133 @@ static void rawout_open(t_rawout *x, t_symbol *pipe) /* start thread */ x->x_giveup = 0; x->x_done = 0; + clear_queue(x); pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x); x->x_active = 1; } -static void rawout_close(t_rawout *x) -{ - +static void rawout_stopthread(t_rawout *x){ if (!x->x_active) return; /* stop thread: set giveup + wait */ x->x_giveup = 1; pthread_join(x->x_thread, NULL); x->x_active = 0; + clear_queue(x); +} + +static void rawout_close(t_rawout *x) +{ + if (!x->x_active) return; + rawout_stopthread(x); /* notify */ outlet_bang(x->x_sync_outlet); - pdp_post("connection to %s closed", x->x_pipe->s_name); - + pdp_post("pdp_rawout: closed %s", x->x_pipe->s_name); +} +static void rawout_verbose(t_rawout *x, t_float fverbose){ + x->x_verbose = (int)fverbose; +} +static void rawout_tick(t_rawout *x){ + if (x->x_done) rawout_close(x); + clock_delay(x->x_clock, x->x_deltime); +} - +static void clear_queue(t_rawout *x){ + lock(x); + while(x->x_queue->elements){ + pdp_packet_mark_unused(pdp_list_pop(x->x_queue).w_packet); + } + unlock(x); +} + +#define MAXINT (0x80 << ((sizeof(int)-1)*8)) +static void rawout_queuesize(t_rawout *x, t_float fbufsize){ + int bufsize = (int)fbufsize; + if (fbufsize < 1) fbufsize = MAXINT; + x->x_max_queuesize = bufsize; } static void rawout_free(t_rawout *x) { - rawout_close(x); - pdp_tree_strip_packets(x->x_queue); + // terminate thread + rawout_stopthread(x); + + // cleanup + clock_unset(x->x_clock); + clear_queue(x); pdp_tree_free(x->x_queue); + pdp_packet_mark_unused(x->x_packet); } t_class *rawout_class; +t_class *rawout_dsp_class; - -static void *rawout_new(t_symbol *pipe, t_symbol *type) -{ - t_rawout *x; - - pdp_post("%s %s", pipe->s_name, type->s_name); - - /* allocate & init */ - x = (t_rawout *)pd_new(rawout_class); +// shared stuff +static void rawout_init(t_rawout *x, t_symbol *spipe){ //x->x_outlet = outlet_new(&x->x_obj, &s_anything); x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything); x->x_queue = pdp_list_new(0); x->x_active = 0; x->x_giveup = 0; x->x_done = 0; - x->x_type = pdp_gensym("image/YCrCb/320x240"); //default + x->x_packet = -1; + x->x_countdown = 0; + x->x_max_queuesize = DEF_QUEUESIZE; + //x->x_type = pdp_gensym("image/YCrCb/320x240"); //default x->x_pipe = gensym("/tmp/pdpraw"); // default + x->x_deltime = POLLTIME; + x->x_clock = clock_new(x, (t_method)rawout_tick); pthread_attr_init(&x->x_attr); pthread_mutex_init(&x->x_mut, NULL); /* args */ - rawout_type(x, type); - if (pipe->s_name[0]) x->x_pipe = pipe; + //rawout_type(x, type); + if (spipe && spipe->s_name[0]) x->x_pipe = spipe; + rawout_tick(x); + rawout_verbose(x,0); +} +// [pdp_rawout] +static void *rawout_new(t_symbol *spipe /* , t_symbol *type */) +{ + t_rawout *x; + /* allocate & init */ + x = (t_rawout *)pd_new(rawout_class); + rawout_init(x, spipe); return (void *)x; } +// [pdp_rawout~] + + +// HUH??? why do i get the symbol first, then the float???? +// see http://lists.puredata.info/pipermail/pd-dev/2003-09/001618.html +//static void *rawout_dsp_new(t_float fchans, t_symbol *spipe){ +static void *rawout_dsp_new(t_symbol *spipe, t_float fchans){ + int chans = (int)fchans; + if (chans < 1) chans = 1; + if (chans > 64) return 0; // this is just a safety measure + + t_rawout *x = (t_rawout *)pd_new(rawout_dsp_class); + rawout_init(x, spipe); + + // hack: temp packet + char temp_packet[1024]; + sprintf(temp_packet, "image/grey/256x%d", 8 * chans); + pdp_post("pdp_rawout: using fake packet %s", temp_packet); + x->x_tmptype = pdp_gensym(temp_packet); + + // create audio inlets + x->x_chans = chans; + while (--chans) inlet_new(&x->x_obj, &x->x_obj.ob_pd, gensym("signal"), gensym("signal")); + + return (void *)x; +} + #ifdef __cplusplus @@ -296,25 +493,46 @@ extern "C" #endif -void pdp_rawout_setup(void) -{ + +#define COMMON(base_class)\ + class_addmethod(base_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);\ + class_addmethod(base_class, (t_method)rawout_close, gensym("close"), A_NULL);\ + class_addmethod(base_class, (t_method)rawout_verbose, gensym("verbose"), A_FLOAT, A_NULL);\ + class_addmethod(base_class, (t_method)rawout_queuesize, gensym("bufsize"), A_FLOAT, A_NULL); + +void pdp_rawout_setup(void){ + int i; + /* PACKETS */ + /* create a standard pd class: [pdp_rawout pipe type] */ rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new, - (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL); + (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_NULL); /* add global message handler */ - class_addmethod(rawout_class, (t_method)pdp_in, - gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL); + class_addmethod(rawout_class, (t_method)pdp_in, gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL); + COMMON(rawout_class); + + /* DSP */ - class_addmethod(rawout_class, (t_method)rawout_type, gensym("type"), A_SYMBOL, A_NULL); - class_addmethod(rawout_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL); - class_addmethod(rawout_class, (t_method)rawout_close, gensym("close"), A_NULL); + /* create a standard pd class: [pdp_rawout pipe type] */ + rawout_dsp_class = class_new(gensym("pdp_rawout~"), (t_newmethod)rawout_dsp_new, + (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFFLOAT, A_DEFSYMBOL, A_NULL); + /* add signal input */ + CLASS_MAINSIGNALIN(rawout_dsp_class, t_rawout, x_f); + class_addmethod(rawout_dsp_class, (t_method)rawout_dsp, gensym("dsp"), 0); + COMMON(rawout_dsp_class); } + + + + + + #ifdef __cplusplus } #endif |