From 1fa5251f469a756d09f7f7c98113a69186688206 Mon Sep 17 00:00:00 2001 From: Thomas Grill Date: Sat, 11 Sep 2004 04:09:17 +0000 Subject: "" svn path=/trunk/; revision=2023 --- externals/grill/flext/source/flqueue.cpp | 359 ++++++++++++++----------------- 1 file changed, 161 insertions(+), 198 deletions(-) (limited to 'externals/grill/flext/source/flqueue.cpp') diff --git a/externals/grill/flext/source/flqueue.cpp b/externals/grill/flext/source/flqueue.cpp index 41941734..4165e968 100755 --- a/externals/grill/flext/source/flqueue.cpp +++ b/externals/grill/flext/source/flqueue.cpp @@ -19,55 +19,160 @@ WARRANTIES, see the file, "license.txt," in this distribution. #include "flext.h" #include "flinternal.h" + #ifdef FLEXT_THREADS //! Thread id of message queue thread flext::thrid_t flext::thrmsgid = 0; #endif + +#define QUEUE_LENGTH 256 +#define QUEUE_ATOMS 1024 + class qmsg { public: - qmsg(flext_base *b): nxt(NULL),th(b),tp(tp_none) {} - ~qmsg(); - - qmsg *nxt; - - void Clear(); + void Set(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) { th = t,out = o,sym = s,argc = ac,argv = av; } + + // \note PD lock must already be held by caller + void Send() const + { + if(out < 0) + // message to self + th->m_methodmain(-1-out,sym,argc,argv); + else + // message to outlet + th->ToSysAnything(out,sym,argc,argv); + } - void SetBang(int o) { Clear(); out = o; tp = tp_bang; } - void SetFloat(int o,float f) { Clear(); out = o; tp = tp_float; _float = f; } - void SetInt(int o,int i) { Clear(); out = o; tp = tp_int; _int = i; } - void SetSymbol(int o,const t_symbol *s) { Clear(); out = o; tp = tp_sym; _sym = s; } - void SetList(int o,int argc,const t_atom *argv) { Clear(); out = o; tp = tp_list; _list.argc = argc,_list.argv = flext::CopyList(argc,argv); } - void SetAny(int o,const t_symbol *s,int argc,const t_atom *argv) { Clear(); out = o; tp = tp_any; _any.s = s,_any.argc = argc,_any.argv = flext::CopyList(argc,argv); } + int Args() const { return argc; } +private: flext_base *th; int out; - enum { tp_none,tp_bang,tp_float,tp_int,tp_sym,tp_list,tp_any } tp; - union { - float _float; - int _int; - const t_symbol *_sym; - struct { int argc; t_atom *argv; } _list; - struct { const t_symbol *s; int argc; t_atom *argv; } _any; - }; + const t_symbol *sym; + int argc; + const t_atom *argv; }; -qmsg::~qmsg() -{ - Clear(); - if(nxt) delete nxt; -} +// _should_ work without locks.... have yet to check if it really does.... +class Queue: + public flext +{ +public: + Queue() + { + qhead = qtail = 0; + ahead = atail = 0; + } -void qmsg::Clear() -{ - if(tp == tp_list) { if(_list.argv) delete[] _list.argv; } - else if(tp == tp_any) { if(_any.argv) delete[] _any.argv; } - tp = tp_none; -} + bool Empty() const { return qhead == qtail; } + + int Count() const + { + int c = qtail-qhead; + return c >= 0?c:c+QUEUE_LENGTH; + } + + const qmsg &Head() { return lst[qhead]; } + + void Pop() + { + PopAtoms(Head().Args()); + qhead = (qhead+1)%QUEUE_LENGTH; + } + + void Push(flext_base *th,int o) // bang + { + Set(th,o,sym_bang,0,NULL); + } + + void Push(flext_base *th,int o,float dt) + { + t_atom *at = GetAtoms(1); + SetFloat(*at,dt); + Set(th,o,sym_float,1,at); + } + + void Push(flext_base *th,int o,int dt) + { + t_atom *at = GetAtoms(1); + SetInt(*at,dt); +#if FLEXT_SYS == FLEXT_SYS_PD + const t_symbol *sym = sym_float; +#elif FLEXT_SYS == FLEXT_SYS_MAX + const t_symbol *sym = sym_int; +#else +#error Not implemented! +#endif + Set(th,o,sym,1,at); + } + + void Push(flext_base *th,int o,const t_symbol *dt) + { + t_atom *at = GetAtoms(1); + SetSymbol(*at,dt); + Set(th,o,sym_symbol,1,at); + } + + void Push(flext_base *th,int o,int argc,const t_atom *argv) + { + t_atom *at = GetAtoms(argc); + memcpy(at,argv,argc*sizeof(t_atom)); + Set(th,o,sym_list,argc,at); + } + + void Push(flext_base *th,int o,const t_symbol *sym,int argc,const t_atom *argv) + { + t_atom *at = GetAtoms(argc); + memcpy(at,argv,argc*sizeof(t_atom)); + Set(th,o,sym,argc,at); + } + +protected: + void Set(flext_base *th,int o,const t_symbol *sym,int argc,const t_atom *argv) + { + FLEXT_ASSERT(Count() < QUEUE_LENGTH-1); + lst[qtail].Set(th,o,sym,argc,argv); + qtail = (qtail+1)%QUEUE_LENGTH; + } + + int CntAtoms() const + { + int c = atail-ahead; + return c >= 0?c:c+QUEUE_ATOMS; + } + + // must return contiguous region + t_atom *GetAtoms(int argc) + { + // \todo check for available space + + if(atail+argc >= QUEUE_ATOMS) { + atail = argc; + return atoms; + } + else { + t_atom *at = atoms+atail; + atail += argc; + return at; + } + } + + void PopAtoms(int argc) + { + const int p = ahead+argc; + ahead = p >= QUEUE_ATOMS?argc:p; + } + + int qhead,qtail; + qmsg lst[QUEUE_LENGTH]; + int ahead,atail; + t_atom atoms[QUEUE_ATOMS]; +}; + +static Queue queue; -static volatile int qcnt = 0; -static qmsg *volatile qhead = NULL,*volatile qtail = NULL; #ifdef FLEXT_QTHR static flext::ThrCond qthrcond; @@ -75,102 +180,33 @@ static flext::ThrCond qthrcond; static t_qelem *qclk = NULL; #endif -#ifdef FLEXT_THREADS -static flext::ThrMutex qmutex; -#endif #define CHUNK 10 -static void QWork(bool qlock,bool syslock) +static void QWork(bool syslock) { for(;;) { // Since qcnt can only be increased from any other function than QWork // qc will be a minimum guaranteed number of present queue elements. // On the other hand, if new queue elements are added by the methods called // in the loop, these will be sent in the next tick to avoid recursion overflow. - int qc = qcnt; + int qc = queue.Count(); if(!qc) break; #ifdef FLEXT_QTHR if(syslock) sys_lock(); #endif - for(int i = 0; i < qc && qhead; ++i) { - #ifdef FLEXT_THREADS - if(qlock) qmutex.Lock(); - #endif - qmsg *m = qhead; - qcnt--; - qhead = m->nxt; - if(!qhead) qtail = NULL; - m->nxt = NULL; - #ifdef FLEXT_THREADS - if(qlock) qmutex.Unlock(); - #endif - - if(m->out < 0) { - // message to self - - const int n = -1-m->out; - t_atom tmp; - - switch(m->tp) { - case qmsg::tp_bang: - m->th->m_methodmain(n,flext::sym_bang,0,&tmp); - break; - case qmsg::tp_float: - flext::SetFloat(tmp,m->_float); - m->th->m_methodmain(n,flext::sym_float,1,&tmp); - break; - case qmsg::tp_int: - flext::SetInt(tmp,m->_int); - #if FLEXT_SYS == FLEXT_SYS_PD - m->th->m_methodmain(n,flext::sym_float,1,&tmp); - #elif FLEXT_SYS == FLEXT_SYS_MAX - m->th->m_methodmain(n,flext::sym_int,1,&tmp); - #else - #error Not implemented! - #endif - case qmsg::tp_sym: - flext::SetSymbol(tmp,m->_sym); - m->th->m_methodmain(n,flext::sym_symbol,1,&tmp); - break; - case qmsg::tp_list: - m->th->m_methodmain(n,flext::sym_list,m->_list.argc,m->_list.argv); - break; - case qmsg::tp_any: - m->th->m_methodmain(n,m->_any.s,m->_any.argc,m->_any.argv); - break; - #ifdef FLEXT_DEBUG - default: ERRINTERNAL(); - #endif - } - } - else { - // message to outlet - - switch(m->tp) { - case qmsg::tp_bang: m->th->ToSysBang(m->out); break; - case qmsg::tp_float: m->th->ToSysFloat(m->out,m->_float); break; - case qmsg::tp_int: m->th->ToSysInt(m->out,m->_int); break; - case qmsg::tp_sym: m->th->ToSysSymbol(m->out,m->_sym); break; - case qmsg::tp_list: m->th->ToSysList(m->out,m->_list.argc,m->_list.argv); break; - case qmsg::tp_any: m->th->ToSysAnything(m->out,m->_any.s,m->_any.argc,m->_any.argv); break; - #ifdef FLEXT_DEBUG - default: ERRINTERNAL(); - #endif - } - } - - // delete processed queue element - delete m; + for(int i = 0; i < qc; ++i) { + queue.Head().Send(); + queue.Pop(); } // inner loop #ifdef FLEXT_QTHR if(syslock) sys_unlock(); #endif - } // for(;;) + } } #if !defined(FLEXT_QTHR) @@ -181,11 +217,10 @@ static void QTick(fts_object_t *c,int winlet, fts_symbol_t s, int ac, const fts_ static void QTick(flext_base *c) { #endif -// post("qtick"); #ifdef FLEXT_THREADS FLEXT_ASSERT(flext::IsSystemThread()); #endif - QWork(true,false); + QWork(false); } #endif @@ -201,30 +236,12 @@ void flext_base::QFlush(flext_base *th) return; } #endif -#ifdef FLEXT_THREADS - qmutex.Lock(); -#endif - while(qcnt) QWork(false,false); -#ifdef FLEXT_THREADS - qmutex.Unlock(); -#endif + + while(!queue.Empty()) QWork(false); } -static void Queue(qmsg *m) +static void Trigger() { -// post("Queue"); - -#ifdef FLEXT_THREADS - qmutex.Lock(); -#endif - if(qtail) qtail->nxt = m; - else qhead = m; - qtail = m; - qcnt++; -#ifdef FLEXT_THREADS - qmutex.Unlock(); -#endif - #if FLEXT_SYS == FLEXT_SYS_PD #ifdef FLEXT_QTHR // wake up a worker thread @@ -249,7 +266,7 @@ void flext_base::QWorker(thr_params *) thrmsgid = GetThreadId(); for(;;) { qthrcond.Wait(); - QWork(true,true); + QWork(true); } } #endif @@ -260,10 +277,6 @@ void flext_base::StartQueue() if(started) return; else started = true; - // message queue ticker - qhead = qtail = NULL; - qcnt = 0; - #ifdef FLEXT_QTHR LaunchThread(QWorker,NULL); #else @@ -277,86 +290,36 @@ void flext_base::StartQueue() void flext_base::ToQueueBang(int o) const { - FLEXT_ASSERT(o >= 0); - qmsg *m = new qmsg(const_cast(this)); - m->SetBang(o); - Queue(m); + queue.Push(const_cast(this),o); + Trigger(); } void flext_base::ToQueueFloat(int o,float f) const { - FLEXT_ASSERT(o >= 0); - qmsg *m = new qmsg(const_cast(this)); - m->SetFloat(o,f); - Queue(m); + queue.Push(const_cast(this),o,f); + Trigger(); } void flext_base::ToQueueInt(int o,int f) const { - FLEXT_ASSERT(o >= 0); - qmsg *m = new qmsg(const_cast(this)); - m->SetInt(o,f); - Queue(m); + queue.Push(const_cast(this),o,f); + Trigger(); } void flext_base::ToQueueSymbol(int o,const t_symbol *s) const { - FLEXT_ASSERT(o >= 0); - qmsg *m = new qmsg(const_cast(this)); - m->SetSymbol(o,s); - Queue(m); + queue.Push(const_cast(this),o,s); + Trigger(); } void flext_base::ToQueueList(int o,int argc,const t_atom *argv) const { - FLEXT_ASSERT(o >= 0); - qmsg *m = new qmsg(const_cast(this)); - m->SetList(o,argc,argv); - Queue(m); + queue.Push(const_cast(this),o,argc,argv); + Trigger(); } void flext_base::ToQueueAnything(int o,const t_symbol *s,int argc,const t_atom *argv) const { - FLEXT_ASSERT(o >= 0); - qmsg *m = new qmsg(const_cast(this)); - m->SetAny(o,s,argc,argv); - Queue(m); -} - - -void flext_base::ToSelfBang(int n) const -{ - FLEXT_ASSERT(n >= 0); - ToQueueBang(-1-n); -} - -void flext_base::ToSelfFloat(int n,float f) const -{ - FLEXT_ASSERT(n >= 0); - ToQueueFloat(-1-n,f); -} - -void flext_base::ToSelfInt(int n,int f) const -{ - FLEXT_ASSERT(n >= 0); - ToQueueInt(-1-n,f); + queue.Push(const_cast(this),o,s,argc,argv); + Trigger(); } - -void flext_base::ToSelfSymbol(int n,const t_symbol *s) const -{ - FLEXT_ASSERT(n >= 0); - ToQueueSymbol(-1-n,s); -} - -void flext_base::ToSelfList(int n,int argc,const t_atom *argv) const -{ - FLEXT_ASSERT(n >= 0); - ToQueueList(-1-n,argc,argv); -} - -void flext_base::ToSelfAnything(int n,const t_symbol *s,int argc,const t_atom *argv) const -{ - FLEXT_ASSERT(n >= 0); - ToQueueAnything(-1-n,s,argc,argv); -} - -- cgit v1.2.1