diff options
Diffstat (limited to 'externals/grill/flext/source/flthr.cpp')
-rw-r--r-- | externals/grill/flext/source/flthr.cpp | 484 |
1 files changed, 247 insertions, 237 deletions
diff --git a/externals/grill/flext/source/flthr.cpp b/externals/grill/flext/source/flthr.cpp index 0750dd83..47c64354 100644 --- a/externals/grill/flext/source/flthr.cpp +++ b/externals/grill/flext/source/flthr.cpp @@ -13,10 +13,15 @@ WARRANTIES, see the file, "license.txt," in this distribution. */ #include "flext.h" -#include "flinternal.h" #ifdef FLEXT_THREADS +// maximum wait time for threads to finish (in ms) +#define MAXIMUMWAIT 100 + + +#include "flinternal.h" +#include "flcontainers.h" #include <time.h> @@ -31,7 +36,6 @@ WARRANTIES, see the file, "license.txt," in this distribution. #error WIN32 threads need Windows SDK version >= 0x500 #endif - #include <errno.h> //! Thread id of system thread @@ -40,11 +44,61 @@ flext::thrid_t flext::thrid = 0; //! Thread id of helper thread flext::thrid_t flext::thrhelpid = 0; -static flext::thr_entry *thrhead = NULL,*thrtail = NULL; -static flext::ThrMutex tlmutex; -//! Helper thread should terminate -static bool thrhelpexit = false; +//! \brief This represents an entry to the list of active method threads +class thr_entry + : public flext + , public Fifo::Cell +{ +public: + void thr_entry::Set(void (*m)(thr_params *),thr_params *p,thrid_t id = GetThreadId()) + { + th = p?p->cl:NULL; + meth = m,params = p,thrid = id; + shouldexit = false; +#if FLEXT_THREADS == FLEXT_THR_MP + weight = 100; // MP default weight +#endif + } + + //! \brief Check if this class represents the current thread + bool Is(thrid_t id = GetThreadId()) const { return IsThread(thrid,id); } + + FLEXT_CLASSDEF(flext_base) *This() const { return th; } + thrid_t Id() const { return thrid; } + + FLEXT_CLASSDEF(flext_base) *th; + void (*meth)(thr_params *); + thr_params *params; + thrid_t thrid; + bool shouldexit; +#if FLEXT_THREADS == FLEXT_THR_MP + int weight; +#endif +}; + +template<class T> +class ThrFinder: + public T +{ +public: + void Push(thr_entry *e) { T::Push(e); } + thr_entry *Pop() { return T::Pop(); } + + thr_entry *Find(flext::thrid_t id,bool pop = false) + { + TypedLifo<thr_entry> qutmp; + thr_entry *fnd; + while((fnd = Pop()) && !fnd->Is(id)) qutmp.Push(fnd); + // put back entries + for(thr_entry *ti; ti = qutmp.Pop(); ) Push(ti); + if(fnd && !pop) Push(fnd); + return fnd; + } +}; + +static ThrFinder< PooledLifo<thr_entry,1,10> > thrpending; +static ThrFinder< TypedLifo<thr_entry> > thractive,thrstopped; //! Helper thread conditional static flext::ThrCond *thrhelpcond = NULL; @@ -53,12 +107,13 @@ static flext::ThrCond *thrhelpcond = NULL; flext::thrid_t flext::GetSysThreadId() { return thrid; } -void flext::LaunchHelper(thr_entry *e) +static void LaunchHelper(thr_entry *e) { - e->thrid = GetThreadId(); + e->thrid = flext::GetThreadId(); e->meth(e->params); } + //! Start helper thread bool flext::StartHelper() { @@ -76,7 +131,6 @@ bool flext::StartHelper() pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); - thrhelpexit = false; ok = pthread_create (&thrhelpid,&attr,(void *(*)(void *))ThrHelper,NULL) == 0; #elif FLEXT_THREADS == FLEXT_THR_MP if(!MPLibraryIsLoaded()) @@ -105,17 +159,6 @@ bool flext::StartHelper() return ok; } -#if 0 -/*! \brief Stop helper thread - \note Never called! -*/ -bool flext::StopHelper() -{ - thrhelpexit = true; - if(thrhelpcond) thrhelpcond->Signal(); -} -#endif - //! Static helper thread function void flext::ThrHelper(void *) { @@ -128,8 +171,6 @@ void flext::ThrHelper(void *) pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); #endif -// post("Helper thread started"); - // set thread priority one point below normal // so thread construction won't disturb real-time audio RelPriority(-1); @@ -138,156 +179,151 @@ void flext::ThrHelper(void *) // helper loop for(;;) { -// post("Helper loop"); - thrhelpcond->Wait(); - if(thrhelpexit) break; - -// post("Helper signalled"); - tlmutex.Lock(); -// post("Helper after lock"); - - // start all inactive threads in queue - thr_entry *prv = NULL,*ti; - for(ti = thrhead; ti; prv = ti,ti = ti->nxt) { - if(!ti->active) { - bool ok; - -// post("Helper start thread"); -#if FLEXT_THREADS == FLEXT_THR_POSIX - thrid_t dummy; - ok = pthread_create (&dummy,&attr,(void *(*)(void *))LaunchHelper,ti) == 0; -#elif FLEXT_THREADS == FLEXT_THR_MP - thrid_t dummy; - ok = MPCreateTask((TaskProc)LaunchHelper,ti,0,0,0,0,0,&dummy) == noErr; -#elif FLEXT_THREADS == FLEXT_THR_WIN32 - ok = _beginthread((void (*)(void *))LaunchHelper,0,ti) >= 0; -#else -#error -#endif - if(!ok) { - error("flext - Could not launch thread!"); - - // delete from queue - if(prv) - prv->nxt = ti->nxt; - else - thrhead = ti->nxt; - if(thrtail == ti) thrtail = prv; - - ti->nxt = NULL; - delete ti; - } - else { -// post("Helper thread ok"); - - // set active flag - ti->active = true; - } - } - } - - tlmutex.Unlock(); + // start all inactive threads + thr_entry *ti; + while(ti = thrpending.Pop()) { + bool ok; + + #if FLEXT_THREADS == FLEXT_THR_POSIX + thrid_t dummy; + ok = pthread_create (&dummy,&attr,(void *(*)(void *))LaunchHelper,ti) == 0; + #elif FLEXT_THREADS == FLEXT_THR_MP + thrid_t dummy; + ok = MPCreateTask((TaskProc)LaunchHelper,ti,0,0,0,0,0,&dummy) == noErr; + #elif FLEXT_THREADS == FLEXT_THR_WIN32 + ok = _beginthread((void (*)(void *))LaunchHelper,0,ti) >= 0; + #else + #error + #endif + if(!ok) { + error("flext - Could not launch thread!"); + thrpending.Free(ti); ti = NULL; + } + else + // insert into queue of active threads + thractive.Push(ti); + } } + FLEXT_ASSERT(false); +/* + // Never reached! + delete thrhelpcond; thrhelpcond = NULL; #if FLEXT_THREADS == FLEXT_THR_POSIX pthread_attr_destroy(&attr); #endif +*/ } bool flext::LaunchThread(void (*meth)(thr_params *p),thr_params *p) { -#ifdef FLEXT_DEBUG - if(!thrhelpcond) { - ERRINTERNAL(); - return false; - } -#endif - -// post("make threads"); - - tlmutex.Lock(); + FLEXT_ASSERT(thrhelpcond); // make an entry into thread list - thr_entry *nt = new thr_entry(meth,p); - if(thrtail) thrtail->nxt = nt; - else thrhead = nt; - thrtail = nt; - - tlmutex.Unlock(); - -// post("signal helper"); - + thr_entry *e = thrpending.New(); + e->Set(meth,p); + thrpending.Push(e); // signal thread helper thrhelpcond->Signal(); return true; } +static bool waitforstopped(TypedFifo<thr_entry> &qufnd,float wait = 0) +{ + TypedLifo<thr_entry> qutmp; + + double until; + if(wait) until = flext::GetOSTime()+wait; + + for(;;) { + thr_entry *fnd = qufnd.Get(); + if(!fnd) break; // no more entries -> done! + + thr_entry *ti; + // search for entry + while((ti = thrstopped.Pop()) != NULL && ti != fnd) qutmp.Push(ti); + // put back entries + while((ti = qutmp.Pop()) != NULL) thrstopped.Push(ti); + + if(ti) { + // still in thrstopped queue + qufnd.Put(fnd); + // yield to other threads + flext::ThrYield(); + + if(wait && flext::GetOSTime() > until) + // not successful -> remaining thread are still in qufnd queue + return false; + } + } + return true; +} + bool flext::StopThread(void (*meth)(thr_params *p),thr_params *p,bool wait) { -#ifdef FLEXT_DEBUG - if(!thrhelpcond) { - ERRINTERNAL(); - return false; - } -#endif + FLEXT_ASSERT(thrhelpcond); - int found = 0; - - tlmutex.Lock(); - for(thr_entry *ti = thrhead; ti; ti = ti->nxt) - // set shouldexit if meth and params match - if(ti->meth == meth && ti->params == p) { ti->shouldexit = true; found++; } - tlmutex.Unlock(); - - if(found) { - // signal thread helper - thrhelpcond->Signal(); -#if 0 - // ###################### - // i don't think we need to wait for a single thread to stop - // ###################### - int cnt = 0; - for(int wi = 0; wi < 100; ++wi) { - // lock and count this object's threads - cnt = 0; - tlmutex.Lock(); - for(thr_entry *t = thrhead; t; t = t->nxt) - if(t->meth == meth && t->params == p) ++cnt; - tlmutex.Unlock(); - - // if no threads are remaining, we are done - if(!cnt) break; - - // Wait - Sleep(0.01f); - } + TypedLifo<thr_entry> qutmp; + thr_entry *ti; - return cnt == 0; -#endif - return true; - } - else - return false; + // first search pending queue + // -------------------------- + + { + bool found = false; + while((ti = thrpending.Pop()) != NULL) + if(ti->meth == meth && ti->params == p) { + // found -> thread hasn't started -> just delete + thrpending.Free(ti); + found = true; + } + else + qutmp.Push(ti); + + // put back into pending queue (order doesn't matter) + while((ti = qutmp.Pop()) != NULL) thrpending.Push(ti); + + if(found) return true; + } + + // now search active queue + // ----------------------- + + TypedFifo<thr_entry> qufnd; + + while((ti = thractive.Pop()) != NULL) + if(ti->meth == meth && ti->params == p) { + thrstopped.Push(ti); + thrhelpcond->Signal(); + qufnd.Put(ti); + } + else + qutmp.Push(ti); + + // put back into pending queue (order doesn't matter) + while((ti = qutmp.Pop()) != NULL) thractive.Push(ti); + + // wakeup helper thread + thrhelpcond->Signal(); + + // now wait for entries in qufnd to have vanished from thrstopped + if(wait) + return waitforstopped(qufnd); + else + return qufnd.Size() == 0; } bool flext_base::ShouldExit() const { - bool ret = true; - - tlmutex.Lock(); - for(thr_entry *ti = thrhead; ti; ti = ti->nxt) - if(ti->Is()) { ret = ti->shouldexit; break; } - tlmutex.Unlock(); - - // thread was not found -> EXIT!!! - return ret; + thr_entry *fnd = thrstopped.Find(GetThreadId()); + return fnd != NULL; } bool flext::PushThread() @@ -299,99 +335,87 @@ bool flext::PushThread() void flext::PopThread() { - tlmutex.Lock(); - -// post("Pop thread"); - - thr_entry *prv = NULL,*ti; - for(ti = thrhead; ti; prv = ti,ti = ti->nxt) - if(ti->Is()) break; + thrid_t id = GetThreadId(); + thr_entry *fnd = thrstopped.Find(id,true); + if(!fnd) fnd = thractive.Find(id,true); - if(ti) { - if(prv) - prv->nxt = ti->nxt; - else - thrhead = ti->nxt; - if(thrtail == ti) thrtail = prv; - - ti->nxt = NULL; - delete ti; - } - else { + if(fnd) + thrpending.Free(fnd); #ifdef FLEXT_DEBUG + else post("flext - INTERNAL ERROR: Thread not found!"); #endif - } - - tlmutex.Unlock(); } //! Terminate all object threads bool flext_base::StopThreads() { - thr_entry *t; + FLEXT_ASSERT(thrhelpcond); - // signal termination for all object's threads - tlmutex.Lock(); - for(t = thrhead; t; t = t->nxt) { - if(t->This() == this) t->shouldexit = true; - } - tlmutex.Unlock(); + TypedLifo<thr_entry> qutmp; + thr_entry *ti; - // TODO: maybe there should be a thread conditional for every thread so that it can be signalled efficiently + // first search pending queue + // -------------------------- - // wait for thread termination (1 second max.) - int cnt; - for(int wi = 0; wi < 100; ++wi) { - // lock and count this object's threads - tlmutex.Lock(); - for(cnt = 0,t = thrhead; t; t = t->nxt) - if(t->This() == this) ++cnt; - tlmutex.Unlock(); + bool found = false; + while((ti = thrpending.Pop()) != NULL) + if(ti->This() == this) + // found -> thread hasn't started -> just delete + thrpending.Free(ti); + else + qutmp.Push(ti); - // if no threads are remaining, we are done - if(!cnt) break; + // put back into pending queue (order doesn't matter) + while((ti = qutmp.Pop()) != NULL) thrpending.Push(ti); - // Wait - Sleep(0.01f); - } + // now search active queue + // ----------------------- + + TypedFifo<thr_entry> qufnd; + + while((ti = thractive.Pop()) != NULL) + if(ti->This() == this) { + thrstopped.Push(ti); + thrhelpcond->Signal(); + qufnd.Put(ti); + } + else + qutmp.Push(ti); - if(cnt) { + // put back into pending queue (order doesn't matter) + while((ti = qutmp.Pop()) != NULL) thractive.Push(ti); + + // wakeup helper thread + thrhelpcond->Signal(); + + // now wait for entries in qufnd to have vanished from thrstopped + if(!waitforstopped(qufnd,MAXIMUMWAIT*0.001f)) { #ifdef FLEXT_DEBUG post("flext - doing hard thread termination"); #endif - - // --- all object threads have terminated by now ------- - tlmutex.Lock(); // timeout -> hard termination - for(t = thrhead; t; ) - if(t->This() == this) { - #if FLEXT_THREADS == FLEXT_THR_POSIX - if(pthread_cancel(t->thrid)) post("%s - Thread could not be terminated!",thisName()); - #elif FLEXT_THREADS == FLEXT_THR_MP - MPTerminateTask(t->thrid,0); - // here, we should use a task queue to check whether the task has really terminated!! - #elif FLEXT_THREADS == FLEXT_THR_WIN32 - // can't use the c library function _endthread.. memory leaks will occur - HANDLE hnd = OpenThread(THREAD_ALL_ACCESS,TRUE,t->thrid); - TerminateThread(hnd,0); - #else - #error - #endif - thr_entry *tn = t->nxt; - t->nxt = NULL; delete t; - t = tn; - } - else t = t->nxt; - thrhead = NULL; - - tlmutex.Unlock(); + while((ti = qufnd.Get()) != NULL) { +#if FLEXT_THREADS == FLEXT_THR_POSIX + if(pthread_cancel(ti->thrid)) + post("%s - Thread could not be terminated!",thisName()); +#elif FLEXT_THREADS == FLEXT_THR_MP + MPTerminateTask(ti->thrid,0); + // here, we should use a task queue to check whether the task has really terminated!! +#elif FLEXT_THREADS == FLEXT_THR_WIN32 + // can't use the c library function _endthread.. memory leaks will occur + HANDLE hnd = OpenThread(THREAD_ALL_ACCESS,TRUE,ti->thrid); + TerminateThread(hnd,0); +#else +#error Not implemented +#endif + thrpending.Free(ti); + } + return false; } - -// post("All threads have terminated"); - - return true; + else + return true; } bool flext::RelPriority(int dp,thrid_t ref,thrid_t id) @@ -469,9 +493,9 @@ bool flext::RelPriority(int dp,thrid_t ref,thrid_t id) return true; #elif FLEXT_THREADS == FLEXT_THR_MP - thr_entry *t; - for(t = thrhead; t && t->Id() != id; t = t->nxt) {} - if(t) { + thr_entry *ti = thrpending.Find(id); + if(!ti) ti = thractive.Find(id); + if(ti) { // thread found in list int w = GetPriority(id); if(dp < 0) w /= 1<<(-dp); @@ -488,7 +512,7 @@ bool flext::RelPriority(int dp,thrid_t ref,thrid_t id) #endif w = 10000; } - t->weight = w; + ti->weight = w; return MPSetTaskWeight(id,w) == noErr; } else return false; @@ -524,9 +548,9 @@ int flext::GetPriority(thrid_t id) return pr; #elif FLEXT_THREADS == FLEXT_THR_MP - thr_entry *t; - for(t = thrhead; t && t->Id() != id; t = t->nxt) {} - return t?t->weight:-1; + thr_entry *ti = thrpending.Find(id); + if(!ti) ti = thractive.Find(id); + return ti?ti->weight:-1; #else #error #endif @@ -566,12 +590,9 @@ bool flext::SetPriority(int p,thrid_t id) return true; #elif FLEXT_THREADS == FLEXT_THR_MP - thr_entry *t; - for(t = thrhead; t && t->Id() != id; t = t->nxt) {} - if(t) - return MPSetTaskWeight(id,t->weight = p) == noErr; - else - return false; + thr_entry *ti = thrpending.Find(id); + if(!ti) ti = thractive.Find(id); + return ti && MPSetTaskWeight(id,ti->weight = p) == noErr; #else #error #endif @@ -585,17 +606,6 @@ void flext_base::thr_params::set_any(const t_symbol *s,int argc,const t_atom *ar void flext_base::thr_params::set_list(int argc,const t_atom *argv) { var[0]._list.args = new AtomList(argc,argv); } -flext_base::thr_entry::thr_entry(void (*m)(thr_params *),thr_params *p,thrid_t id): - th(p?p->cl:NULL),meth(m),params(p),thrid(id), - active(false),shouldexit(false), -#if FLEXT_THREADS == FLEXT_THR_MP - weight(100), // MP default weight -#endif - nxt(NULL) -{} - - - #if FLEXT_THREADS == FLEXT_THR_POSIX bool flext::ThrCond::Wait() { Lock(); |