/* flext - C++ layer for Max/MSP and pd (pure data) externals Copyright (c) 2001-2005 Thomas Grill (gr@grrrr.org) For information on usage and redistribution, and for a DISCLAIMER OF ALL WARRANTIES, see the file, "license.txt," in this distribution. */ /*! \file flqueue.cpp \brief Implementation of the flext message queuing functionality. \todo Let's see if queuing can be implemented for Max/MSP with defer_low if FLEXT_PDLOCK is defined, the new PD thread lock functions are used */ #include "flext.h" #include "flinternal.h" #include "flcontainers.h" #include // for memcpy #ifdef FLEXT_THREADS //! Thread id of message queue thread flext::thrid_t flext::thrmsgid; #endif static bool qustarted = false; #ifdef FLEXT_SHARED /* For the shared version it _should_ be possible to have only one queue for all externals. Yet I don't know how to do this cross-platform */ #define PERMANENTIDLE #endif static void Trigger(); class flext::MsgBundle; typedef PooledFifo QueueFifo; class Queue: public flext, public QueueFifo { public: inline bool Empty() const { return Size() == 0; } inline void Push(MsgBundle *m); // defined after MsgBundle (gcc 3.3. won't take it otherwise...) }; static Queue queue; #define STATSIZE 8 class flext::MsgBundle: public flext, public Fifo::Cell { public: static MsgBundle *New() { MsgBundle *m = queue.New(); m->msg.Init(); return m; } static void Free(MsgBundle *m) { for(Msg *mi = m->msg.nxt; mi; ) { Msg *mn = mi->nxt; mi->Free(); delete mi; mi = mn; } m->msg.Free(); queue.Free(m); } inline MsgBundle &Add(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) { Get()->Set(t,o,s,ac,av); return *this; } inline MsgBundle &Add(const t_symbol *r,const t_symbol *s,int ac,const t_atom *av) { Get()->Set(r,s,ac,av); return *this; } inline MsgBundle &Add(flext_base *th,int o) // bang { return Add(th,o,sym_bang,0,NULL); } inline MsgBundle &Add(flext_base *th,int o,float dt) { t_atom at; SetFloat(at,dt); return Add(th,o,sym_float,1,&at); } inline MsgBundle &Add(flext_base *th,int o,int dt) { t_atom at; SetInt(at,dt); const t_symbol *sym; #if FLEXT_SYS == FLEXT_SYS_PD sym = sym_float; #elif FLEXT_SYS == FLEXT_SYS_MAX sym = sym_int; #else #error Not implemented! #endif return Add(th,o,sym,1,&at); } inline MsgBundle &Add(flext_base *th,int o,const t_symbol *dt) { t_atom at; SetSymbol(at,dt); return Add(th,o,sym_symbol,1,&at); } inline MsgBundle &Add(flext_base *th,int o,const t_atom &a) { const t_symbol *sym; if(IsSymbol(a)) sym = sym_symbol; else if(IsFloat(a)) sym = sym_float; #if FLEXT_SYS == FLEXT_SYS_MAX else if(IsInt(a)) sym = sym_int; #endif #if FLEXT_SYS == FLEXT_SYS_PD else if(IsPointer(a)) sym = sym_pointer; #endif else { error("atom type not supported"); return *this; } return Add(th,o,sym,1,&a); } inline MsgBundle &Add(flext_base *th,int o,int argc,const t_atom *argv) { return Add(th,o,sym_list,argc,argv); } // \note PD sys lock must already be held by caller inline void Send() const { if(!msg.Ok()) return; // Empty! const Msg *m = &msg; do { m->Send(); m = m->nxt; } while(m); } private: class Msg { public: inline bool Ok() const { return th || recv; } void Init() { th = NULL; recv = NULL; nxt = NULL; argc = 0; } void Free() { if(argc > STATSIZE) { FLEXT_ASSERT(argv); delete[] argv; } } void Set(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) { FLEXT_ASSERT(t); th = t; out = o; SetMsg(s,ac,av); } void Set(const t_symbol *r,const t_symbol *s,int ac,const t_atom *av) { FLEXT_ASSERT(r); th = NULL; recv = r; SetMsg(s,ac,av); } void Send() const { if(th) { if(out < 0) // message to self th->CbMethodHandler(-1-out,sym,argc,argc > STATSIZE?argv:argl); else // message to outlet th->ToSysAnything(out,sym,argc,argc > STATSIZE?argv:argl); } else flext::SysForward(recv,sym,argc,argc > STATSIZE?argv:argl); } Msg *nxt; protected: flext_base *th; union { int out; const t_symbol *recv; }; const t_symbol *sym; int argc; union { t_atom *argv; t_atom argl[STATSIZE]; }; void SetMsg(const t_symbol *s,int cnt,const t_atom *lst) { sym = s; argc = cnt; if(cnt > STATSIZE) { argv = new t_atom[cnt]; flext::CopyAtoms(cnt,argv,lst); } else flext::CopyAtoms(cnt,argl,lst); } } msg; Msg *Get() { Msg *m = &msg; if(m->Ok()) { for(; m->nxt; m = m->nxt) {} m = m->nxt = new Msg; m->Init(); } return m; } }; inline void Queue::Push(MsgBundle *m) { if(m) { Put(m); Trigger(); } } #if FLEXT_QMODE == 2 static flext::ThrCond qthrcond; #elif FLEXT_QMODE == 0 //static t_qelem *qclk = NULL; static t_clock *qclk = NULL; #endif #define CHUNK 10 static void QWork(bool syslock) { for(;;) { // Since qcnt can only be increased from any other function than QWork // qc will be a minimum guaranteed number of present queue elements. // On the other hand, if new queue elements are added by the methods called // in the loop, these will be sent in the next tick to avoid recursion overflow. size_t qc = queue.Size(); if(!qc) break; #if FLEXT_QMODE == 2 if(syslock) flext::Lock(); #endif flext::MsgBundle *q; while((q = queue.Get()) != NULL) { q->Send(); flext::MsgBundle::Free(q); } #if FLEXT_QMODE == 2 if(syslock) flext::Unlock(); #endif } } #if FLEXT_QMODE == 0 #if FLEXT_SYS == FLEXT_SYS_JMAX static void QTick(fts_object_t *c,int winlet, fts_symbol_t s, int ac, const fts_atom_t *at) { #else static void QTick(flext_base *c) { #endif #ifdef FLEXT_THREADS FLEXT_ASSERT(flext::IsSystemThread()); #endif QWork(false); } #elif FLEXT_QMODE == 1 #ifndef PERMANENTIDLE static bool qtickactive = false; #endif static t_int QTick(t_int *) { #ifndef PERMANENTIDLE qtickactive = false; #endif QWork(false); #ifdef PERMANENTIDLE // will be run in the next idle cycle return 2; #else // won't be run again // for non-shared externals assume that there's rarely a message waiting // so it's better to delete the callback meanwhile return 0; #endif } #endif /* It would be sufficient to only flush messages belonging to object th But then the order of sent messages is not as intended */ void flext_base::QFlush(flext_base *th) { #ifdef FLEXT_THREADS if(!IsSystemThread()) { error("flext - Queue flush called by wrong thread!"); return; } #endif while(!queue.Empty()) QWork(false); } static void Trigger() { #if FLEXT_SYS == FLEXT_SYS_PD #if FLEXT_QMODE == 2 // wake up worker thread qthrcond.Signal(); #elif FLEXT_QMODE == 1 && !defined(PERMANENTIDLE) if(!qtickactive) { sys_callback(QTick,NULL,0); qtickactive = true; } #elif FLEXT_QMODE == 0 clock_delay(qclk,0); #endif #elif FLEXT_SYS == FLEXT_SYS_MAX #if FLEXT_QMODE == 0 // qelem_front(qclk); clock_delay(qclk,0); #endif #else #error Not implemented #endif } #if FLEXT_QMODE == 2 void flext_base::QWorker(thr_params *) { thrmsgid = GetThreadId(); qustarted = true; for(;;) { qthrcond.Wait(); QWork(true); } } #endif void flext_base::StartQueue() { if(qustarted) return; #if FLEXT_QMODE == 1 #ifdef PERMANENTIDLE sys_callback(QTick,NULL,0); qustarted = true; #endif #elif FLEXT_QMODE == 2 LaunchThread(QWorker,NULL); // very unelegant... but waiting should be ok, since happens only on loading while(!qustarted) Sleep(0.001); #elif FLEXT_QMODE == 0 && (FLEXT_SYS == FLEXT_SYS_PD || FLEXT_SYS == FLEXT_SYS_MAX) // qclk = (t_qelem *)(qelem_new(NULL,(t_method)QTick)); qclk = (t_clock *)(clock_new(NULL,(t_method)QTick)); qustarted = true; #else #error Not implemented! #endif } flext::MsgBundle *flext::MsgNew() { return MsgBundle::New(); } void flext::MsgFree(MsgBundle *m) { MsgBundle::Free(m); } void flext::ToSysMsg(MsgBundle *m) { m->Send(); queue.Free(m); } void flext::ToQueueMsg(MsgBundle *m) { queue.Push(m); } void flext_base::ToQueueBang(int o) const { MsgBundle *m = MsgBundle::New(); m->Add(const_cast(this),o); queue.Push(m); } void flext_base::ToQueueFloat(int o,float f) const { MsgBundle *m = MsgBundle::New(); m->Add(const_cast(this),o,f); queue.Push(m); } void flext_base::ToQueueInt(int o,int f) const { MsgBundle *m = MsgBundle::New(); m->Add(const_cast(this),o,f); queue.Push(m); } void flext_base::ToQueueSymbol(int o,const t_symbol *s) const { MsgBundle *m = MsgBundle::New(); m->Add(const_cast(this),o,s); queue.Push(m); } void flext_base::ToQueueAtom(int o,const t_atom &at) const { MsgBundle *m = MsgBundle::New(); m->Add(const_cast(this),o,at); queue.Push(m); } void flext_base::ToQueueList(int o,int argc,const t_atom *argv) const { MsgBundle *m = MsgBundle::New(); m->Add(const_cast(this),o,argc,argv); queue.Push(m); } void flext_base::ToQueueAnything(int o,const t_symbol *s,int argc,const t_atom *argv) const { MsgBundle *m = MsgBundle::New(); m->Add(const_cast(this),o,s,argc,argv); queue.Push(m); } void flext_base::MsgAddBang(MsgBundle *m,int n) const { m->Add(const_cast(this),n); } void flext_base::MsgAddFloat(MsgBundle *m,int n,float f) const { m->Add(const_cast(this),n,f); } void flext_base::MsgAddInt(MsgBundle *m,int n,int f) const { m->Add(const_cast(this),n,f); } void flext_base::MsgAddSymbol(MsgBundle *m,int n,const t_symbol *s) const { m->Add(const_cast(this),n,s); } void flext_base::MsgAddAtom(MsgBundle *m,int n,const t_atom &at) const { m->Add(const_cast(this),n,at); } void flext_base::MsgAddList(MsgBundle *m,int n,int argc,const t_atom *argv) const { m->Add(const_cast(this),n,argc,argv); } void flext_base::MsgAddAnything(MsgBundle *m,int n,const t_symbol *s,int argc,const t_atom *argv) const { m->Add(const_cast(this),n,s,argc,argv); } bool flext::SysForward(const t_symbol *recv,const t_symbol *s,int argc,const t_atom *argv) { void *cl = recv->s_thing; if(!cl) return false; #if FLEXT_SYS == FLEXT_SYS_PD pd_typedmess((t_class **)cl,(t_symbol *)s,argc,(t_atom *)argv); #elif FLEXT_SYS == FLEXT_SYS_MAX typedmess(recv->s_thing,(t_symbol *)s,argc,(t_atom *)argv); #else #error Not implemented #endif return true; } bool flext::QueueForward(const t_symbol *recv,const t_symbol *s,int argc,const t_atom *argv) { MsgBundle *m = MsgBundle::New(); m->Add(recv,s,argc,argv); // send over queue queue.Push(m); return true; } bool flext::MsgForward(MsgBundle *m,const t_symbol *recv,const t_symbol *s,int argc,const t_atom *argv) { m->Add(recv,s,argc,argv); return true; }