/* flext - C++ layer for Max/MSP and pd (pure data) externals Copyright (c) 2001-2005 Thomas Grill (gr@grrrr.org) For information on usage and redistribution, and for a DISCLAIMER OF ALL WARRANTIES, see the file, "license.txt," in this distribution. */ /*! \file flqueue.cpp \brief Implementation of the flext message queuing functionality. \todo Let's see if queuing can be implemented for Max/MSP with defer_low if FLEXT_PDLOCK is defined, the new PD thread lock functions are used */ #include "flext.h" #include "flinternal.h" #include "flcontainers.h" #include <string.h> // for memcpy #ifdef FLEXT_THREADS //! Thread id of message queue thread flext::thrid_t flext::thrmsgid = 0; #endif #ifdef FLEXT_SHARED /* For the shared version it _should_ be possible to have only one queue for all externals. Yet I don't know how to do this cross-platform */ #define PERMANENTIDLE #endif static void Trigger(); class qmsg: public flext, public Fifo::Cell { public: inline qmsg &Set(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) { th = t; out = o; msg(s,ac,av); return *this; } inline qmsg &Set(const t_symbol *r,const t_symbol *s,int ac,const t_atom *av) { th = NULL; recv = r; msg(s,ac,av); return *this; } // \note PD sys lock must already be held by caller inline void Send() const { if(th) { if(out < 0) // message to self th->CbMethodHandler(-1-out,msg.Header(),msg.Count(),msg.Atoms()); else // message to outlet th->ToSysAnything(out,msg.Header(),msg.Count(),msg.Atoms()); } else flext::Forward(recv,msg,true); } private: flext_base *th; union { int out; const t_symbol *recv; }; AtomAnything msg; }; typedef PooledFifo<qmsg> QueueFifo; class Queue: public flext, public QueueFifo { public: inline bool Empty() const { return Size() == 0; } inline void Push(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) { qmsg *m = QueueFifo::New(); FLEXT_ASSERT(m); m->Set(t,o,s,ac,av); Put(m); Trigger(); } inline void Push(const t_symbol *r,const t_symbol *s,int ac,const t_atom *av) { qmsg *m = QueueFifo::New(); FLEXT_ASSERT(m); m->Set(r,s,ac,av); Put(m); Trigger(); } inline void Push(flext_base *th,int o) // bang { Push(th,o,sym_bang,0,NULL); } inline void Push(flext_base *th,int o,float dt) { t_atom at; SetFloat(at,dt); Push(th,o,sym_float,1,&at); } inline void Push(flext_base *th,int o,int dt) { t_atom at; SetInt(at,dt); const t_symbol *sym; #if FLEXT_SYS == FLEXT_SYS_PD sym = sym_float; #elif FLEXT_SYS == FLEXT_SYS_MAX sym = sym_int; #else #error Not implemented! #endif Push(th,o,sym,1,&at); } inline void Push(flext_base *th,int o,const t_symbol *dt) { t_atom at; SetSymbol(at,dt); Push(th,o,sym_symbol,1,&at); } void Push(flext_base *th,int o,const t_atom &a) { const t_symbol *sym; if(IsSymbol(a)) sym = sym_symbol; else if(IsFloat(a)) sym = sym_float; #if FLEXT_SYS == FLEXT_SYS_MAX else if(IsInt(a)) sym = sym_int; #endif #if FLEXT_SYS == FLEXT_SYS_PD else if(IsPointer(a)) sym = sym_pointer; #endif else { error("atom type not supported"); return; } Push(th,o,sym,1,&a); } inline void Push(flext_base *th,int o,int argc,const t_atom *argv) { Push(th,o,sym_list,argc,argv); } }; static Queue queue; #if FLEXT_QMODE == 2 static flext::ThrCond qthrcond; #elif FLEXT_QMODE == 0 //static t_qelem *qclk = NULL; static t_clock *qclk = NULL; #endif #define CHUNK 10 static void QWork(bool syslock) { for(;;) { // Since qcnt can only be increased from any other function than QWork // qc will be a minimum guaranteed number of present queue elements. // On the other hand, if new queue elements are added by the methods called // in the loop, these will be sent in the next tick to avoid recursion overflow. size_t qc = queue.Size(); if(!qc) break; #if FLEXT_QMODE == 2 if(syslock) flext::Lock(); #endif qmsg *q; while((q = queue.Get()) != NULL) { q->Send(); queue.Free(q); } #if FLEXT_QMODE == 2 if(syslock) flext::Unlock(); #endif } } #if FLEXT_QMODE == 0 #if FLEXT_SYS == FLEXT_SYS_JMAX static void QTick(fts_object_t *c,int winlet, fts_symbol_t s, int ac, const fts_atom_t *at) { #else static void QTick(flext_base *c) { #endif #ifdef FLEXT_THREADS FLEXT_ASSERT(flext::IsSystemThread()); #endif QWork(false); } #elif FLEXT_QMODE == 1 #ifndef PERMANENTIDLE static bool qtickactive = false; #endif static t_int QTick(t_int *) { #ifndef PERMANENTIDLE qtickactive = false; #endif QWork(false); #ifdef PERMANENTIDLE // will be run in the next idle cycle return 2; #else // won't be run again // for non-shared externals assume that there's rarely a message waiting // so it's better to delete the callback meanwhile return 0; #endif } #endif /* It would be sufficient to only flush messages belonging to object th But then the order of sent messages is not as intended */ void flext_base::QFlush(flext_base *th) { #ifdef FLEXT_THREADS if(!IsSystemThread()) { error("flext - Queue flush called by wrong thread!"); return; } #endif while(!queue.Empty()) QWork(false); } static void Trigger() { #if FLEXT_SYS == FLEXT_SYS_PD #if FLEXT_QMODE == 2 // wake up worker thread qthrcond.Signal(); #elif FLEXT_QMODE == 1 && !defined(PERMANENTIDLE) if(!qtickactive) { sys_callback(QTick,NULL,0); qtickactive = true; } #elif FLEXT_QMODE == 0 clock_delay(qclk,0); #endif #elif FLEXT_SYS == FLEXT_SYS_MAX #if FLEXT_QMODE == 0 // qelem_front(qclk); clock_delay(qclk,0); #endif #elif FLEXT_SYS == FLEXT_SYS_JMAX #if FLEXT_QMODE == 0 // this is dangerous because there may be other timers on this object! fts_timebase_add_call(fts_get_timebase(), (fts_object_t *)thisHdr(), QTick, NULL, 0); #endif #else #error Not implemented #endif } #if FLEXT_QMODE == 2 void flext_base::QWorker(thr_params *) { thrmsgid = GetThreadId(); for(;;) { qthrcond.Wait(); QWork(true); } } #endif void flext_base::StartQueue() { static bool started = false; if(started) return; else started = true; #if FLEXT_QMODE == 1 #ifdef PERMANENTIDLE sys_callback(QTick,NULL,0); #endif #elif FLEXT_QMODE == 2 LaunchThread(QWorker,NULL); #elif FLEXT_QMODE == 0 && (FLEXT_SYS == FLEXT_SYS_PD || FLEXT_SYS == FLEXT_SYS_MAX) // qclk = (t_qelem *)(qelem_new(NULL,(t_method)QTick)); qclk = (t_clock *)(clock_new(NULL,(t_method)QTick)); #else #error Not implemented! #endif } void flext_base::ToQueueBang(int o) const { queue.Push(const_cast<flext_base *>(this),o); } void flext_base::ToQueueFloat(int o,float f) const { queue.Push(const_cast<flext_base *>(this),o,f); } void flext_base::ToQueueInt(int o,int f) const { queue.Push(const_cast<flext_base *>(this),o,f); } void flext_base::ToQueueSymbol(int o,const t_symbol *s) const { queue.Push(const_cast<flext_base *>(this),o,s); } void flext_base::ToQueueAtom(int o,const t_atom &at) const { queue.Push(const_cast<flext_base *>(this),o,at); } void flext_base::ToQueueList(int o,int argc,const t_atom *argv) const { queue.Push(const_cast<flext_base *>(this),o,argc,argv); } void flext_base::ToQueueAnything(int o,const t_symbol *s,int argc,const t_atom *argv) const { queue.Push(const_cast<flext_base *>(this),o,s,argc,argv); } bool flext::Forward(const t_symbol *recv,const t_symbol *s,int argc,const t_atom *argv,bool direct) { if(direct || IsSystemThread()) { void *cl = recv->s_thing; if(!cl) return false; #if FLEXT_SYS == FLEXT_SYS_PD pd_typedmess((t_class **)cl,(t_symbol *)s,argc,(t_atom *)argv); #elif FLEXT_SYS == FLEXT_SYS_MAX typedmess(recv->s_thing,(t_symbol *)s,argc,(t_atom *)argv); #else #error Not implemented #endif } else // send over queue queue.Push(recv,s,argc,argv); return true; }