aboutsummaryrefslogtreecommitdiff
path: root/externals/grill/flext/source/flthr.cpp
diff options
context:
space:
mode:
authorThomas Grill <xovo@users.sourceforge.net>2005-03-12 04:56:36 +0000
committerThomas Grill <xovo@users.sourceforge.net>2005-03-12 04:56:36 +0000
commit42a820099a1c03b6f0a84e4f16cb983c69309aa5 (patch)
treec466e6d1fb56662f5dadcb4cc7f24605ffdcefae /externals/grill/flext/source/flthr.cpp
parent27da08004c5f024a0f35e3cb4c2b3942548ebcfa (diff)
macros for 64-bit architectures
fixes for icc@linux fixes for attribute editor (to deal with large dialogs) fix lines on box text change fix for gcc strangeness no more static assignment of symbols (problems with Metrowerks) fixed bugs in SIMD code for non-power-of-2 lengths fixed shared library versioning lock-free thread management don't depend on ldconfig begin in the system path (should be in /sbin/..) fixed flext::Timer::At method new lock-free lifo and fifo svn path=/trunk/; revision=2619
Diffstat (limited to 'externals/grill/flext/source/flthr.cpp')
-rw-r--r--externals/grill/flext/source/flthr.cpp484
1 files changed, 247 insertions, 237 deletions
diff --git a/externals/grill/flext/source/flthr.cpp b/externals/grill/flext/source/flthr.cpp
index 0750dd83..47c64354 100644
--- a/externals/grill/flext/source/flthr.cpp
+++ b/externals/grill/flext/source/flthr.cpp
@@ -13,10 +13,15 @@ WARRANTIES, see the file, "license.txt," in this distribution.
*/
#include "flext.h"
-#include "flinternal.h"
#ifdef FLEXT_THREADS
+// maximum wait time for threads to finish (in ms)
+#define MAXIMUMWAIT 100
+
+
+#include "flinternal.h"
+#include "flcontainers.h"
#include <time.h>
@@ -31,7 +36,6 @@ WARRANTIES, see the file, "license.txt," in this distribution.
#error WIN32 threads need Windows SDK version >= 0x500
#endif
-
#include <errno.h>
//! Thread id of system thread
@@ -40,11 +44,61 @@ flext::thrid_t flext::thrid = 0;
//! Thread id of helper thread
flext::thrid_t flext::thrhelpid = 0;
-static flext::thr_entry *thrhead = NULL,*thrtail = NULL;
-static flext::ThrMutex tlmutex;
-//! Helper thread should terminate
-static bool thrhelpexit = false;
+//! \brief This represents an entry to the list of active method threads
+class thr_entry
+ : public flext
+ , public Fifo::Cell
+{
+public:
+ void thr_entry::Set(void (*m)(thr_params *),thr_params *p,thrid_t id = GetThreadId())
+ {
+ th = p?p->cl:NULL;
+ meth = m,params = p,thrid = id;
+ shouldexit = false;
+#if FLEXT_THREADS == FLEXT_THR_MP
+ weight = 100; // MP default weight
+#endif
+ }
+
+ //! \brief Check if this class represents the current thread
+ bool Is(thrid_t id = GetThreadId()) const { return IsThread(thrid,id); }
+
+ FLEXT_CLASSDEF(flext_base) *This() const { return th; }
+ thrid_t Id() const { return thrid; }
+
+ FLEXT_CLASSDEF(flext_base) *th;
+ void (*meth)(thr_params *);
+ thr_params *params;
+ thrid_t thrid;
+ bool shouldexit;
+#if FLEXT_THREADS == FLEXT_THR_MP
+ int weight;
+#endif
+};
+
+template<class T>
+class ThrFinder:
+ public T
+{
+public:
+ void Push(thr_entry *e) { T::Push(e); }
+ thr_entry *Pop() { return T::Pop(); }
+
+ thr_entry *Find(flext::thrid_t id,bool pop = false)
+ {
+ TypedLifo<thr_entry> qutmp;
+ thr_entry *fnd;
+ while((fnd = Pop()) && !fnd->Is(id)) qutmp.Push(fnd);
+ // put back entries
+ for(thr_entry *ti; ti = qutmp.Pop(); ) Push(ti);
+ if(fnd && !pop) Push(fnd);
+ return fnd;
+ }
+};
+
+static ThrFinder< PooledLifo<thr_entry,1,10> > thrpending;
+static ThrFinder< TypedLifo<thr_entry> > thractive,thrstopped;
//! Helper thread conditional
static flext::ThrCond *thrhelpcond = NULL;
@@ -53,12 +107,13 @@ static flext::ThrCond *thrhelpcond = NULL;
flext::thrid_t flext::GetSysThreadId() { return thrid; }
-void flext::LaunchHelper(thr_entry *e)
+static void LaunchHelper(thr_entry *e)
{
- e->thrid = GetThreadId();
+ e->thrid = flext::GetThreadId();
e->meth(e->params);
}
+
//! Start helper thread
bool flext::StartHelper()
{
@@ -76,7 +131,6 @@ bool flext::StartHelper()
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
- thrhelpexit = false;
ok = pthread_create (&thrhelpid,&attr,(void *(*)(void *))ThrHelper,NULL) == 0;
#elif FLEXT_THREADS == FLEXT_THR_MP
if(!MPLibraryIsLoaded())
@@ -105,17 +159,6 @@ bool flext::StartHelper()
return ok;
}
-#if 0
-/*! \brief Stop helper thread
- \note Never called!
-*/
-bool flext::StopHelper()
-{
- thrhelpexit = true;
- if(thrhelpcond) thrhelpcond->Signal();
-}
-#endif
-
//! Static helper thread function
void flext::ThrHelper(void *)
{
@@ -128,8 +171,6 @@ void flext::ThrHelper(void *)
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
#endif
-// post("Helper thread started");
-
// set thread priority one point below normal
// so thread construction won't disturb real-time audio
RelPriority(-1);
@@ -138,156 +179,151 @@ void flext::ThrHelper(void *)
// helper loop
for(;;) {
-// post("Helper loop");
-
thrhelpcond->Wait();
- if(thrhelpexit) break;
-
-// post("Helper signalled");
- tlmutex.Lock();
-// post("Helper after lock");
-
- // start all inactive threads in queue
- thr_entry *prv = NULL,*ti;
- for(ti = thrhead; ti; prv = ti,ti = ti->nxt) {
- if(!ti->active) {
- bool ok;
-
-// post("Helper start thread");
-#if FLEXT_THREADS == FLEXT_THR_POSIX
- thrid_t dummy;
- ok = pthread_create (&dummy,&attr,(void *(*)(void *))LaunchHelper,ti) == 0;
-#elif FLEXT_THREADS == FLEXT_THR_MP
- thrid_t dummy;
- ok = MPCreateTask((TaskProc)LaunchHelper,ti,0,0,0,0,0,&dummy) == noErr;
-#elif FLEXT_THREADS == FLEXT_THR_WIN32
- ok = _beginthread((void (*)(void *))LaunchHelper,0,ti) >= 0;
-#else
-#error
-#endif
- if(!ok) {
- error("flext - Could not launch thread!");
-
- // delete from queue
- if(prv)
- prv->nxt = ti->nxt;
- else
- thrhead = ti->nxt;
- if(thrtail == ti) thrtail = prv;
-
- ti->nxt = NULL;
- delete ti;
- }
- else {
-// post("Helper thread ok");
-
- // set active flag
- ti->active = true;
- }
- }
- }
-
- tlmutex.Unlock();
+ // start all inactive threads
+ thr_entry *ti;
+ while(ti = thrpending.Pop()) {
+ bool ok;
+
+ #if FLEXT_THREADS == FLEXT_THR_POSIX
+ thrid_t dummy;
+ ok = pthread_create (&dummy,&attr,(void *(*)(void *))LaunchHelper,ti) == 0;
+ #elif FLEXT_THREADS == FLEXT_THR_MP
+ thrid_t dummy;
+ ok = MPCreateTask((TaskProc)LaunchHelper,ti,0,0,0,0,0,&dummy) == noErr;
+ #elif FLEXT_THREADS == FLEXT_THR_WIN32
+ ok = _beginthread((void (*)(void *))LaunchHelper,0,ti) >= 0;
+ #else
+ #error
+ #endif
+ if(!ok) {
+ error("flext - Could not launch thread!");
+ thrpending.Free(ti); ti = NULL;
+ }
+ else
+ // insert into queue of active threads
+ thractive.Push(ti);
+ }
}
+ FLEXT_ASSERT(false);
+/*
+ // Never reached!
+
delete thrhelpcond;
thrhelpcond = NULL;
#if FLEXT_THREADS == FLEXT_THR_POSIX
pthread_attr_destroy(&attr);
#endif
+*/
}
bool flext::LaunchThread(void (*meth)(thr_params *p),thr_params *p)
{
-#ifdef FLEXT_DEBUG
- if(!thrhelpcond) {
- ERRINTERNAL();
- return false;
- }
-#endif
-
-// post("make threads");
-
- tlmutex.Lock();
+ FLEXT_ASSERT(thrhelpcond);
// make an entry into thread list
- thr_entry *nt = new thr_entry(meth,p);
- if(thrtail) thrtail->nxt = nt;
- else thrhead = nt;
- thrtail = nt;
-
- tlmutex.Unlock();
-
-// post("signal helper");
-
+ thr_entry *e = thrpending.New();
+ e->Set(meth,p);
+ thrpending.Push(e);
// signal thread helper
thrhelpcond->Signal();
return true;
}
+static bool waitforstopped(TypedFifo<thr_entry> &qufnd,float wait = 0)
+{
+ TypedLifo<thr_entry> qutmp;
+
+ double until;
+ if(wait) until = flext::GetOSTime()+wait;
+
+ for(;;) {
+ thr_entry *fnd = qufnd.Get();
+ if(!fnd) break; // no more entries -> done!
+
+ thr_entry *ti;
+ // search for entry
+ while((ti = thrstopped.Pop()) != NULL && ti != fnd) qutmp.Push(ti);
+ // put back entries
+ while((ti = qutmp.Pop()) != NULL) thrstopped.Push(ti);
+
+ if(ti) {
+ // still in thrstopped queue
+ qufnd.Put(fnd);
+ // yield to other threads
+ flext::ThrYield();
+
+ if(wait && flext::GetOSTime() > until)
+ // not successful -> remaining thread are still in qufnd queue
+ return false;
+ }
+ }
+ return true;
+}
+
bool flext::StopThread(void (*meth)(thr_params *p),thr_params *p,bool wait)
{
-#ifdef FLEXT_DEBUG
- if(!thrhelpcond) {
- ERRINTERNAL();
- return false;
- }
-#endif
+ FLEXT_ASSERT(thrhelpcond);
- int found = 0;
-
- tlmutex.Lock();
- for(thr_entry *ti = thrhead; ti; ti = ti->nxt)
- // set shouldexit if meth and params match
- if(ti->meth == meth && ti->params == p) { ti->shouldexit = true; found++; }
- tlmutex.Unlock();
-
- if(found) {
- // signal thread helper
- thrhelpcond->Signal();
-#if 0
- // ######################
- // i don't think we need to wait for a single thread to stop
- // ######################
- int cnt = 0;
- for(int wi = 0; wi < 100; ++wi) {
- // lock and count this object's threads
- cnt = 0;
- tlmutex.Lock();
- for(thr_entry *t = thrhead; t; t = t->nxt)
- if(t->meth == meth && t->params == p) ++cnt;
- tlmutex.Unlock();
-
- // if no threads are remaining, we are done
- if(!cnt) break;
-
- // Wait
- Sleep(0.01f);
- }
+ TypedLifo<thr_entry> qutmp;
+ thr_entry *ti;
- return cnt == 0;
-#endif
- return true;
- }
- else
- return false;
+ // first search pending queue
+ // --------------------------
+
+ {
+ bool found = false;
+ while((ti = thrpending.Pop()) != NULL)
+ if(ti->meth == meth && ti->params == p) {
+ // found -> thread hasn't started -> just delete
+ thrpending.Free(ti);
+ found = true;
+ }
+ else
+ qutmp.Push(ti);
+
+ // put back into pending queue (order doesn't matter)
+ while((ti = qutmp.Pop()) != NULL) thrpending.Push(ti);
+
+ if(found) return true;
+ }
+
+ // now search active queue
+ // -----------------------
+
+ TypedFifo<thr_entry> qufnd;
+
+ while((ti = thractive.Pop()) != NULL)
+ if(ti->meth == meth && ti->params == p) {
+ thrstopped.Push(ti);
+ thrhelpcond->Signal();
+ qufnd.Put(ti);
+ }
+ else
+ qutmp.Push(ti);
+
+ // put back into pending queue (order doesn't matter)
+ while((ti = qutmp.Pop()) != NULL) thractive.Push(ti);
+
+ // wakeup helper thread
+ thrhelpcond->Signal();
+
+ // now wait for entries in qufnd to have vanished from thrstopped
+ if(wait)
+ return waitforstopped(qufnd);
+ else
+ return qufnd.Size() == 0;
}
bool flext_base::ShouldExit() const
{
- bool ret = true;
-
- tlmutex.Lock();
- for(thr_entry *ti = thrhead; ti; ti = ti->nxt)
- if(ti->Is()) { ret = ti->shouldexit; break; }
- tlmutex.Unlock();
-
- // thread was not found -> EXIT!!!
- return ret;
+ thr_entry *fnd = thrstopped.Find(GetThreadId());
+ return fnd != NULL;
}
bool flext::PushThread()
@@ -299,99 +335,87 @@ bool flext::PushThread()
void flext::PopThread()
{
- tlmutex.Lock();
-
-// post("Pop thread");
-
- thr_entry *prv = NULL,*ti;
- for(ti = thrhead; ti; prv = ti,ti = ti->nxt)
- if(ti->Is()) break;
+ thrid_t id = GetThreadId();
+ thr_entry *fnd = thrstopped.Find(id,true);
+ if(!fnd) fnd = thractive.Find(id,true);
- if(ti) {
- if(prv)
- prv->nxt = ti->nxt;
- else
- thrhead = ti->nxt;
- if(thrtail == ti) thrtail = prv;
-
- ti->nxt = NULL;
- delete ti;
- }
- else {
+ if(fnd)
+ thrpending.Free(fnd);
#ifdef FLEXT_DEBUG
+ else
post("flext - INTERNAL ERROR: Thread not found!");
#endif
- }
-
- tlmutex.Unlock();
}
//! Terminate all object threads
bool flext_base::StopThreads()
{
- thr_entry *t;
+ FLEXT_ASSERT(thrhelpcond);
- // signal termination for all object's threads
- tlmutex.Lock();
- for(t = thrhead; t; t = t->nxt) {
- if(t->This() == this) t->shouldexit = true;
- }
- tlmutex.Unlock();
+ TypedLifo<thr_entry> qutmp;
+ thr_entry *ti;
- // TODO: maybe there should be a thread conditional for every thread so that it can be signalled efficiently
+ // first search pending queue
+ // --------------------------
- // wait for thread termination (1 second max.)
- int cnt;
- for(int wi = 0; wi < 100; ++wi) {
- // lock and count this object's threads
- tlmutex.Lock();
- for(cnt = 0,t = thrhead; t; t = t->nxt)
- if(t->This() == this) ++cnt;
- tlmutex.Unlock();
+ bool found = false;
+ while((ti = thrpending.Pop()) != NULL)
+ if(ti->This() == this)
+ // found -> thread hasn't started -> just delete
+ thrpending.Free(ti);
+ else
+ qutmp.Push(ti);
- // if no threads are remaining, we are done
- if(!cnt) break;
+ // put back into pending queue (order doesn't matter)
+ while((ti = qutmp.Pop()) != NULL) thrpending.Push(ti);
- // Wait
- Sleep(0.01f);
- }
+ // now search active queue
+ // -----------------------
+
+ TypedFifo<thr_entry> qufnd;
+
+ while((ti = thractive.Pop()) != NULL)
+ if(ti->This() == this) {
+ thrstopped.Push(ti);
+ thrhelpcond->Signal();
+ qufnd.Put(ti);
+ }
+ else
+ qutmp.Push(ti);
- if(cnt) {
+ // put back into pending queue (order doesn't matter)
+ while((ti = qutmp.Pop()) != NULL) thractive.Push(ti);
+
+ // wakeup helper thread
+ thrhelpcond->Signal();
+
+ // now wait for entries in qufnd to have vanished from thrstopped
+ if(!waitforstopped(qufnd,MAXIMUMWAIT*0.001f)) {
#ifdef FLEXT_DEBUG
post("flext - doing hard thread termination");
#endif
-
- // --- all object threads have terminated by now -------
- tlmutex.Lock();
// timeout -> hard termination
- for(t = thrhead; t; )
- if(t->This() == this) {
- #if FLEXT_THREADS == FLEXT_THR_POSIX
- if(pthread_cancel(t->thrid)) post("%s - Thread could not be terminated!",thisName());
- #elif FLEXT_THREADS == FLEXT_THR_MP
- MPTerminateTask(t->thrid,0);
- // here, we should use a task queue to check whether the task has really terminated!!
- #elif FLEXT_THREADS == FLEXT_THR_WIN32
- // can't use the c library function _endthread.. memory leaks will occur
- HANDLE hnd = OpenThread(THREAD_ALL_ACCESS,TRUE,t->thrid);
- TerminateThread(hnd,0);
- #else
- #error
- #endif
- thr_entry *tn = t->nxt;
- t->nxt = NULL; delete t;
- t = tn;
- }
- else t = t->nxt;
- thrhead = NULL;
-
- tlmutex.Unlock();
+ while((ti = qufnd.Get()) != NULL) {
+#if FLEXT_THREADS == FLEXT_THR_POSIX
+ if(pthread_cancel(ti->thrid))
+ post("%s - Thread could not be terminated!",thisName());
+#elif FLEXT_THREADS == FLEXT_THR_MP
+ MPTerminateTask(ti->thrid,0);
+ // here, we should use a task queue to check whether the task has really terminated!!
+#elif FLEXT_THREADS == FLEXT_THR_WIN32
+ // can't use the c library function _endthread.. memory leaks will occur
+ HANDLE hnd = OpenThread(THREAD_ALL_ACCESS,TRUE,ti->thrid);
+ TerminateThread(hnd,0);
+#else
+#error Not implemented
+#endif
+ thrpending.Free(ti);
+ }
+ return false;
}
-
-// post("All threads have terminated");
-
- return true;
+ else
+ return true;
}
bool flext::RelPriority(int dp,thrid_t ref,thrid_t id)
@@ -469,9 +493,9 @@ bool flext::RelPriority(int dp,thrid_t ref,thrid_t id)
return true;
#elif FLEXT_THREADS == FLEXT_THR_MP
- thr_entry *t;
- for(t = thrhead; t && t->Id() != id; t = t->nxt) {}
- if(t) {
+ thr_entry *ti = thrpending.Find(id);
+ if(!ti) ti = thractive.Find(id);
+ if(ti) {
// thread found in list
int w = GetPriority(id);
if(dp < 0) w /= 1<<(-dp);
@@ -488,7 +512,7 @@ bool flext::RelPriority(int dp,thrid_t ref,thrid_t id)
#endif
w = 10000;
}
- t->weight = w;
+ ti->weight = w;
return MPSetTaskWeight(id,w) == noErr;
}
else return false;
@@ -524,9 +548,9 @@ int flext::GetPriority(thrid_t id)
return pr;
#elif FLEXT_THREADS == FLEXT_THR_MP
- thr_entry *t;
- for(t = thrhead; t && t->Id() != id; t = t->nxt) {}
- return t?t->weight:-1;
+ thr_entry *ti = thrpending.Find(id);
+ if(!ti) ti = thractive.Find(id);
+ return ti?ti->weight:-1;
#else
#error
#endif
@@ -566,12 +590,9 @@ bool flext::SetPriority(int p,thrid_t id)
return true;
#elif FLEXT_THREADS == FLEXT_THR_MP
- thr_entry *t;
- for(t = thrhead; t && t->Id() != id; t = t->nxt) {}
- if(t)
- return MPSetTaskWeight(id,t->weight = p) == noErr;
- else
- return false;
+ thr_entry *ti = thrpending.Find(id);
+ if(!ti) ti = thractive.Find(id);
+ return ti && MPSetTaskWeight(id,ti->weight = p) == noErr;
#else
#error
#endif
@@ -585,17 +606,6 @@ void flext_base::thr_params::set_any(const t_symbol *s,int argc,const t_atom *ar
void flext_base::thr_params::set_list(int argc,const t_atom *argv) { var[0]._list.args = new AtomList(argc,argv); }
-flext_base::thr_entry::thr_entry(void (*m)(thr_params *),thr_params *p,thrid_t id):
- th(p?p->cl:NULL),meth(m),params(p),thrid(id),
- active(false),shouldexit(false),
-#if FLEXT_THREADS == FLEXT_THR_MP
- weight(100), // MP default weight
-#endif
- nxt(NULL)
-{}
-
-
-
#if FLEXT_THREADS == FLEXT_THR_POSIX
bool flext::ThrCond::Wait() {
Lock();