the semi-resident thread pool
Robert Fraser
fraserofthenight at gmail.com
Sat May 30 00:45:19 PDT 2009
zsxxsz wrote:
> Hi, I written one thread pool in which each thread is semi-resident. The
> thread-pool is different from the Tango's one. Any thread of the thread-pool
> will exit when it is idle for the timeout. That is to say, all threads for
> jobs, and no job no thread. The thread-pool was from my C version. With D, I
> wrote it more easier with the delegate function.
> Below is the source code:
>
> module adl.thread_pool;
>
> import core.sys.posix.pthread; // just for pthread_self()
> import core.thread;
> import core.sync.mutex;
> import core.sync.condition;
> import std.c.time;
>
> private struct Job
> {
> Job *next;
> void function() fn;
> void delegate() dg;
> void *arg;
> int call;
> }
>
> /**
> * semi-daemon thread of thread pool
> */
> class CThreadPool
> {
> public:
> /**
> * Constructs a CThreadPool
> * @param nMaxThread {int} the max number threads in thread pool
> * @param idleTimeout {int} when > 0, the idle thread will
> * exit after idleTimeout seconds, if == 0, the idle thread
> * will not exit
> * @param sz {size_t} when > 0, the thread will be created which
> * stack size is sz.
> */
> this(int nMaxThread, int idleTimeout, size_t sz = 0)
> {
> m_nMaxThread = nMaxThread;
> m_idleTimeout = idleTimeout;
> m_stackSize = sz;
> m_mutex = new Mutex;
> m_cond = new Condition(m_mutex);
> }
>
> /**
> * Append one task into the thread pool's task queue
> * @param fn {void function()}
> */
> void append(void function() fn)
> {
> Job *job;
> char buf[256];
>
> if (fn == null)
> throw new Exception("fn null");
>
> job = new Job;
> job.fn = fn;
> job.next = null;
> job.call = Call.FN;
>
> m_mutex.lock();
> append(job);
> m_mutex.unlock();
> }
>
> /**
> * Append one task into the thread pool's task queue
> * @param dg {void delegate()}
> */
> void append(void delegate() dg)
> {
> Job *job;
> char buf[256];
>
> if (dg == null)
> throw new Exception("dg null");
>
> job = new Job;
> job.dg = dg;
> job.next = null;
> job.call = Call.DG;
>
> m_mutex.lock();
> append(job);
> m_mutex.unlock();
> }
>
> /**
> * If dg not null, when one new thread is created, dg will be called.
> * @param dg {void delegate()}
> */
> void onThreadInit(void delegate() dg)
> {
> m_onThreadInit = dg;
> }
>
> /**
> * If dg not null, before one thread exits, db will be called.
> * @param dg {void delegate()}
> */
> void onThreadExit(void delegate() dg)
> {
> m_onThreadExit = dg;
> }
>
> private:
> enum Call { NO, FN, DG }
>
> Mutex m_mutex;
> Condition m_cond;
> size_t m_stackSize = 0;
>
> Job* m_jobHead = null, m_jobTail = null;
> int m_nJob = 0;
> bool m_isQuit = false;
> int m_nThread = 0;
> int m_nMaxThread;
> int m_nIdleThread = 0;
> int m_overloadTimeWait = 0;
> int m_idleTimeout;
> time_t m_lastWarn;
>
> void delegate() m_onThreadInit;
> void delegate() m_onThreadExit;
> void append(Job *job)
> {
> if (m_jobHead == null)
> m_jobHead = job;
> else
> m_jobTail.next = job;
> m_jobTail = job;
>
> m_nJob++;
>
> if (m_nIdleThread > 0) {
> m_cond.notify();
> } else if (m_nThread < m_nMaxThread) {
> Thread thread = new Thread(&doJob);
> thread.isDaemon = true;
> thread.start();
> m_nThread++;
> } else if (m_nJob > 10 * m_nMaxThread) {
> time_t now = time(null);
> if (now - m_lastWarn >= 2) {
> m_lastWarn = now;
> }
> if (m_overloadTimeWait > 0) {
> Thread.sleep(m_overloadTimeWait);
> }
> }
> }
> void doJob()
> {
> Job *job;
> int status;
> bool timedout;
> long period = m_idleTimeout * 10_000_000;
>
> if (m_onThreadInit != null)
> m_onThreadInit();
>
> m_mutex.lock();
> for (;;) {
> timedout = false;
>
> while (m_jobHead == null && !m_isQuit) {
> m_nIdleThread++;
> if (period > 0) {
> try {
> if (m_cond.wait(period) ==
> false) {
> timedout = true;
> break;
> }
> } catch (SyncException e) {
> m_nIdleThread--;
> m_nThread--;
> m_mutex.unlock();
> if (m_onThreadExit != null)
> m_onThreadExit();
> throw e;
> }
> } else {
> m_cond.wait();
> }
> m_nIdleThread--;
> } /* end while */
> job = m_jobHead;
>
> if (job != null) {
> m_jobHead = job.next;
> m_nJob--;
> if (m_jobTail == job)
> m_jobTail = null;
> /* the lock shuld be unlocked before enter
> working processs */
> m_mutex.unlock();
> switch (job.call) {
> case Call.FN:
> job.fn();
> break;
> case Call.DG:
> job.dg();
> break;
> default:
> break;
> }
>
> /* lock again */
> m_mutex.lock();
> }
> if (m_jobHead == null && m_isQuit) {
> m_nThread--;
> if (m_nThread == 0)
> m_cond.notifyAll();
> break;
> }
> if (m_jobHead == null && timedout) {
> m_nThread--;
> break;
> }
> }
>
> m_mutex.unlock();
>
> writefln("Thread(%d) of ThreadPool exit now", pthread_self());
> if (m_onThreadExit != null)
> m_onThreadExit();
> }
> }
>
> import std.stdio;
> unittest
> {
> CThreadPool pool = new CThreadPool(10, 10);
>
> void testThreadInit(string s)
> {
> void onThreadInit()
> {
> writefln("thread(%d) was created now, s: %s",
> pthread_self(), s);
> }
> pool.onThreadInit(&onThreadInit);
> }
>
> void testThreadExit(string s)
> {
> void onThreadExit()
> {
> writefln("thread(%d) was to exit now, s: %s",
> pthread_self(), s);
> }
> pool.onThreadExit(&onThreadExit);
> }
>
> void testAddJobs(string s)
> {
> void threadFun()
> {
> writef("doJob thread id: %d, str: %s\n",
> pthread_self(), s);
> Thread.sleep(10_000_000);
> writef("doJob thread id: %d, wakeup now\n",
> pthread_self());
> }
> pool.append(&threadFun);
> pool.append(&threadFun);
> pool.append(&threadFun);
> }
>
> string s = "hello world";
> string s1 = "new thread was ok now";
> string s2 = "thread exited now";
> testThreadInit(s1);
> testThreadExit(s2);
>
> testAddJobs(s);
>
> Thread.sleep(100_000_000);
> }
Sweet! Does the code want a license?
More information about the Digitalmars-d-announce
mailing list