From 630623a0c7289ddd442d139232dea68dcb07a197 Mon Sep 17 00:00:00 2001 From: Thomas Grill Date: Tue, 1 Jul 2003 02:32:46 +0000 Subject: "" svn path=/trunk/; revision=741 --- externals/grill/flext/source/flqueue.cpp | 193 +++++++++++++++++-------------- 1 file changed, 103 insertions(+), 90 deletions(-) (limited to 'externals/grill/flext/source/flqueue.cpp') diff --git a/externals/grill/flext/source/flqueue.cpp b/externals/grill/flext/source/flqueue.cpp index ea432e5b..34bf9b9f 100755 --- a/externals/grill/flext/source/flqueue.cpp +++ b/externals/grill/flext/source/flqueue.cpp @@ -61,6 +61,7 @@ void qmsg::Clear() tp = tp_none; } +static int qcnt = 0; static qmsg *qhead = NULL,*qtail = NULL; #ifdef FLEXT_QTHR @@ -73,86 +74,99 @@ static t_qelem *qclk = NULL; static flext::ThrMutex qmutex; #endif +#define CHUNK 10 + static void QWork(bool qlock,bool syslock) { -#ifdef FLEXT_THREADS - if(qlock) qmutex.Lock(); -#endif -#ifdef FLEXT_QTHR - if(syslock) pd_lock(); -#endif - for(;;) { - qmsg *m = qhead; - if(!m) break; - - if(m->out < 0) { - // message to self - - const int n = -1-m->out; - t_atom tmp; - - switch(m->tp) { - case qmsg::tp_bang: - m->th->m_methodmain(n,flext::sym_bang,0,&tmp); - break; - case qmsg::tp_float: - flext::SetFloat(tmp,m->_float); - m->th->m_methodmain(n,flext::sym_float,1,&tmp); - break; - case qmsg::tp_int: - flext::SetInt(tmp,m->_int); -#if FLEXT_SYS == FLEXT_SYS_PD - m->th->m_methodmain(n,flext::sym_float,1,&tmp); -#elif FLEXT_SYS == FLEXT_SYS_MAX - m->th->m_methodmain(n,flext::sym_int,1,&tmp); -#else -#error Not implemented! -#endif - case qmsg::tp_sym: - flext::SetSymbol(tmp,m->_sym); - m->th->m_methodmain(n,flext::sym_symbol,1,&tmp); - break; - case qmsg::tp_list: - m->th->m_methodmain(n,flext::sym_list,m->_list.argc,m->_list.argv); - break; - case qmsg::tp_any: - m->th->m_methodmain(n,m->_any.s,m->_any.argc,m->_any.argv); - break; - #ifdef FLEXT_DEBUG - default: ERRINTERNAL(); - #endif - } - } - else { - // message to outlet - - switch(m->tp) { - case qmsg::tp_bang: m->th->ToOutBang(m->out); break; - case qmsg::tp_float: m->th->ToOutFloat(m->out,m->_float); break; - case qmsg::tp_int: m->th->ToOutInt(m->out,m->_int); break; - case qmsg::tp_sym: m->th->ToOutSymbol(m->out,m->_sym); break; - case qmsg::tp_list: m->th->ToOutList(m->out,m->_list.argc,m->_list.argv); break; - case qmsg::tp_any: m->th->ToOutAnything(m->out,m->_any.s,m->_any.argc,m->_any.argv); break; - #ifdef FLEXT_DEBUG - default: ERRINTERNAL(); - #endif - } - } - - qhead = m->nxt; - if(!qhead) qtail = NULL; - m->nxt = NULL; - delete m; - } -#ifdef FLEXT_QTHR - if(syslock) pd_unlock(); -#endif -#ifdef FLEXT_THREADS - if(qlock) qmutex.Unlock(); -#endif + // since qcnt can only be increased from any other function than QWork + // qc will be a minimum guaranteed number of present queue elements + int qc = qcnt; + if(!qc) break; + + #ifdef FLEXT_QTHR + if(syslock) pd_lock(); + #endif + + for(int i = 0; i < qc && qhead; ++i) { + #ifdef FLEXT_THREADS + if(qlock) qmutex.Lock(); + #endif + qmsg *m = qhead; + qcnt--; + qhead = m->nxt; + if(!qhead) qtail = NULL; + m->nxt = NULL; + #ifdef FLEXT_THREADS + if(qlock) qmutex.Unlock(); + #endif + + if(m->out < 0) { + // message to self + + const int n = -1-m->out; + t_atom tmp; + + switch(m->tp) { + case qmsg::tp_bang: + m->th->m_methodmain(n,flext::sym_bang,0,&tmp); + break; + case qmsg::tp_float: + flext::SetFloat(tmp,m->_float); + m->th->m_methodmain(n,flext::sym_float,1,&tmp); + break; + case qmsg::tp_int: + flext::SetInt(tmp,m->_int); + #if FLEXT_SYS == FLEXT_SYS_PD + m->th->m_methodmain(n,flext::sym_float,1,&tmp); + #elif FLEXT_SYS == FLEXT_SYS_MAX + m->th->m_methodmain(n,flext::sym_int,1,&tmp); + #else + #error Not implemented! + #endif + case qmsg::tp_sym: + flext::SetSymbol(tmp,m->_sym); + m->th->m_methodmain(n,flext::sym_symbol,1,&tmp); + break; + case qmsg::tp_list: + m->th->m_methodmain(n,flext::sym_list,m->_list.argc,m->_list.argv); + break; + case qmsg::tp_any: + m->th->m_methodmain(n,m->_any.s,m->_any.argc,m->_any.argv); + break; + #ifdef FLEXT_DEBUG + default: ERRINTERNAL(); + #endif + } + } + else { + // message to outlet + + switch(m->tp) { + case qmsg::tp_bang: m->th->ToSysBang(m->out); break; + case qmsg::tp_float: m->th->ToSysFloat(m->out,m->_float); break; + case qmsg::tp_int: m->th->ToSysInt(m->out,m->_int); break; + case qmsg::tp_sym: m->th->ToSysSymbol(m->out,m->_sym); break; + case qmsg::tp_list: m->th->ToSysList(m->out,m->_list.argc,m->_list.argv); break; + case qmsg::tp_any: m->th->ToSysAnything(m->out,m->_any.s,m->_any.argc,m->_any.argv); break; + #ifdef FLEXT_DEBUG + default: ERRINTERNAL(); + #endif + } + } + + // delete processed queue element + delete m; + } // inner loop + + #ifdef FLEXT_QTHR + if(syslock) pd_unlock(); + #endif + + } // for(;;) } +#if !defined(FLEXT_QTHR) #if FLEXT_SYS == FLEXT_SYS_JMAX static void QTick(fts_object_t *c,int winlet, fts_symbol_t s, int ac, const fts_atom_t *at) { @@ -161,21 +175,12 @@ static void QTick(flext_base *c) { #endif // post("qtick"); -#if defined(FLEXT_THREADS) && defined(FLEXT_DEBUG) && !defined(FLEXT_QTHR) - if(!flext::IsSystemThread()) { - error("flext - Queue tick called by wrong thread!"); - return; - } -#endif - QWork(true,true); - -/* -#if !defined(FLEXT_QTHR) && (FLEXT_SYS == FLEXT_SYS_PD || FLEXT_SYS == FLEXT_SYS_MAX) - // Reclocking for safety - clock_delay(qclk,10); +#ifdef FLEXT_THREADS + FLEXT_ASSERT(flext::IsSystemThread()); #endif -*/ + QWork(true,false); } +#endif /* It would be sufficient to only flush messages belonging to object th @@ -189,7 +194,13 @@ void flext_base::QFlush(flext_base *th) return; } #endif - while(qhead) QWork(true,false); +#ifdef FLEXT_THREADS + qmutex.Lock(); +#endif + while(qcnt) QWork(false,false); +#ifdef FLEXT_THREADS + qmutex.Unlock(); +#endif } static void Queue(qmsg *m) @@ -202,6 +213,7 @@ static void Queue(qmsg *m) if(qtail) qtail->nxt = m; else qhead = m; qtail = m; + qcnt++; #ifdef FLEXT_THREADS qmutex.Unlock(); #endif @@ -228,7 +240,7 @@ static void Queue(qmsg *m) void QWorker(flext::thr_params *) { for(;;) { - qthrcond.TimedWait(0.01); + qthrcond.Wait(); QWork(true,true); } } @@ -238,6 +250,7 @@ void flext_base::StartQueue() { // message queue ticker qhead = qtail = NULL; + qcnt = 0; #ifdef FLEXT_QTHR LaunchThread(QWorker,NULL); -- cgit v1.2.1