From 99a29c1926eee84f100ad9ea59a8c33f7878c342 Mon Sep 17 00:00:00 2001 From: Thomas Grill Date: Sun, 22 Dec 2002 01:28:34 +0000 Subject: "no message" svn path=/trunk/; revision=306 --- externals/grill/flext/source/flthr.cpp | 230 ++++++++++++++++++++++++--------- 1 file changed, 172 insertions(+), 58 deletions(-) (limited to 'externals/grill/flext/source/flthr.cpp') diff --git a/externals/grill/flext/source/flthr.cpp b/externals/grill/flext/source/flthr.cpp index 96e52b0d..e8834b76 100644 --- a/externals/grill/flext/source/flthr.cpp +++ b/externals/grill/flext/source/flthr.cpp @@ -17,57 +17,60 @@ WARRANTIES, see the file, "license.txt," in this distribution. #ifdef FLEXT_THREADS -#if FLEXT_OS == FLEXT_OS_WIN -#include -#endif - #include //! Thread id of system thread flext::thrid_t flext::thrid; //! Thread id of helper thread -flext::thrid_t flext_base::thrhelpid; +flext::thrid_t flext::thrhelpid; -flext_base::thr_entry *flext_base::thrhead = NULL,*flext_base::thrtail = NULL; -flext::ThrMutex flext_base::tlmutex; +/* +flext::thr_entry *flext::thrhead = NULL,*flext::thrtail = NULL; +flext::ThrMutex flext::tlmutex; +*/ +static flext::thr_entry *thrhead = NULL,*thrtail = NULL; +static flext::ThrMutex tlmutex; //! Helper thread should terminate -bool thrhelpexit = false; +static bool thrhelpexit = false; //! Helper thread conditional -flext::ThrCond *thrhelpcond = NULL; - - -flext_base::thr_entry::thr_entry(flext_base *t,void *(*m)(thr_params *),thr_params *p,pthread_t id): - th(t),meth(m),params(p),thrid(id), - active(false),shouldexit(false), - nxt(NULL) -{} +static flext::ThrCond *thrhelpcond = NULL; //! Start helper thread -bool flext_base::StartHelper() +bool flext::StartHelper() { + bool ok = false; +#if FLEXT_THREADS == FLEXT_THR_POSIX pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); thrhelpexit = false; int ret = pthread_create (&thrhelpid,&attr,(void *(*)(void *))ThrHelper,NULL); - if(ret) { - error((char *)("flext - Could not launch helper thread!")); - return false; + ok = !ret; +#elif FLEXT_THREADS == FLEXT_THR_MP + if(!MPLibraryIsLoaded()) + error("Thread library is not loaded"); + else { + OSStatus ret = MPCreateTask((TaskProc)ThrHelper,NULL,0,0,0,0,0,&thrhelpid); + ok = ret == noErr; } - else - return true; +#else +#error +#endif + if(!ok) + error("flext - Could not launch helper thread!"); + return ok; } #if 0 /*! \brief Stop helper thread \note not called! */ -bool flext_base::StopHelper() +bool flext::StopHelper() { thrhelpexit = true; if(thrhelpcond) thrhelpcond->Signal(); @@ -75,12 +78,16 @@ bool flext_base::StopHelper() #endif //! Static helper thread function -void flext_base::ThrHelper(void *) +void flext::ThrHelper(void *) { +#if FLEXT_THREADS == FLEXT_THR_POSIX // set prototype thread attributes pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); +#endif + +// post("Helper"); // set thread priority one point below normal // so thread construction won't disturb real-time audio @@ -90,18 +97,32 @@ void flext_base::ThrHelper(void *) // helper loop for(;;) { +// post("Helper loop"); + thrhelpcond->Wait(); if(thrhelpexit) break; +// post("Helper signalled"); tlmutex.Lock(); +// post("Helper after lock"); + // start all inactive threads in queue thr_entry *prv = NULL,*ti; for(ti = thrhead; ti; prv = ti,ti = ti->nxt) { if(!ti->active) { - int ret = pthread_create (&ti->thrid,&attr,(void *(*)(void *))ti->meth,ti->params); - if(ret) { - error((char *)("flext - Could not launch thread!")); + bool ok; + +// post("Helper start thread"); +#if FLEXT_THREADS == FLEXT_THR_POSIX + ok = pthread_create (&ti->thrid,&attr,(void *(*)(void *))ti->meth,ti->params) == 0; +#elif FLEXT_THREADS == FLEXT_THR_MP + ok = MPCreateTask((TaskProc)ti->meth,ti->params,0,0,0,0,0,&ti->thrid) == noErr; +#else +#error +#endif + if(!ok) { + error("flext - Could not launch thread!"); // delete from queue if(prv) @@ -114,6 +135,8 @@ void flext_base::ThrHelper(void *) delete ti; } else { +// post("Helper thread ok"); + // set active flag ti->active = true; } @@ -128,47 +151,56 @@ void flext_base::ThrHelper(void *) } -bool flext_base::StartThread(void *(*meth)(thr_params *p),thr_params *p,char *methname) +bool flext::LaunchThread(void (*meth)(thr_params *p),thr_params *p) { #ifdef FLEXT_DEBUG - if(!p || !thrhelpcond) { + if(!thrhelpcond) { ERRINTERNAL(); return false; } #endif +// post("make threads"); + tlmutex.Lock(); // make an entry into thread list - thr_entry *nt = new thr_entry(this,meth,p); + thr_entry *nt = new thr_entry(meth,p); if(thrtail) thrtail->nxt = nt; else thrhead = nt; thrtail = nt; + tlmutex.Unlock(); + +// post("signal helper"); + // signal thread helper thrhelpcond->Signal(); - tlmutex.Unlock(); return true; } bool flext_base::ShouldExit() const { + bool ret = true; + + tlmutex.Lock(); for(thr_entry *ti = thrhead; ti; ti = ti->nxt) - if(ti->Is()) return ti->shouldexit; + if(ti->Is()) { ret = ti->shouldexit; break; } + tlmutex.Unlock(); // thread was not found -> EXIT!!! - return true; + return ret; } -bool flext_base::PushThread() +bool flext::PushThread() { // set priority of newly created thread one point below the system thread's RelPriority(-1); return true; } -void flext_base::PopThread() +void flext::PopThread() { tlmutex.Lock(); @@ -190,7 +222,7 @@ void flext_base::PopThread() } else { #ifdef FLEXT_DEBUG - post("%s - INTERNAL ERROR: Thread not found!",thisName()); + post("flext - INTERNAL ERROR: Thread not found!"); #endif } @@ -198,46 +230,70 @@ void flext_base::PopThread() } //! Terminate all object threads -void flext_base::TermThreads() +bool flext_base::StopThreads() { thr_entry *t; // signal termination - for(t = thrhead; t; t = t->nxt) - if(t->th == this) t->shouldexit = true; + tlmutex.Lock(); + for(t = thrhead; t; t = t->nxt) { + if(t->This() == this) t->shouldexit = true; + } + tlmutex.Unlock(); - // TODO: maybe there should be a thread conditional for every thread so that it can be signaled + // TODO: maybe there should be a thread conditional for every thread so that it can be signalled efficiently - // wait for thread termination + // wait for thread termination (1 second max.) + int cnt; for(int wi = 0; wi < 100; ++wi) { - int cnt = 0; + cnt = 0; + tlmutex.Lock(); for(t = thrhead; t; t = t->nxt) - if(t->th == this) ++cnt; + if(t->This() == this) ++cnt; + tlmutex.Unlock(); if(!cnt) break; Sleep(0.01f); } - // --- all object threads have terminated by now ------- + if(cnt) { +#ifdef FLEXT_DEBUG + post("flext - doing hard thread termination"); +#endif + + // --- all object threads have terminated by now ------- - qmutex.Lock(); // Lock message queue - tlmutex.Lock(); + qmutex.Lock(); // Lock message queue + tlmutex.Lock(); - // timeout -> hard termination - for(t = thrhead; t; ) - if(t->th == this) { - if(pthread_cancel(t->thrid)) post("%s - Thread could not be terminated!",thisName()); - thr_entry *tn = t->nxt; - t->nxt = NULL; delete t; - t = tn; - } - else t = t->nxt; + // timeout -> hard termination + for(t = thrhead; t; ) + if(t->This() == this) { + #if FLEXT_THREADS == FLEXT_THR_POSIX + if(pthread_cancel(t->thrid)) post("%s - Thread could not be terminated!",thisName()); + #elif FLEXT_THREADS == FLEXT_THR_MP + MPTerminateTask(t->thrid,0); + // here, we should use a task queue to check whether the task has really terminated!! + #else + #error + #endif + thr_entry *tn = t->nxt; + t->nxt = NULL; delete t; + t = tn; + } + else t = t->nxt; - tlmutex.Unlock(); - qmutex.Unlock(); + tlmutex.Unlock(); + qmutex.Unlock(); + } + +// post("All threads have terminated"); + + return true; } bool flext::RelPriority(int dp,thrid_t ref,thrid_t id) { +#if FLEXT_THREADS == FLEXT_THR_POSIX sched_param parm; int policy; if(pthread_getschedparam(ref,&policy,&parm) < 0) { @@ -274,11 +330,39 @@ bool flext::RelPriority(int dp,thrid_t ref,thrid_t id) } } return true; +#elif FLEXT_THREADS == FLEXT_THR_MP + thr_entry *t; + for(t = thrhead; t && t->Id() != id; t = t->nxt) {} + if(t) { + // thread found in list + int w = GetPriority(id); + if(dp < 0) w /= 1<<(-dp); + else w *= 1< 10000) { + #ifdef FLEXT_DEBUG + post("flext - maximum thread priority reached"); + #endif + w = 10000; + } + t->weight = w; + return MPSetTaskWeight(id,w) == noErr; + } + else return false; +#else +#error +#endif } int flext::GetPriority(thrid_t id) { +#if FLEXT_THREADS == FLEXT_THR_POSIX sched_param parm; int policy; if(pthread_getschedparam(id,&policy,&parm) < 0) { @@ -288,11 +372,19 @@ int flext::GetPriority(thrid_t id) return -1; } return parm.sched_priority; +#elif FLEXT_THREADS == FLEXT_THR_MP + thr_entry *t; + for(t = thrhead; t && t->Id() != id; t = t->nxt) {} + return t?t->weight:-1; +#else +#error +#endif } bool flext::SetPriority(int p,thrid_t id) { +#if FLEXT_THREADS == FLEXT_THR_POSIX sched_param parm; int policy; if(pthread_getschedparam(id,&policy,&parm) < 0) { @@ -311,12 +403,34 @@ bool flext::SetPriority(int p,thrid_t id) } } return true; +#elif FLEXT_THREADS == FLEXT_THR_MP + thr_entry *t; + for(t = thrhead; t && t->Id() != id; t = t->nxt) {} + if(t) + return MPSetTaskWeight(id,t->weight = p) == noErr; + else + return false; +#else +#error +#endif } -flext_base::thr_params::thr_params(flext_base *c,int n): cl(c),var(new _data[n]) {} + +flext_base::thr_params::thr_params(int n): cl(NULL),var(new _data[n]) {} flext_base::thr_params::~thr_params() { if(var) delete[] var; } void flext_base::thr_params::set_any(const t_symbol *s,int argc,const t_atom *argv) { var[0]._any.args = new AtomAnything(s,argc,argv); } void flext_base::thr_params::set_list(int argc,const t_atom *argv) { var[0]._list.args = new AtomList(argc,argv); } + +flext_base::thr_entry::thr_entry(void (*m)(thr_params *),thr_params *p,thrid_t id): + th(p?p->cl:NULL),meth(m),params(p),thrid(id), + active(false),shouldexit(false), +#if FLEXT_THREADS == FLEXT_THR_MP + weight(100), // MP default weight +#endif + nxt(NULL) +{} + + #endif // FLEXT_THREADS -- cgit v1.2.1