From d1297bad7d860014ff6a4aa3ce66002125c7d0ac Mon Sep 17 00:00:00 2001 From: Thomas Grill Date: Mon, 12 Dec 2005 00:18:21 +0000 Subject: new flext::CopyAtoms function fixed dangerous spot (also memory leak) with message queuing flext::Forward has more incarnations now fixed and cleaned up library related stuff, especially co-existance of Max message and DSP library objects some minor changes after valgrind run more pthreads V2 fixes added message bundles (flext::MsgBundle) save some space saving inlet pointers fix uninitialized pointer update docs svn path=/trunk/; revision=4189 --- externals/grill/flext/source/flqueue.cpp | 349 +++++++++++++++++++++++-------- 1 file changed, 262 insertions(+), 87 deletions(-) (limited to 'externals/grill/flext/source/flqueue.cpp') diff --git a/externals/grill/flext/source/flqueue.cpp b/externals/grill/flext/source/flqueue.cpp index a83aa01e..b0080ba1 100755 --- a/externals/grill/flext/source/flqueue.cpp +++ b/externals/grill/flext/source/flqueue.cpp @@ -38,93 +38,81 @@ static bool qustarted = false; static void Trigger(); -class qmsg: + +class flext::MsgBundle; + +typedef PooledFifo QueueFifo; + +class Queue: public flext, - public Fifo::Cell + public QueueFifo { public: - inline qmsg &Set(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) - { - th = t; - out = o; - msg(s,ac,av); - return *this; - } + inline bool Empty() const { return Size() == 0; } - inline qmsg &Set(const t_symbol *r,const t_symbol *s,int ac,const t_atom *av) + inline void Push(MsgBundle *m) { - th = NULL; - recv = r; - msg(s,ac,av); - return *this; - } - - // \note PD sys lock must already be held by caller - inline void Send() const - { - if(th) { - if(out < 0) - // message to self - th->CbMethodHandler(-1-out,msg.Header(),msg.Count(),msg.Atoms()); - else - // message to outlet - th->ToSysAnything(out,msg.Header(),msg.Count(),msg.Atoms()); + if(m) { + Put(m); + Trigger(); } - else - flext::Forward(recv,msg,true); } - -private: - flext_base *th; - union { - int out; - const t_symbol *recv; - }; - AtomAnything msg; }; +static Queue queue; -typedef PooledFifo QueueFifo; +#define STATSIZE 8 -class Queue: +class flext::MsgBundle: public flext, - public QueueFifo + public Fifo::Cell { public: - inline bool Empty() const { return Size() == 0; } + static MsgBundle *New() + { + MsgBundle *m = queue.New(); + m->msg.Init(); + return m; + } + + static void Free(MsgBundle *m) + { + for(Msg *mi = m->msg.nxt; mi; ) { + Msg *mn = mi->nxt; + mi->Free(); + delete mi; + mi = mn; + } + m->msg.Free(); + queue.Free(m); + } - inline void Push(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) + inline MsgBundle &Add(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) { - qmsg *m = QueueFifo::New(); - FLEXT_ASSERT(m); - m->Set(t,o,s,ac,av); - Put(m); - Trigger(); + Get()->Set(t,o,s,ac,av); + return *this; } - inline void Push(const t_symbol *r,const t_symbol *s,int ac,const t_atom *av) + inline MsgBundle &Add(const t_symbol *r,const t_symbol *s,int ac,const t_atom *av) { - qmsg *m = QueueFifo::New(); - FLEXT_ASSERT(m); - m->Set(r,s,ac,av); - Put(m); - Trigger(); + Get()->Set(r,s,ac,av); + return *this; } - inline void Push(flext_base *th,int o) // bang + inline MsgBundle &Add(flext_base *th,int o) // bang { - Push(th,o,sym_bang,0,NULL); + return Add(th,o,sym_bang,0,NULL); } - inline void Push(flext_base *th,int o,float dt) + inline MsgBundle &Add(flext_base *th,int o,float dt) { t_atom at; SetFloat(at,dt); - Push(th,o,sym_float,1,&at); + return Add(th,o,sym_float,1,&at); } - inline void Push(flext_base *th,int o,int dt) + inline MsgBundle &Add(flext_base *th,int o,int dt) { t_atom at; SetInt(at,dt); @@ -136,17 +124,17 @@ public: #else #error Not implemented! #endif - Push(th,o,sym,1,&at); + return Add(th,o,sym,1,&at); } - inline void Push(flext_base *th,int o,const t_symbol *dt) + inline MsgBundle &Add(flext_base *th,int o,const t_symbol *dt) { t_atom at; SetSymbol(at,dt); - Push(th,o,sym_symbol,1,&at); + return Add(th,o,sym_symbol,1,&at); } - void Push(flext_base *th,int o,const t_atom &a) + inline MsgBundle &Add(flext_base *th,int o,const t_atom &a) { const t_symbol *sym; if(IsSymbol(a)) @@ -163,18 +151,119 @@ public: #endif else { error("atom type not supported"); - return; + return *this; } - Push(th,o,sym,1,&a); + return Add(th,o,sym,1,&a); } - inline void Push(flext_base *th,int o,int argc,const t_atom *argv) + inline MsgBundle &Add(flext_base *th,int o,int argc,const t_atom *argv) { - Push(th,o,sym_list,argc,argv); + return Add(th,o,sym_list,argc,argv); } -}; -static Queue queue; + // \note PD sys lock must already be held by caller + inline void Send() const + { + FLEXT_ASSERT(msg.Ok()); + + const Msg *m = &msg; + do { + m->Send(); + m = m->nxt; + } while(m); + } + +private: + + class Msg { + public: + inline bool Ok() const { return th || recv; } + + void Init() + { + th = NULL; + recv = NULL; + nxt = NULL; + argc = 0; + } + + void Free() + { + if(argc > STATSIZE) { + FLEXT_ASSERT(argv); + delete[] argv; + } + } + + void Set(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) + { + FLEXT_ASSERT(t); + th = t; + out = o; + SetMsg(s,ac,av); +} + + void Set(const t_symbol *r,const t_symbol *s,int ac,const t_atom *av) + { + FLEXT_ASSERT(r); + th = NULL; + recv = r; + SetMsg(s,ac,av); + } + + void Send() const + { + if(th) { + if(out < 0) + // message to self + th->CbMethodHandler(-1-out,sym,argc,argc > STATSIZE?argv:argl); + else + // message to outlet + th->ToSysAnything(out,sym,argc,argc > STATSIZE?argv:argl); + } + else + flext::SysForward(recv,sym,argc,argc > STATSIZE?argv:argl); + } + + Msg *nxt; + + protected: + flext_base *th; + union { + int out; + const t_symbol *recv; + }; + const t_symbol *sym; + int argc; + union { + t_atom *argv; + t_atom argl[STATSIZE]; + }; + + void SetMsg(const t_symbol *s,int cnt,const t_atom *lst) + { + sym = s; + argc = cnt; + if(cnt > STATSIZE) { + argv = new t_atom[cnt]; + flext::CopyAtoms(cnt,argv,lst); + } + else + flext::CopyAtoms(cnt,argl,lst); + } + + } msg; + + Msg *Get() + { + Msg *m = &msg; + if(m->Ok()) { + for(; m->nxt; m = m->nxt) {} + m = m->nxt = new Msg; + } + return m; + } +}; #if FLEXT_QMODE == 2 @@ -201,10 +290,10 @@ static void QWork(bool syslock) if(syslock) flext::Lock(); #endif - qmsg *q; + flext::MsgBundle *q; while((q = queue.Get()) != NULL) { q->Send(); - queue.Free(q); + flext::MsgBundle::Free(q); } #if FLEXT_QMODE == 2 @@ -329,58 +418,144 @@ void flext_base::StartQueue() #endif } + +flext::MsgBundle *flext::MsgNew() +{ + return MsgBundle::New(); +} + +void flext::MsgFree(MsgBundle *m) +{ + MsgBundle::Free(m); +} + +void flext::ToSysMsg(MsgBundle *m) +{ + m->Send(); + queue.Free(m); +} + +void flext::ToQueueMsg(MsgBundle *m) +{ + queue.Push(m); +} + + + void flext_base::ToQueueBang(int o) const { - queue.Push(const_cast(this),o); + MsgBundle *m = MsgBundle::New(); + m->Add(const_cast(this),o); + queue.Push(m); } void flext_base::ToQueueFloat(int o,float f) const { - queue.Push(const_cast(this),o,f); + MsgBundle *m = MsgBundle::New(); + m->Add(const_cast(this),o,f); + queue.Push(m); } void flext_base::ToQueueInt(int o,int f) const { - queue.Push(const_cast(this),o,f); + MsgBundle *m = MsgBundle::New(); + m->Add(const_cast(this),o,f); + queue.Push(m); } void flext_base::ToQueueSymbol(int o,const t_symbol *s) const { - queue.Push(const_cast(this),o,s); + MsgBundle *m = MsgBundle::New(); + m->Add(const_cast(this),o,s); + queue.Push(m); } void flext_base::ToQueueAtom(int o,const t_atom &at) const { - queue.Push(const_cast(this),o,at); + MsgBundle *m = MsgBundle::New(); + m->Add(const_cast(this),o,at); + queue.Push(m); } void flext_base::ToQueueList(int o,int argc,const t_atom *argv) const { - queue.Push(const_cast(this),o,argc,argv); + MsgBundle *m = MsgBundle::New(); + m->Add(const_cast(this),o,argc,argv); + queue.Push(m); } void flext_base::ToQueueAnything(int o,const t_symbol *s,int argc,const t_atom *argv) const { - queue.Push(const_cast(this),o,s,argc,argv); + MsgBundle *m = MsgBundle::New(); + m->Add(const_cast(this),o,s,argc,argv); + queue.Push(m); +} + + +void flext_base::MsgAddBang(MsgBundle *m,int n) const +{ + m->Add(const_cast(this),n); +} + +void flext_base::MsgAddFloat(MsgBundle *m,int n,float f) const +{ + m->Add(const_cast(this),n,f); +} + +void flext_base::MsgAddInt(MsgBundle *m,int n,int f) const +{ + m->Add(const_cast(this),n,f); +} + +void flext_base::MsgAddSymbol(MsgBundle *m,int n,const t_symbol *s) const +{ + m->Add(const_cast(this),n,s); } +void flext_base::MsgAddAtom(MsgBundle *m,int n,const t_atom &at) const +{ + m->Add(const_cast(this),n,at); +} -bool flext::Forward(const t_symbol *recv,const t_symbol *s,int argc,const t_atom *argv,bool direct) +void flext_base::MsgAddList(MsgBundle *m,int n,int argc,const t_atom *argv) const { - if(direct || IsSystemThread()) { - void *cl = recv->s_thing; - if(!cl) return false; - + m->Add(const_cast(this),n,argc,argv); +} + +void flext_base::MsgAddAnything(MsgBundle *m,int n,const t_symbol *s,int argc,const t_atom *argv) const +{ + m->Add(const_cast(this),n,s,argc,argv); +} + + + + +bool flext::SysForward(const t_symbol *recv,const t_symbol *s,int argc,const t_atom *argv) +{ + void *cl = recv->s_thing; + if(!cl) return false; + #if FLEXT_SYS == FLEXT_SYS_PD - pd_typedmess((t_class **)cl,(t_symbol *)s,argc,(t_atom *)argv); + pd_typedmess((t_class **)cl,(t_symbol *)s,argc,(t_atom *)argv); #elif FLEXT_SYS == FLEXT_SYS_MAX - typedmess(recv->s_thing,(t_symbol *)s,argc,(t_atom *)argv); + typedmess(recv->s_thing,(t_symbol *)s,argc,(t_atom *)argv); #else #error Not implemented #endif - } - else - // send over queue - queue.Push(recv,s,argc,argv); + return true; +} + +bool flext::QueueForward(const t_symbol *recv,const t_symbol *s,int argc,const t_atom *argv) +{ + MsgBundle *m = MsgBundle::New(); + m->Add(recv,s,argc,argv); + // send over queue + queue.Push(m); + return true; +} + +bool flext::MsgForward(MsgBundle *m,const t_symbol *recv,const t_symbol *s,int argc,const t_atom *argv) +{ + m->Add(recv,s,argc,argv); return true; } -- cgit v1.2.1