/* * Pure Data Packet module. * Copyright (c) by Tom Schouten * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * */ /* TODO: ADD THREAD SUPPORT */ /* this is the pd interface to forth processes / processors. in time, it should become the only interface between the processing functionality in pdp, and pd objects representing this functionality example object definitions: a forth process is a virtual machine operating on a private rotation stack, as found in hp rpn calcs (this is the machine state vector) the tick operation leaves the stack size and relative position of the elements invariant this is a very crude and simple framework. it is an experiment to see how far this forth thing can be pushed to solve the problem of dataflow processing in a simple way, by cutting out the fat. yes, chuck moore's ideas are viral.. a forth process has a setup code that will construct an initial stack template an object definition is a list of - list of symbolic input to stack location mappings (first = active) - list of symbolic output to stack location mappings - stack template init code (bootblock) - process code the stack is the machine memory the mapping between pdp's forth processors and pd object is handled by pdmaps. these can probably be reused to build other object oriented interfaces to the forth processors */ /* pdp forthprocs support thread processing. there is only one stack, which serves as the machine state and input/output storage. when processing in thread, the input/output event queue is used, when processing directly pd events are inserted into the stack directly and the queues are bypassed */ #include #include "pdp_pd.h" #include "pdp_comm.h" #include "pdp_packet.h" #include "pdp_mem.h" #include "pdp_forth.h" #include "pdp_pdmap.h" #include "pdp_queue.h" #include "pdp_debug.h" /* this object instantiates a forth processor */ typedef struct forthproc_struct { t_object x_obj; t_pdp_list *x_processor; // the processor definition t_pdp_list *x_pdmap; // the pd port mappings t_pdp_list *x_program; // the forth program t_pdp_list *x_stack; // the state stack t_pdp_list *x_stack_template; // a "fake" stack serving as a type template t_pdp_list *x_input_events; // current input event list t_pdp_list *x_queue_input; // queue of input event lists, event = (atom index, word) t_pdp_list *x_queue_output; // queue of output event lists, event = (outlet ptr, word) t_pdp_procqueue *x_q; // process queue pthread_mutex_t x_mut; // queue mutex int x_nb_outlets; // number of pd outlets t_outlet **x_outlet; int x_thread; // use thread processing or not t_symbol *x_protocol; // protocol used (pdp or dpd) /* pdp image legacy */ int x_chanmask; /* dpd */ int x_dpd_packet; t_outlet *x_dpd_outlet; } t_forthproc; static inline void lock(t_forthproc *x){pthread_mutex_lock(&x->x_mut);} static inline void unlock(t_forthproc *x){pthread_mutex_unlock(&x->x_mut);} /* send an atom to an outlet */ static void send_pdp_atom_to_outlet(t_outlet *out, t_pdp_atom *a) { outlet_pdp_atom(out, a); } /* output stack contents to outlet or output event list */ static void output_from_stack(t_forthproc *x) { t_pdp_list *outsym = pdp_forth_pdmap_outlist(x->x_pdmap); /* create an event list if we're in thread mode */ t_pdp_list *eventlist = x->x_thread ? pdp_list_new(0) : 0; static void _do_outlet(int index, t_pdp_atom *pname, t_pdp_list *eventlist){ t_outlet *out = x->x_outlet[index]; t_pdp_atom *a = pdp_forth_processor_stackatom_from_outputname( x->x_processor, x->x_stack, pname->w.w_symbol); PDP_ASSERT(a); /* bang in reverse order by using head recursion */ if (pname->next) _do_outlet(index+1, pname->next, eventlist); /* send the atom to the outlet if no event list */ if (!eventlist){ send_pdp_atom_to_outlet(out, a); } /* or add it to the event list */ else { t_pdp_list *ev = pdp_list_new(2); pdp_list_set_0(ev, a_pointer, (t_pdp_word)(void*)out); // store outlet ptr pdp_list_set_1(ev, a->t, a->w); // store atom pdp_list_add_back(eventlist, a_list, (t_pdp_word)ev); // store event in list /* if it was a packet, clear the stacks reference */ if (a->t == a_packet) a->w.w_packet = -1; } } _do_outlet(0, outsym->first, eventlist); /* add eventlist to output event queue */ if (eventlist){ lock(x); pdp_list_add_back(x->x_queue_output, a_list, (t_pdp_word)eventlist); unlock(x); } } /* legacy hack: setup channel mask for image processing */ static void setup_chanmask(t_forthproc *x) { if (x->x_chanmask != -1){ t_pdp_symbol *pname = pdp_forth_pdmap_get_pname(x->x_pdmap, pdp_gensym("pdp")); t_pdp_atom *a = pdp_forth_processor_stackatom_from_inputname(x->x_processor, x->x_stack, pname); int *packet; if (a && a->t == a_packet){ packet = &a->w.w_packet; pdp_packet_replace_with_writable(packet); // make sure it's a private copy pdp_packet_image_set_chanmask(*packet, x->x_chanmask); //post("chanmask set to %d", x->x_chanmask); } } } static void exec_program(t_forthproc *x) { int error; setup_chanmask(x); if (e_ok != (error = pdp_forth_execute_def(x->x_stack, x->x_program))){ post("error %d (%s) executing forth processor", error, pdp_forth_word_error(error)); post("PROGRAM:"); pdp_list_print(x->x_program); post("STACK:"); pdp_list_print(x->x_stack); post(""); /* delete stack and create a new one */ pdp_tree_strip_packets(x->x_stack); pdp_tree_free(x->x_stack); x->x_stack = pdp_forth_processor_setup_stack(x->x_processor); } } static void thread_exec(t_forthproc *x) { t_pdp_list *event_list; t_pdp_atom *a; /* get input event list from input event queue */ PDP_ASSERT(x->x_queue_input->elements); lock(x); event_list = pdp_list_pop(x->x_queue_input).w_list; unlock(x); /* add input events to state stack */ for (a=event_list->first; a; a=a->next){ int index = a->w.w_list->first->w.w_int; t_pdp_atom *src = a->w.w_list->first->next; t_pdp_atom *dest = x->x_stack->first; while (index--) dest = dest->next; PDP_ASSERT(dest->t == src->t); switch(src->t){ /* copy pure atoms */ case a_float: case a_int: case a_symbol: dest->w = src->w; break; /* move reference atoms */ case a_packet: pdp_packet_mark_unused(dest->w.w_packet); dest->w = src->w; src->w.w_packet = -1; break; /* ignored */ case a_pointer: case a_list: default: break; } } /* free event list */ pdp_tree_free(event_list); /* run the process */ exec_program(x); /* send output events to output event queue */ output_from_stack(x); } static void thread_output(t_forthproc *x) { t_pdp_list *event_list; t_pdp_atom *a; /* get output event list from output event queue */ PDP_ASSERT(x->x_queue_output->elements); lock(x); event_list = pdp_list_pop(x->x_queue_output).w_list; unlock(x); /* send */ for (a=event_list->first; a; a=a->next){ t_outlet *outlet = a->w.w_list->first->w.w_pointer; t_pdp_atom *outatom = a->w.w_list->first->next; PDP_ASSERT(outlet); PDP_ASSERT(outatom); send_pdp_atom_to_outlet(outlet, outatom); } /* free event list */ pdp_tree_strip_packets(event_list); pdp_tree_free(event_list); } /* handle dpd packet passing */ static void dpd_output(t_forthproc *x) { //post("checking dpd"); //post("protocol: %s, outlet: %08x, packet: %d", x->x_protocol->s_name, x->x_dpd_outlet, x->x_dpd_packet); if ((x->x_protocol != S_DPD) || (!x->x_dpd_outlet) || (x->x_dpd_packet == -1)) return; /* send the dpd packet to the special (first) outlet */ //post("sending dpd"); outlet_dpd(x->x_dpd_outlet, x->x_dpd_packet); x->x_dpd_packet = -1; } /* this method is called after an active event is received */ static void run(t_forthproc *x) { /* NO THREAD: no brainer: execute and output in one go */ if (!x->x_thread){ /* run the word */ exec_program(x); /* output stuff */ output_from_stack(x); dpd_output(x); } /* THREAD: start queueing operations this is a bit harder since we need to make a copy of the machine state (stack) before we send off a command to process it in the queue this case is handled separately, because processing without thread is obviously more efficient regarding memory usage and locality of reference. */ else { t_pdp_list *newstack; /* compared to the previous approach, no 'automatic' dropping is applied for forth processors. the input and output queues are of indefinite length. dropping is the responsability of the sender or a special object that uses the processing queue to synchronize (see 3dp_windowcontext) this allows for pipelining. a drawback is that feedback is a problem with this approach, but it already was with the previous one. the only exception to the dropping rule is when the procqueue is full */ /* make sure process queue is not full */ if (pdp_procqueue_full(x->x_q)){ post("forthproc: WARNING: procqueue is full. dropping input events."); /* clear the input event list */ pdp_tree_strip_packets(x->x_input_events); pdp_tree_free(x->x_input_events); x->x_input_events = pdp_list_new(0); /* exit */ return; } /* add collected input events to input event queue, and create a new empty input event list */ lock(x); pdp_list_add_back(x->x_queue_input, a_list, (t_pdp_word)x->x_input_events); x->x_input_events = pdp_list_new(0); unlock(x); /* queue the process method & callback */ pdp_procqueue_add(x->x_q, x, thread_exec, thread_output, 0); /* how to handle dpd packets? they both have direct and indirect output let's try this: the dpd input is always passed on directly. the other outputs are just like pdp outputs */ dpd_output(x); } } static int handle_special_message(t_forthproc *x, t_symbol *s, int argc, t_atom *argv) { /* handle the chanmask message. this is a legacy thingy */ if (s == S_CHANMASK){ if ((argc == 1) && (argv->a_type == A_FLOAT)){ x->x_chanmask = (int)argv->a_w.w_float; return 1; } } return 0; } /* pd message handler: receives a pd message and stores an event in the input queue or directly on the stack */ static void handle_pd_message(t_forthproc *x, t_symbol *s, int argc, t_atom *argv) { int active = 0; int index; int i; t_pdp_atom ta; t_pdp_symbol *message_id; t_pdp_symbol *pname; /* get the param name from the received symbol */ message_id = pdp_gensym(s->s_name); pname = pdp_forth_pdmap_get_pname(x->x_pdmap, message_id); /* if the parameter name is null, it should be interpreted as a active */ if (pname == PDP_SYM_NULL){ run(x); return; } /* get the stack atom index */ index = pdp_forth_processor_map_inputname_to_stackindex(x->x_processor, pname); t_pdp_atom *stack_atom; if (index < 0){ /* message is not in the pdmap: check any special messages */ if (!handle_special_message(x, s, argc, argv)) post("got invalid msg %s", s->s_name); return; } /* get stack atom if thread processing is on, get an atom from the template stack because the real stack is in an indeterminate state */ i=index; stack_atom = x->x_thread ? x->x_stack_template->first : x->x_stack->first; while (i--) stack_atom = stack_atom->next; /* store the type */ ta.t = stack_atom->t; /* check if it is an active inlet only floats, symbols, bangs, pdp and dpd messages can be active, the others will be translated to different selectors */ if ((&s_float == s) ||(&s_bang == s) ||(&s_symbol == s) ||(S_PDP == s) ||(S_DPD == s)) active = 1; /* interprete the anything message according to expected type (ta.t) and put the result in the input event queue w */ switch(ta.t){ case a_float: if ((argc != 1) || argv[0].a_type != A_FLOAT) post("bad float msg"); else ta.w.w_float = argv[0].a_w.w_float; break; case a_int: if ((argc != 1) || argv[0].a_type != A_FLOAT) post("bad float msg"); else ta.w.w_int = (int)argv[0].a_w.w_float; break; case a_symbol: if ((argc != 1) || argv[0].a_type != A_SYMBOL) post("bad symbol msg"); else ta.w.w_symbol = pdp_gensym(argv[0].a_w.w_symbol->s_name); case a_list: post("a_list: not supported yet"); break; case a_packet: if ((argc != 2) || argv[0].a_type != A_SYMBOL || argv[1].a_type != A_FLOAT) post ("bad pdp msg"); else{ t_symbol *command = argv[0].a_w.w_symbol; int packet = (int)argv[1].a_w.w_float; /* PDP */ /* register the pd packet readonly by default: stack owns a ro copy. the forth words should convert a packet to rw if they need to (we can't tell here) */ if (command == S_REGISTER_RO){ ta.w.w_packet = pdp_packet_copy_ro(packet); } /* DPD: does not work when multiple context outlets are involved */ /* register readonly just like pdp packets. but for dpd context processors it's understood that the packet can be modified. and store the reference to pass it along, if it's an active dpd packet */ else if (command == S_ACCUMULATE){ ta.w.w_packet = pdp_packet_copy_ro(packet); if (s == S_DPD){ // only store main (left) inlet's dpd messages x->x_dpd_packet = ta.w.w_packet; } } else { /* if it's not a register_ro (pdp) phase, or an accumulate (dpd) phase, we're not going to do anything with the paket */ ta.w.w_packet = -1; } /* only the pdp process message or dpd accumulate message can be active */ if ((command != S_PROCESS) && (command != S_ACCUMULATE)) active = 0; } break; default: post("unknown"); return; } /* check if we need to store the atom into the processor stack directly or should put it in the input event queue */ if (!x->x_thread){ /* handle packets */ if (ta.t == a_packet){ /* only copy if valid (if it was valid and it's a register_ro phase */ if (ta.w.w_packet != -1){ pdp_packet_mark_unused(stack_atom->w.w_packet); stack_atom->w = ta.w; } } /* handle other atoms: store directly */ else{ stack_atom->w = ta.w; } } else { /* don't store invalid packets */ if (!(ta.t == a_packet && ta.w.w_packet == -1)){ /* store atom + location in event list */ t_pdp_list *ev = pdp_list_new(2); pdp_list_set_0(ev, a_int, (t_pdp_word)index); // store atom location pdp_list_set_1(ev, ta.t, ta.w); // store atom pdp_list_add_back(x->x_input_events, a_list, (t_pdp_word)ev); // store event in list } } /* run the processor if it was an active input */ if (active) run(x); } /* OTHER METHODS */ /* these serve as a replacement for the pdp_base class the most prominent messages are: - debug - chanmask - thread on/off/default on a per object basis */ /* MEM & INIT */ static void forthproc_free(t_forthproc *x) { /* wait for thread processing to finish */ pdp_procqueue_flush(x->x_q); /* free state stack + template stack */ pdp_tree_strip_packets(x->x_stack); pdp_tree_free(x->x_stack); pdp_tree_free(x->x_stack_template); /* free input event list */ pdp_tree_strip_packets(x->x_input_events); pdp_tree_free(x->x_input_events); /* free input/output event queues */ pdp_tree_strip_packets(x->x_queue_input); pdp_tree_strip_packets(x->x_queue_output); pdp_tree_free(x->x_queue_input); pdp_tree_free(x->x_queue_output); /* delete outlet pointer array */ if (x->x_outlet) pdp_dealloc(x->x_outlet); } t_class *forthproc_class; static void *forthproc_new(t_symbol *protocol, int argc, t_atom *argv) { t_forthproc *x; t_pdp_atom *a; t_symbol *procname; int i; /* get processor name */ if ((argc < 1) || (argv[0].a_type != A_SYMBOL)) return 0; procname = argv[0].a_w.w_symbol; argc--; argv++; /* allocate */ x = (t_forthproc *)pd_new(forthproc_class); /* init */ x->x_stack = 0; x->x_stack_template = 0; x->x_outlet = 0; x->x_pdmap = 0; x->x_processor = 0; x->x_thread = 1; x->x_program = 0; x->x_chanmask = -1; x->x_dpd_outlet = 0; x->x_dpd_packet = -1; /* get the protocol */ x->x_protocol = protocol; //post("forthproc using protocol: %s", protocol->s_name); /* input event list */ x->x_input_events = pdp_list_new(0); /* input/output event queues */ x->x_queue_input = pdp_list_new(0); x->x_queue_output = pdp_list_new(0); /* default queue is pdp queue */ x->x_q = pdp_queue_get_queue(); /* get the pd mapper */ x->x_pdmap = pdp_forth_pdmap_getbyname(pdp_gensym(procname->s_name)); if (!x->x_pdmap) goto error; /* get the processor */ x->x_processor = pdp_forth_pdmap_get_processor(x->x_pdmap); if (!x->x_processor) goto error; /* get the program */ x->x_program = x->x_processor->first->next->next->next->w.w_list; /* create state stack template and remove packets so they don't consume resources */ x->x_stack_template = pdp_forth_processor_setup_stack(x->x_processor); pdp_tree_strip_packets(x->x_stack_template); /* create the state stack */ x->x_stack = pdp_forth_processor_setup_stack(x->x_processor); /* create additional inlets from description */ for (a = pdp_forth_pdmap_inlist(x->x_pdmap)->first; a; a=a->next){ t_pdp_symbol *message_id = a->w.w_symbol; t_symbol *dsym = gensym(message_id->s_name); t_symbol *ssym = 0; t_pdp_symbol *pname = pdp_forth_pdmap_get_pname(x->x_pdmap, message_id); t_pdp_atom *sa = pdp_forth_processor_stackatom_from_inputname(x->x_processor, x->x_stack, pname); if (sa) { switch(sa->t){ case a_float: case a_int: ssym = &s_float; break; case a_packet: ssym = S_PDP; break; default: post("unsupported type on stack"); break; } } /* add inlet */ if (ssym){ //post("adding %s inlet %s (forth processor param %s)", // ssym->s_name, dsym->s_name, pname->s_name); inlet_new(&x->x_obj, &x->x_obj.ob_pd, ssym, dsym); } else { post("error adding inlet"); } } /* create outlets */ if (x->x_protocol == S_DPD){ x->x_dpd_outlet = outlet_new(&x->x_obj, &s_anything); } x->x_nb_outlets = pdp_forth_pdmap_outlist(x->x_pdmap)->elements; if (x->x_nb_outlets){ x->x_outlet = pdp_alloc(x->x_nb_outlets * sizeof(*x->x_outlet)); for (i=0; ix_nb_outlets; i++){ x->x_outlet[i] = outlet_new(&x->x_obj, &s_anything); } } /* interpret arguments */ //pdp_list_print(pdp_forth_pdmap_arglist(x->x_pdmap)); for(a = pdp_forth_pdmap_arglist(x->x_pdmap)->first; a && argc; a=a->next, argv++, argc--){ t_pdp_atom *sa = pdp_forth_processor_stackatom_from_inputname (x->x_processor, x->x_stack, a->w.w_symbol); if (!sa) { post("parameter %s not found", a->w.w_symbol->s_name); continue; } //post("loading parameter %s", a->w.w_symbol->s_name); /* handle symbols */ if((sa->t == a_symbol) && (argv->a_type == A_SYMBOL)) sa->w.w_symbol = pdp_gensym(argv->a_w.w_symbol->s_name); /* handle floats */ else if (argv->a_type == A_FLOAT){ switch(sa->t){ case a_float: sa->w.w_float = argv->a_w.w_float; break; case a_int: sa->w.w_int = (int)argv->a_w.w_float; break; default: break; } } } /* finished */ return (void *)x; error: post ("error creating forth processor %s", procname->s_name); forthproc_free(x); return 0; } #ifdef __cplusplus extern "C" { #endif void pdp_forthproc_setup(void) { int i; /* create a standard pd class */ forthproc_class = class_new(gensym("pdp"), (t_newmethod)forthproc_new, (t_method)forthproc_free, sizeof(t_forthproc), 0, A_GIMME, A_NULL); class_addcreator((t_newmethod)forthproc_new, gensym("dpd"), A_GIMME, A_NULL); /* add global message handler */ class_addanything(forthproc_class, (t_method)handle_pd_message); } #ifdef __cplusplus } #endif