aboutsummaryrefslogtreecommitdiff
path: root/externals/grill/flext/source/flthr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'externals/grill/flext/source/flthr.cpp')
-rw-r--r--externals/grill/flext/source/flthr.cpp484
1 files changed, 247 insertions, 237 deletions
diff --git a/externals/grill/flext/source/flthr.cpp b/externals/grill/flext/source/flthr.cpp
index 0750dd83..47c64354 100644
--- a/externals/grill/flext/source/flthr.cpp
+++ b/externals/grill/flext/source/flthr.cpp
@@ -13,10 +13,15 @@ WARRANTIES, see the file, "license.txt," in this distribution.
*/
#include "flext.h"
-#include "flinternal.h"
#ifdef FLEXT_THREADS
+// maximum wait time for threads to finish (in ms)
+#define MAXIMUMWAIT 100
+
+
+#include "flinternal.h"
+#include "flcontainers.h"
#include <time.h>
@@ -31,7 +36,6 @@ WARRANTIES, see the file, "license.txt," in this distribution.
#error WIN32 threads need Windows SDK version >= 0x500
#endif
-
#include <errno.h>
//! Thread id of system thread
@@ -40,11 +44,61 @@ flext::thrid_t flext::thrid = 0;
//! Thread id of helper thread
flext::thrid_t flext::thrhelpid = 0;
-static flext::thr_entry *thrhead = NULL,*thrtail = NULL;
-static flext::ThrMutex tlmutex;
-//! Helper thread should terminate
-static bool thrhelpexit = false;
+//! \brief This represents an entry to the list of active method threads
+class thr_entry
+ : public flext
+ , public Fifo::Cell
+{
+public:
+ void thr_entry::Set(void (*m)(thr_params *),thr_params *p,thrid_t id = GetThreadId())
+ {
+ th = p?p->cl:NULL;
+ meth = m,params = p,thrid = id;
+ shouldexit = false;
+#if FLEXT_THREADS == FLEXT_THR_MP
+ weight = 100; // MP default weight
+#endif
+ }
+
+ //! \brief Check if this class represents the current thread
+ bool Is(thrid_t id = GetThreadId()) const { return IsThread(thrid,id); }
+
+ FLEXT_CLASSDEF(flext_base) *This() const { return th; }
+ thrid_t Id() const { return thrid; }
+
+ FLEXT_CLASSDEF(flext_base) *th;
+ void (*meth)(thr_params *);
+ thr_params *params;
+ thrid_t thrid;
+ bool shouldexit;
+#if FLEXT_THREADS == FLEXT_THR_MP
+ int weight;
+#endif
+};
+
+template<class T>
+class ThrFinder:
+ public T
+{
+public:
+ void Push(thr_entry *e) { T::Push(e); }
+ thr_entry *Pop() { return T::Pop(); }
+
+ thr_entry *Find(flext::thrid_t id,bool pop = false)
+ {
+ TypedLifo<thr_entry> qutmp;
+ thr_entry *fnd;
+ while((fnd = Pop()) && !fnd->Is(id)) qutmp.Push(fnd);
+ // put back entries
+ for(thr_entry *ti; ti = qutmp.Pop(); ) Push(ti);
+ if(fnd && !pop) Push(fnd);
+ return fnd;
+ }
+};
+
+static ThrFinder< PooledLifo<thr_entry,1,10> > thrpending;
+static ThrFinder< TypedLifo<thr_entry> > thractive,thrstopped;
//! Helper thread conditional
static flext::ThrCond *thrhelpcond = NULL;
@@ -53,12 +107,13 @@ static flext::ThrCond *thrhelpcond = NULL;
flext::thrid_t flext::GetSysThreadId() { return thrid; }
-void flext::LaunchHelper(thr_entry *e)
+static void LaunchHelper(thr_entry *e)
{
- e->thrid = GetThreadId();
+ e->thrid = flext::GetThreadId();
e->meth(e->params);
}
+
//! Start helper thread
bool flext::StartHelper()
{
@@ -76,7 +131,6 @@ bool flext::StartHelper()
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
- thrhelpexit = false;
ok = pthread_create (&thrhelpid,&attr,(void *(*)(void *))ThrHelper,NULL) == 0;
#elif FLEXT_THREADS == FLEXT_THR_MP
if(!MPLibraryIsLoaded())
@@ -105,17 +159,6 @@ bool flext::StartHelper()
return ok;
}
-#if 0
-/*! \brief Stop helper thread
- \note Never called!
-*/
-bool flext::StopHelper()
-{
- thrhelpexit = true;
- if(thrhelpcond) thrhelpcond->Signal();
-}
-#endif
-
//! Static helper thread function
void flext::ThrHelper(void *)
{
@@ -128,8 +171,6 @@ void flext::ThrHelper(void *)
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
#endif
-// post("Helper thread started");
-
// set thread priority one point below normal
// so thread construction won't disturb real-time audio
RelPriority(-1);
@@ -138,156 +179,151 @@ void flext::ThrHelper(void *)
// helper loop
for(;;) {
-// post("Helper loop");
-
thrhelpcond->Wait();
- if(thrhelpexit) break;
-
-// post("Helper signalled");
- tlmutex.Lock();
-// post("Helper after lock");
-
- // start all inactive threads in queue
- thr_entry *prv = NULL,*ti;
- for(ti = thrhead; ti; prv = ti,ti = ti->nxt) {
- if(!ti->active) {
- bool ok;
-
-// post("Helper start thread");
-#if FLEXT_THREADS == FLEXT_THR_POSIX
- thrid_t dummy;
- ok = pthread_create (&dummy,&attr,(void *(*)(void *))LaunchHelper,ti) == 0;
-#elif FLEXT_THREADS == FLEXT_THR_MP
- thrid_t dummy;
- ok = MPCreateTask((TaskProc)LaunchHelper,ti,0,0,0,0,0,&dummy) == noErr;
-#elif FLEXT_THREADS == FLEXT_THR_WIN32
- ok = _beginthread((void (*)(void *))LaunchHelper,0,ti) >= 0;
-#else
-#error
-#endif
- if(!ok) {
- error("flext - Could not launch thread!");
-
- // delete from queue
- if(prv)
- prv->nxt = ti->nxt;
- else
- thrhead = ti->nxt;
- if(thrtail == ti) thrtail = prv;
-
- ti->nxt = NULL;
- delete ti;
- }
- else {
-// post("Helper thread ok");
-
- // set active flag
- ti->active = true;
- }
- }
- }
-
- tlmutex.Unlock();
+ // start all inactive threads
+ thr_entry *ti;
+ while(ti = thrpending.Pop()) {
+ bool ok;
+
+ #if FLEXT_THREADS == FLEXT_THR_POSIX
+ thrid_t dummy;
+ ok = pthread_create (&dummy,&attr,(void *(*)(void *))LaunchHelper,ti) == 0;
+ #elif FLEXT_THREADS == FLEXT_THR_MP
+ thrid_t dummy;
+ ok = MPCreateTask((TaskProc)LaunchHelper,ti,0,0,0,0,0,&dummy) == noErr;
+ #elif FLEXT_THREADS == FLEXT_THR_WIN32
+ ok = _beginthread((void (*)(void *))LaunchHelper,0,ti) >= 0;
+ #else
+ #error
+ #endif
+ if(!ok) {
+ error("flext - Could not launch thread!");
+ thrpending.Free(ti); ti = NULL;
+ }
+ else
+ // insert into queue of active threads
+ thractive.Push(ti);
+ }
}
+ FLEXT_ASSERT(false);
+/*
+ // Never reached!
+
delete thrhelpcond;
thrhelpcond = NULL;
#if FLEXT_THREADS == FLEXT_THR_POSIX
pthread_attr_destroy(&attr);
#endif
+*/
}
bool flext::LaunchThread(void (*meth)(thr_params *p),thr_params *p)
{
-#ifdef FLEXT_DEBUG
- if(!thrhelpcond) {
- ERRINTERNAL();
- return false;
- }
-#endif
-
-// post("make threads");
-
- tlmutex.Lock();
+ FLEXT_ASSERT(thrhelpcond);
// make an entry into thread list
- thr_entry *nt = new thr_entry(meth,p);
- if(thrtail) thrtail->nxt = nt;
- else thrhead = nt;
- thrtail = nt;
-
- tlmutex.Unlock();
-
-// post("signal helper");
-
+ thr_entry *e = thrpending.New();
+ e->Set(meth,p);
+ thrpending.Push(e);
// signal thread helper
thrhelpcond->Signal();
return true;
}
+static bool waitforstopped(TypedFifo<thr_entry> &qufnd,float wait = 0)
+{
+ TypedLifo<thr_entry> qutmp;
+
+ double until;
+ if(wait) until = flext::GetOSTime()+wait;
+
+ for(;;) {
+ thr_entry *fnd = qufnd.Get();
+ if(!fnd) break; // no more entries -> done!
+
+ thr_entry *ti;
+ // search for entry
+ while((ti = thrstopped.Pop()) != NULL && ti != fnd) qutmp.Push(ti);
+ // put back entries
+ while((ti = qutmp.Pop()) != NULL) thrstopped.Push(ti);
+
+ if(ti) {
+ // still in thrstopped queue
+ qufnd.Put(fnd);
+ // yield to other threads
+ flext::ThrYield();
+
+ if(wait && flext::GetOSTime() > until)
+ // not successful -> remaining thread are still in qufnd queue
+ return false;
+ }
+ }
+ return true;
+}
+
bool flext::StopThread(void (*meth)(thr_params *p),thr_params *p,bool wait)
{
-#ifdef FLEXT_DEBUG
- if(!thrhelpcond) {
- ERRINTERNAL();
- return false;
- }
-#endif
+ FLEXT_ASSERT(thrhelpcond);
- int found = 0;
-
- tlmutex.Lock();
- for(thr_entry *ti = thrhead; ti; ti = ti->nxt)
- // set shouldexit if meth and params match
- if(ti->meth == meth && ti->params == p) { ti->shouldexit = true; found++; }
- tlmutex.Unlock();
-
- if(found) {
- // signal thread helper
- thrhelpcond->Signal();
-#if 0
- // ######################
- // i don't think we need to wait for a single thread to stop
- // ######################
- int cnt = 0;
- for(int wi = 0; wi < 100; ++wi) {
- // lock and count this object's threads
- cnt = 0;
- tlmutex.Lock();
- for(thr_entry *t = thrhead; t; t = t->nxt)
- if(t->meth == meth && t->params == p) ++cnt;
- tlmutex.Unlock();
-
- // if no threads are remaining, we are done
- if(!cnt) break;
-
- // Wait
- Sleep(0.01f);
- }
+ TypedLifo<thr_entry> qutmp;
+ thr_entry *ti;
- return cnt == 0;
-#endif
- return true;
- }
- else
- return false;
+ // first search pending queue
+ // --------------------------
+
+ {
+ bool found = false;
+ while((ti = thrpending.Pop()) != NULL)
+ if(ti->meth == meth && ti->params == p) {
+ // found -> thread hasn't started -> just delete
+ thrpending.Free(ti);
+ found = true;
+ }
+ else
+ qutmp.Push(ti);
+
+ // put back into pending queue (order doesn't matter)
+ while((ti = qutmp.Pop()) != NULL) thrpending.Push(ti);
+
+ if(found) return true;
+ }
+
+ // now search active queue
+ // -----------------------
+
+ TypedFifo<thr_entry> qufnd;
+
+ while((ti = thractive.Pop()) != NULL)
+ if(ti->meth == meth && ti->params == p) {
+ thrstopped.Push(ti);
+ thrhelpcond->Signal();
+ qufnd.Put(ti);
+ }
+ else
+ qutmp.Push(ti);
+
+ // put back into pending queue (order doesn't matter)
+ while((ti = qutmp.Pop()) != NULL) thractive.Push(ti);
+
+ // wakeup helper thread
+ thrhelpcond->Signal();
+
+ // now wait for entries in qufnd to have vanished from thrstopped
+ if(wait)
+ return waitforstopped(qufnd);
+ else
+ return qufnd.Size() == 0;
}
bool flext_base::ShouldExit() const
{
- bool ret = true;
-
- tlmutex.Lock();
- for(thr_entry *ti = thrhead; ti; ti = ti->nxt)
- if(ti->Is()) { ret = ti->shouldexit; break; }
- tlmutex.Unlock();
-
- // thread was not found -> EXIT!!!
- return ret;
+ thr_entry *fnd = thrstopped.Find(GetThreadId());
+ return fnd != NULL;
}
bool flext::PushThread()
@@ -299,99 +335,87 @@ bool flext::PushThread()
void flext::PopThread()
{
- tlmutex.Lock();
-
-// post("Pop thread");
-
- thr_entry *prv = NULL,*ti;
- for(ti = thrhead; ti; prv = ti,ti = ti->nxt)
- if(ti->Is()) break;
+ thrid_t id = GetThreadId();
+ thr_entry *fnd = thrstopped.Find(id,true);
+ if(!fnd) fnd = thractive.Find(id,true);
- if(ti) {
- if(prv)
- prv->nxt = ti->nxt;
- else
- thrhead = ti->nxt;
- if(thrtail == ti) thrtail = prv;
-
- ti->nxt = NULL;
- delete ti;
- }
- else {
+ if(fnd)
+ thrpending.Free(fnd);
#ifdef FLEXT_DEBUG
+ else
post("flext - INTERNAL ERROR: Thread not found!");
#endif
- }
-
- tlmutex.Unlock();
}
//! Terminate all object threads
bool flext_base::StopThreads()
{
- thr_entry *t;
+ FLEXT_ASSERT(thrhelpcond);
- // signal termination for all object's threads
- tlmutex.Lock();
- for(t = thrhead; t; t = t->nxt) {
- if(t->This() == this) t->shouldexit = true;
- }
- tlmutex.Unlock();
+ TypedLifo<thr_entry> qutmp;
+ thr_entry *ti;
- // TODO: maybe there should be a thread conditional for every thread so that it can be signalled efficiently
+ // first search pending queue
+ // --------------------------
- // wait for thread termination (1 second max.)
- int cnt;
- for(int wi = 0; wi < 100; ++wi) {
- // lock and count this object's threads
- tlmutex.Lock();
- for(cnt = 0,t = thrhead; t; t = t->nxt)
- if(t->This() == this) ++cnt;
- tlmutex.Unlock();
+ bool found = false;
+ while((ti = thrpending.Pop()) != NULL)
+ if(ti->This() == this)
+ // found -> thread hasn't started -> just delete
+ thrpending.Free(ti);
+ else
+ qutmp.Push(ti);
- // if no threads are remaining, we are done
- if(!cnt) break;
+ // put back into pending queue (order doesn't matter)
+ while((ti = qutmp.Pop()) != NULL) thrpending.Push(ti);
- // Wait
- Sleep(0.01f);
- }
+ // now search active queue
+ // -----------------------
+
+ TypedFifo<thr_entry> qufnd;
+
+ while((ti = thractive.Pop()) != NULL)
+ if(ti->This() == this) {
+ thrstopped.Push(ti);
+ thrhelpcond->Signal();
+ qufnd.Put(ti);
+ }
+ else
+ qutmp.Push(ti);
- if(cnt) {
+ // put back into pending queue (order doesn't matter)
+ while((ti = qutmp.Pop()) != NULL) thractive.Push(ti);
+
+ // wakeup helper thread
+ thrhelpcond->Signal();
+
+ // now wait for entries in qufnd to have vanished from thrstopped
+ if(!waitforstopped(qufnd,MAXIMUMWAIT*0.001f)) {
#ifdef FLEXT_DEBUG
post("flext - doing hard thread termination");
#endif
-
- // --- all object threads have terminated by now -------
- tlmutex.Lock();
// timeout -> hard termination
- for(t = thrhead; t; )
- if(t->This() == this) {
- #if FLEXT_THREADS == FLEXT_THR_POSIX
- if(pthread_cancel(t->thrid)) post("%s - Thread could not be terminated!",thisName());
- #elif FLEXT_THREADS == FLEXT_THR_MP
- MPTerminateTask(t->thrid,0);
- // here, we should use a task queue to check whether the task has really terminated!!
- #elif FLEXT_THREADS == FLEXT_THR_WIN32
- // can't use the c library function _endthread.. memory leaks will occur
- HANDLE hnd = OpenThread(THREAD_ALL_ACCESS,TRUE,t->thrid);
- TerminateThread(hnd,0);
- #else
- #error
- #endif
- thr_entry *tn = t->nxt;
- t->nxt = NULL; delete t;
- t = tn;
- }
- else t = t->nxt;
- thrhead = NULL;
-
- tlmutex.Unlock();
+ while((ti = qufnd.Get()) != NULL) {
+#if FLEXT_THREADS == FLEXT_THR_POSIX
+ if(pthread_cancel(ti->thrid))
+ post("%s - Thread could not be terminated!",thisName());
+#elif FLEXT_THREADS == FLEXT_THR_MP
+ MPTerminateTask(ti->thrid,0);
+ // here, we should use a task queue to check whether the task has really terminated!!
+#elif FLEXT_THREADS == FLEXT_THR_WIN32
+ // can't use the c library function _endthread.. memory leaks will occur
+ HANDLE hnd = OpenThread(THREAD_ALL_ACCESS,TRUE,ti->thrid);
+ TerminateThread(hnd,0);
+#else
+#error Not implemented
+#endif
+ thrpending.Free(ti);
+ }
+ return false;
}
-
-// post("All threads have terminated");
-
- return true;
+ else
+ return true;
}
bool flext::RelPriority(int dp,thrid_t ref,thrid_t id)
@@ -469,9 +493,9 @@ bool flext::RelPriority(int dp,thrid_t ref,thrid_t id)
return true;
#elif FLEXT_THREADS == FLEXT_THR_MP
- thr_entry *t;
- for(t = thrhead; t && t->Id() != id; t = t->nxt) {}
- if(t) {
+ thr_entry *ti = thrpending.Find(id);
+ if(!ti) ti = thractive.Find(id);
+ if(ti) {
// thread found in list
int w = GetPriority(id);
if(dp < 0) w /= 1<<(-dp);
@@ -488,7 +512,7 @@ bool flext::RelPriority(int dp,thrid_t ref,thrid_t id)
#endif
w = 10000;
}
- t->weight = w;
+ ti->weight = w;
return MPSetTaskWeight(id,w) == noErr;
}
else return false;
@@ -524,9 +548,9 @@ int flext::GetPriority(thrid_t id)
return pr;
#elif FLEXT_THREADS == FLEXT_THR_MP
- thr_entry *t;
- for(t = thrhead; t && t->Id() != id; t = t->nxt) {}
- return t?t->weight:-1;
+ thr_entry *ti = thrpending.Find(id);
+ if(!ti) ti = thractive.Find(id);
+ return ti?ti->weight:-1;
#else
#error
#endif
@@ -566,12 +590,9 @@ bool flext::SetPriority(int p,thrid_t id)
return true;
#elif FLEXT_THREADS == FLEXT_THR_MP
- thr_entry *t;
- for(t = thrhead; t && t->Id() != id; t = t->nxt) {}
- if(t)
- return MPSetTaskWeight(id,t->weight = p) == noErr;
- else
- return false;
+ thr_entry *ti = thrpending.Find(id);
+ if(!ti) ti = thractive.Find(id);
+ return ti && MPSetTaskWeight(id,ti->weight = p) == noErr;
#else
#error
#endif
@@ -585,17 +606,6 @@ void flext_base::thr_params::set_any(const t_symbol *s,int argc,const t_atom *ar
void flext_base::thr_params::set_list(int argc,const t_atom *argv) { var[0]._list.args = new AtomList(argc,argv); }
-flext_base::thr_entry::thr_entry(void (*m)(thr_params *),thr_params *p,thrid_t id):
- th(p?p->cl:NULL),meth(m),params(p),thrid(id),
- active(false),shouldexit(false),
-#if FLEXT_THREADS == FLEXT_THR_MP
- weight(100), // MP default weight
-#endif
- nxt(NULL)
-{}
-
-
-
#if FLEXT_THREADS == FLEXT_THR_POSIX
bool flext::ThrCond::Wait() {
Lock();