aboutsummaryrefslogtreecommitdiff
path: root/externals/grill/flext/source/flqueue.cpp
diff options
context:
space:
mode:
authorThomas Grill <xovo@users.sourceforge.net>2004-09-11 04:09:17 +0000
committerThomas Grill <xovo@users.sourceforge.net>2004-09-11 04:09:17 +0000
commit1fa5251f469a756d09f7f7c98113a69186688206 (patch)
tree173f1965bcde6297b4a70ac778a6cd37aa32e452 /externals/grill/flext/source/flqueue.cpp
parentdf4fc5a5bf7bc65b610a497091f47e5ccc600874 (diff)
""
svn path=/trunk/; revision=2023
Diffstat (limited to 'externals/grill/flext/source/flqueue.cpp')
-rwxr-xr-xexternals/grill/flext/source/flqueue.cpp359
1 files changed, 161 insertions, 198 deletions
diff --git a/externals/grill/flext/source/flqueue.cpp b/externals/grill/flext/source/flqueue.cpp
index 41941734..4165e968 100755
--- a/externals/grill/flext/source/flqueue.cpp
+++ b/externals/grill/flext/source/flqueue.cpp
@@ -19,55 +19,160 @@ WARRANTIES, see the file, "license.txt," in this distribution.
#include "flext.h"
#include "flinternal.h"
+
#ifdef FLEXT_THREADS
//! Thread id of message queue thread
flext::thrid_t flext::thrmsgid = 0;
#endif
+
+#define QUEUE_LENGTH 256
+#define QUEUE_ATOMS 1024
+
class qmsg
{
public:
- qmsg(flext_base *b): nxt(NULL),th(b),tp(tp_none) {}
- ~qmsg();
-
- qmsg *nxt;
-
- void Clear();
+ void Set(flext_base *t,int o,const t_symbol *s,int ac,const t_atom *av) { th = t,out = o,sym = s,argc = ac,argv = av; }
+
+ // \note PD lock must already be held by caller
+ void Send() const
+ {
+ if(out < 0)
+ // message to self
+ th->m_methodmain(-1-out,sym,argc,argv);
+ else
+ // message to outlet
+ th->ToSysAnything(out,sym,argc,argv);
+ }
- void SetBang(int o) { Clear(); out = o; tp = tp_bang; }
- void SetFloat(int o,float f) { Clear(); out = o; tp = tp_float; _float = f; }
- void SetInt(int o,int i) { Clear(); out = o; tp = tp_int; _int = i; }
- void SetSymbol(int o,const t_symbol *s) { Clear(); out = o; tp = tp_sym; _sym = s; }
- void SetList(int o,int argc,const t_atom *argv) { Clear(); out = o; tp = tp_list; _list.argc = argc,_list.argv = flext::CopyList(argc,argv); }
- void SetAny(int o,const t_symbol *s,int argc,const t_atom *argv) { Clear(); out = o; tp = tp_any; _any.s = s,_any.argc = argc,_any.argv = flext::CopyList(argc,argv); }
+ int Args() const { return argc; }
+private:
flext_base *th;
int out;
- enum { tp_none,tp_bang,tp_float,tp_int,tp_sym,tp_list,tp_any } tp;
- union {
- float _float;
- int _int;
- const t_symbol *_sym;
- struct { int argc; t_atom *argv; } _list;
- struct { const t_symbol *s; int argc; t_atom *argv; } _any;
- };
+ const t_symbol *sym;
+ int argc;
+ const t_atom *argv;
};
-qmsg::~qmsg()
-{
- Clear();
- if(nxt) delete nxt;
-}
+// _should_ work without locks.... have yet to check if it really does....
+class Queue:
+ public flext
+{
+public:
+ Queue()
+ {
+ qhead = qtail = 0;
+ ahead = atail = 0;
+ }
-void qmsg::Clear()
-{
- if(tp == tp_list) { if(_list.argv) delete[] _list.argv; }
- else if(tp == tp_any) { if(_any.argv) delete[] _any.argv; }
- tp = tp_none;
-}
+ bool Empty() const { return qhead == qtail; }
+
+ int Count() const
+ {
+ int c = qtail-qhead;
+ return c >= 0?c:c+QUEUE_LENGTH;
+ }
+
+ const qmsg &Head() { return lst[qhead]; }
+
+ void Pop()
+ {
+ PopAtoms(Head().Args());
+ qhead = (qhead+1)%QUEUE_LENGTH;
+ }
+
+ void Push(flext_base *th,int o) // bang
+ {
+ Set(th,o,sym_bang,0,NULL);
+ }
+
+ void Push(flext_base *th,int o,float dt)
+ {
+ t_atom *at = GetAtoms(1);
+ SetFloat(*at,dt);
+ Set(th,o,sym_float,1,at);
+ }
+
+ void Push(flext_base *th,int o,int dt)
+ {
+ t_atom *at = GetAtoms(1);
+ SetInt(*at,dt);
+#if FLEXT_SYS == FLEXT_SYS_PD
+ const t_symbol *sym = sym_float;
+#elif FLEXT_SYS == FLEXT_SYS_MAX
+ const t_symbol *sym = sym_int;
+#else
+#error Not implemented!
+#endif
+ Set(th,o,sym,1,at);
+ }
+
+ void Push(flext_base *th,int o,const t_symbol *dt)
+ {
+ t_atom *at = GetAtoms(1);
+ SetSymbol(*at,dt);
+ Set(th,o,sym_symbol,1,at);
+ }
+
+ void Push(flext_base *th,int o,int argc,const t_atom *argv)
+ {
+ t_atom *at = GetAtoms(argc);
+ memcpy(at,argv,argc*sizeof(t_atom));
+ Set(th,o,sym_list,argc,at);
+ }
+
+ void Push(flext_base *th,int o,const t_symbol *sym,int argc,const t_atom *argv)
+ {
+ t_atom *at = GetAtoms(argc);
+ memcpy(at,argv,argc*sizeof(t_atom));
+ Set(th,o,sym,argc,at);
+ }
+
+protected:
+ void Set(flext_base *th,int o,const t_symbol *sym,int argc,const t_atom *argv)
+ {
+ FLEXT_ASSERT(Count() < QUEUE_LENGTH-1);
+ lst[qtail].Set(th,o,sym,argc,argv);
+ qtail = (qtail+1)%QUEUE_LENGTH;
+ }
+
+ int CntAtoms() const
+ {
+ int c = atail-ahead;
+ return c >= 0?c:c+QUEUE_ATOMS;
+ }
+
+ // must return contiguous region
+ t_atom *GetAtoms(int argc)
+ {
+ // \todo check for available space
+
+ if(atail+argc >= QUEUE_ATOMS) {
+ atail = argc;
+ return atoms;
+ }
+ else {
+ t_atom *at = atoms+atail;
+ atail += argc;
+ return at;
+ }
+ }
+
+ void PopAtoms(int argc)
+ {
+ const int p = ahead+argc;
+ ahead = p >= QUEUE_ATOMS?argc:p;
+ }
+
+ int qhead,qtail;
+ qmsg lst[QUEUE_LENGTH];
+ int ahead,atail;
+ t_atom atoms[QUEUE_ATOMS];
+};
+
+static Queue queue;
-static volatile int qcnt = 0;
-static qmsg *volatile qhead = NULL,*volatile qtail = NULL;
#ifdef FLEXT_QTHR
static flext::ThrCond qthrcond;
@@ -75,102 +180,33 @@ static flext::ThrCond qthrcond;
static t_qelem *qclk = NULL;
#endif
-#ifdef FLEXT_THREADS
-static flext::ThrMutex qmutex;
-#endif
#define CHUNK 10
-static void QWork(bool qlock,bool syslock)
+static void QWork(bool syslock)
{
for(;;) {
// Since qcnt can only be increased from any other function than QWork
// qc will be a minimum guaranteed number of present queue elements.
// On the other hand, if new queue elements are added by the methods called
// in the loop, these will be sent in the next tick to avoid recursion overflow.
- int qc = qcnt;
+ int qc = queue.Count();
if(!qc) break;
#ifdef FLEXT_QTHR
if(syslock) sys_lock();
#endif
- for(int i = 0; i < qc && qhead; ++i) {
- #ifdef FLEXT_THREADS
- if(qlock) qmutex.Lock();
- #endif
- qmsg *m = qhead;
- qcnt--;
- qhead = m->nxt;
- if(!qhead) qtail = NULL;
- m->nxt = NULL;
- #ifdef FLEXT_THREADS
- if(qlock) qmutex.Unlock();
- #endif
-
- if(m->out < 0) {
- // message to self
-
- const int n = -1-m->out;
- t_atom tmp;
-
- switch(m->tp) {
- case qmsg::tp_bang:
- m->th->m_methodmain(n,flext::sym_bang,0,&tmp);
- break;
- case qmsg::tp_float:
- flext::SetFloat(tmp,m->_float);
- m->th->m_methodmain(n,flext::sym_float,1,&tmp);
- break;
- case qmsg::tp_int:
- flext::SetInt(tmp,m->_int);
- #if FLEXT_SYS == FLEXT_SYS_PD
- m->th->m_methodmain(n,flext::sym_float,1,&tmp);
- #elif FLEXT_SYS == FLEXT_SYS_MAX
- m->th->m_methodmain(n,flext::sym_int,1,&tmp);
- #else
- #error Not implemented!
- #endif
- case qmsg::tp_sym:
- flext::SetSymbol(tmp,m->_sym);
- m->th->m_methodmain(n,flext::sym_symbol,1,&tmp);
- break;
- case qmsg::tp_list:
- m->th->m_methodmain(n,flext::sym_list,m->_list.argc,m->_list.argv);
- break;
- case qmsg::tp_any:
- m->th->m_methodmain(n,m->_any.s,m->_any.argc,m->_any.argv);
- break;
- #ifdef FLEXT_DEBUG
- default: ERRINTERNAL();
- #endif
- }
- }
- else {
- // message to outlet
-
- switch(m->tp) {
- case qmsg::tp_bang: m->th->ToSysBang(m->out); break;
- case qmsg::tp_float: m->th->ToSysFloat(m->out,m->_float); break;
- case qmsg::tp_int: m->th->ToSysInt(m->out,m->_int); break;
- case qmsg::tp_sym: m->th->ToSysSymbol(m->out,m->_sym); break;
- case qmsg::tp_list: m->th->ToSysList(m->out,m->_list.argc,m->_list.argv); break;
- case qmsg::tp_any: m->th->ToSysAnything(m->out,m->_any.s,m->_any.argc,m->_any.argv); break;
- #ifdef FLEXT_DEBUG
- default: ERRINTERNAL();
- #endif
- }
- }
-
- // delete processed queue element
- delete m;
+ for(int i = 0; i < qc; ++i) {
+ queue.Head().Send();
+ queue.Pop();
} // inner loop
#ifdef FLEXT_QTHR
if(syslock) sys_unlock();
#endif
- } // for(;;)
+ }
}
#if !defined(FLEXT_QTHR)
@@ -181,11 +217,10 @@ static void QTick(fts_object_t *c,int winlet, fts_symbol_t s, int ac, const fts_
static void QTick(flext_base *c)
{
#endif
-// post("qtick");
#ifdef FLEXT_THREADS
FLEXT_ASSERT(flext::IsSystemThread());
#endif
- QWork(true,false);
+ QWork(false);
}
#endif
@@ -201,30 +236,12 @@ void flext_base::QFlush(flext_base *th)
return;
}
#endif
-#ifdef FLEXT_THREADS
- qmutex.Lock();
-#endif
- while(qcnt) QWork(false,false);
-#ifdef FLEXT_THREADS
- qmutex.Unlock();
-#endif
+
+ while(!queue.Empty()) QWork(false);
}
-static void Queue(qmsg *m)
+static void Trigger()
{
-// post("Queue");
-
-#ifdef FLEXT_THREADS
- qmutex.Lock();
-#endif
- if(qtail) qtail->nxt = m;
- else qhead = m;
- qtail = m;
- qcnt++;
-#ifdef FLEXT_THREADS
- qmutex.Unlock();
-#endif
-
#if FLEXT_SYS == FLEXT_SYS_PD
#ifdef FLEXT_QTHR
// wake up a worker thread
@@ -249,7 +266,7 @@ void flext_base::QWorker(thr_params *)
thrmsgid = GetThreadId();
for(;;) {
qthrcond.Wait();
- QWork(true,true);
+ QWork(true);
}
}
#endif
@@ -260,10 +277,6 @@ void flext_base::StartQueue()
if(started) return;
else started = true;
- // message queue ticker
- qhead = qtail = NULL;
- qcnt = 0;
-
#ifdef FLEXT_QTHR
LaunchThread(QWorker,NULL);
#else
@@ -277,86 +290,36 @@ void flext_base::StartQueue()
void flext_base::ToQueueBang(int o) const
{
- FLEXT_ASSERT(o >= 0);
- qmsg *m = new qmsg(const_cast<flext_base *>(this));
- m->SetBang(o);
- Queue(m);
+ queue.Push(const_cast<flext_base *>(this),o);
+ Trigger();
}
void flext_base::ToQueueFloat(int o,float f) const
{
- FLEXT_ASSERT(o >= 0);
- qmsg *m = new qmsg(const_cast<flext_base *>(this));
- m->SetFloat(o,f);
- Queue(m);
+ queue.Push(const_cast<flext_base *>(this),o,f);
+ Trigger();
}
void flext_base::ToQueueInt(int o,int f) const
{
- FLEXT_ASSERT(o >= 0);
- qmsg *m = new qmsg(const_cast<flext_base *>(this));
- m->SetInt(o,f);
- Queue(m);
+ queue.Push(const_cast<flext_base *>(this),o,f);
+ Trigger();
}
void flext_base::ToQueueSymbol(int o,const t_symbol *s) const
{
- FLEXT_ASSERT(o >= 0);
- qmsg *m = new qmsg(const_cast<flext_base *>(this));
- m->SetSymbol(o,s);
- Queue(m);
+ queue.Push(const_cast<flext_base *>(this),o,s);
+ Trigger();
}
void flext_base::ToQueueList(int o,int argc,const t_atom *argv) const
{
- FLEXT_ASSERT(o >= 0);
- qmsg *m = new qmsg(const_cast<flext_base *>(this));
- m->SetList(o,argc,argv);
- Queue(m);
+ queue.Push(const_cast<flext_base *>(this),o,argc,argv);
+ Trigger();
}
void flext_base::ToQueueAnything(int o,const t_symbol *s,int argc,const t_atom *argv) const
{
- FLEXT_ASSERT(o >= 0);
- qmsg *m = new qmsg(const_cast<flext_base *>(this));
- m->SetAny(o,s,argc,argv);
- Queue(m);
-}
-
-
-void flext_base::ToSelfBang(int n) const
-{
- FLEXT_ASSERT(n >= 0);
- ToQueueBang(-1-n);
-}
-
-void flext_base::ToSelfFloat(int n,float f) const
-{
- FLEXT_ASSERT(n >= 0);
- ToQueueFloat(-1-n,f);
-}
-
-void flext_base::ToSelfInt(int n,int f) const
-{
- FLEXT_ASSERT(n >= 0);
- ToQueueInt(-1-n,f);
+ queue.Push(const_cast<flext_base *>(this),o,s,argc,argv);
+ Trigger();
}
-
-void flext_base::ToSelfSymbol(int n,const t_symbol *s) const
-{
- FLEXT_ASSERT(n >= 0);
- ToQueueSymbol(-1-n,s);
-}
-
-void flext_base::ToSelfList(int n,int argc,const t_atom *argv) const
-{
- FLEXT_ASSERT(n >= 0);
- ToQueueList(-1-n,argc,argv);
-}
-
-void flext_base::ToSelfAnything(int n,const t_symbol *s,int argc,const t_atom *argv) const
-{
- FLEXT_ASSERT(n >= 0);
- ToQueueAnything(-1-n,s,argc,argv);
-}
-