the semi-resident thread pool

Robert Fraser fraserofthenight at gmail.com
Sat May 30 00:45:19 PDT 2009


zsxxsz wrote:
> Hi, I have written a thread pool in which each thread is semi-resident. This
> thread pool is different from Tango's. Any thread of the thread pool will
> exit once it has been idle for the timeout period. That is to say, threads
> exist only while there are jobs: no jobs, no threads. The thread pool is
> ported from my C version. With D, it was easier to write thanks to
> delegates.
> Below is the source code:
> 
> module adl.thread_pool;
> 
> import core.sys.posix.pthread;  // just for pthread_self()
> import core.thread;
> import core.sync.mutex;
> import core.sync.condition;
> import std.c.time;
> 
> /**
>  * One queued unit of work for CThreadPool, kept in a singly linked list.
>  */
> private struct Job
> {
>         Job *next;              // following job in the queue, null at the tail
>         void function() fn;     // callable run when call == Call.FN
>         void delegate() dg;     // callable run when call == Call.DG
>         void *arg;              // not referenced by the code shown in this module
>         int   call;             // a CThreadPool.Call value selecting fn or dg
> }
> 
> /**
>  * Thread pool whose worker threads are "semi-resident": workers are created
>  * on demand (up to a configurable maximum) and, when an idle timeout is
>  * configured, exit again after staying idle for that long.
>  */
> class CThreadPool
> {
> public:
>         /**
>          * Constructs a CThreadPool
>          * @param nMaxThread {int} the max number threads in thread pool
>          * @param idleTimeout {int} when > 0, the idle thread will
>          *  exit after idleTimeout seconds, if == 0, the idle thread
>          *  will not exit
>          * @param sz {size_t} when > 0, the thread will be created which
>          *  stack size is sz.
>          */
>         this(int nMaxThread, int idleTimeout, size_t sz = 0)
>         {
>                 m_nMaxThread = nMaxThread;
>                 m_idleTimeout = idleTimeout;
>                 m_stackSize = sz;
>                 m_mutex = new Mutex;
>                 m_cond = new Condition(m_mutex);
>         }
> 
>         /**
>          * Append one task into the thread pool's task queue
>          * @param fn {void function()} the task, must not be null
>          * @throws Exception when fn is null
>          */
>         void append(void function() fn)
>         {
>                 if (fn == null)
>                         throw new Exception("fn null");
> 
>                 Job *job = new Job;
>                 job.fn = fn;
>                 job.next = null;
>                 job.call = Call.FN;
> 
>                 m_mutex.lock();
>                 /* release the lock even if queueing throws, e.g. when
>                  * starting a new worker thread fails */
>                 scope(exit) m_mutex.unlock();
>                 append(job);
>         }
> 
>         /**
>          * Append one task into the thread pool's task queue
>          * @param dg {void delegate()} the task, must not be null
>          * @throws Exception when dg is null
>          */
>         void append(void delegate() dg)
>         {
>                 if (dg == null)
>                         throw new Exception("dg null");
> 
>                 Job *job = new Job;
>                 job.dg = dg;
>                 job.next = null;
>                 job.call = Call.DG;
> 
>                 m_mutex.lock();
>                 /* release the lock even if queueing throws, e.g. when
>                  * starting a new worker thread fails */
>                 scope(exit) m_mutex.unlock();
>                 append(job);
>         }
> 
>         /**
>          * If dg not null, when one new thread is created, dg will be called.
>          * dg runs in the new worker thread, before it takes its first job.
>          * @param dg {void delegate()}
>          */
>         void onThreadInit(void delegate() dg)
>         {
>                 m_onThreadInit = dg;
>         }
> 
>         /**
>          * If dg not null, before one thread exits, dg will be called.
>          * dg runs in the exiting worker thread.
>          * @param dg {void delegate()}
>          */
>         void onThreadExit(void delegate() dg)
>         {
>                 m_onThreadExit = dg;
>         }
> 
> private:
>         /* which member of a Job is to be invoked */
>         enum Call { NO, FN, DG }
> 
>         Mutex m_mutex;                  /* guards the queue and all counters */
>         Condition m_cond;               /* notified when a job is queued */
>         size_t m_stackSize = 0;         /* stack size for new workers, 0: default */
> 
>         Job* m_jobHead = null, m_jobTail = null;
>         int m_nJob = 0;                 /* number of queued jobs */
>         bool m_isQuit = false;          /* never set by the code shown here */
>         int m_nThread = 0;              /* number of live worker threads */
>         int m_nMaxThread;
>         int m_nIdleThread = 0;          /* workers currently waiting on m_cond */
>         int m_overloadTimeWait = 0;     /* never changed by the code shown here */
>         int m_idleTimeout;              /* seconds, see the constructor */
>         time_t m_lastWarn;
> 
>         void delegate() m_onThreadInit;
>         void delegate() m_onThreadExit;
> 
>         /**
>          * Links job at the tail of the queue, then either wakes an idle
>          * worker or, when none is idle and the limit allows, starts a new
>          * one. The caller must hold m_mutex.
>          * @param job {Job*} the job to be queued
>          */
>         void append(Job *job)
>         {
>                 if (m_jobHead == null)
>                         m_jobHead = job;
>                 else
>                         m_jobTail.next = job;
>                 m_jobTail = job;
> 
>                 m_nJob++;
> 
>                 if (m_nIdleThread > 0) {
>                         m_cond.notify();
>                 } else if (m_nThread < m_nMaxThread) {
>                         /* honour the stack size given to the constructor;
>                          * 0 keeps the runtime's default */
>                         Thread thread = new Thread(&doJob, m_stackSize);
>                         thread.isDaemon = true;
>                         thread.start();
>                         m_nThread++;
>                 } else if (m_nJob > 10 * m_nMaxThread) {
>                         /* overloaded: only the time stamp is updated here,
>                          * no warning is actually written */
>                         time_t now = time(null);
>                         if (now - m_lastWarn >= 2) {
>                                 m_lastWarn = now;
>                         }
>                         if (m_overloadTimeWait > 0) {
>                                 Thread.sleep(m_overloadTimeWait);
>                         }
>                 }
>         }
> 
>         /**
>          * Body of every worker thread: runs queued jobs until the worker
>          * has been idle for the configured timeout or the pool is quitting
>          * with an empty queue.
>          */
>         void doJob()
>         {
>                 Job *job;
>                 bool timedout;
>                 /* widen before multiplying so that large timeouts do not
>                  * overflow int */
>                 long period = cast(long) m_idleTimeout * 10_000_000;
> 
>                 if (m_onThreadInit != null)
>                         m_onThreadInit();
> 
>                 m_mutex.lock();
>                 for (;;) {
>                         timedout = false;
> 
>                         while (m_jobHead == null && !m_isQuit) {
>                                 bool signaled = true;
> 
>                                 m_nIdleThread++;
>                                 if (period > 0) {
>                                         try {
>                                                 signaled = m_cond.wait(period);
>                                         } catch (SyncException e) {
>                                                 m_nIdleThread--;
>                                                 m_nThread--;
>                                                 m_mutex.unlock();
>                                                 if (m_onThreadExit != null)
>                                                         m_onThreadExit();
>                                                 throw e;
>                                         }
>                                 } else {
>                                         m_cond.wait();
>                                 }
>                                 /* undo the idle count on every path, also on
>                                  * timeout; otherwise append() would keep
>                                  * notifying a worker that no longer exists
>                                  * instead of starting a new one */
>                                 m_nIdleThread--;
> 
>                                 if (!signaled) {
>                                         timedout = true;
>                                         break;
>                                 }
>                         }  /* end while */
>                         job = m_jobHead;
> 
>                         if (job != null) {
>                                 m_jobHead = job.next;
>                                 m_nJob--;
>                                 if (m_jobTail == job)
>                                         m_jobTail = null;
>                                 /* the lock should be released before
>                                  * entering the working process */
>                                 m_mutex.unlock();
>                                 switch (job.call) {
>                                 case Call.FN:
>                                         job.fn();
>                                         break;
>                                 case Call.DG:
>                                         job.dg();
>                                         break;
>                                 default:
>                                         break;
>                                 }
> 
>                                 /* lock again */
>                                 m_mutex.lock();
>                         }
>                         if (m_jobHead == null && m_isQuit) {
>                                 m_nThread--;
>                                 if (m_nThread == 0)
>                                         m_cond.notifyAll();
>                                 break;
>                         }
>                         if (m_jobHead == null && timedout) {
>                                 m_nThread--;
>                                 break;
>                         }
>                 }
> 
>                 m_mutex.unlock();
> 
>                 writefln("Thread(%d) of ThreadPool exit now", pthread_self());
>                 if (m_onThreadExit != null)
>                         m_onThreadExit();
>         }
> }
> 
> import std.stdio;
> unittest
> {
>         /* at most 10 workers; an idle timeout of 10 is passed as well */
>         auto workers = new CThreadPool(10, 10);
> 
>         /* installs a hook that prints msg from every new worker thread */
>         void installInitHook(string msg)
>         {
>                 void hook()
>                 {
>                         writefln("thread(%d) was created now, s: %s",
>                                 pthread_self(), msg);
>                 }
>                 workers.onThreadInit(&hook);
>         }
> 
>         /* installs a hook that prints msg from every exiting worker thread */
>         void installExitHook(string msg)
>         {
>                 void hook()
>                 {
>                         writefln("thread(%d) was to exit now, s: %s",
>                                 pthread_self(), msg);
>                 }
>                 workers.onThreadExit(&hook);
>         }
> 
>         /* queues the same delegate three times; each run prints msg,
>          * sleeps and then reports that it woke up */
>         void queueJobs(string msg)
>         {
>                 void task()
>                 {
>                         writef("doJob thread id: %d, str: %s\n",
>                                 pthread_self(), msg);
>                         /* presumably one second, if sleep uses the same
>                          * units as the pool's wait period -- confirm */
>                         Thread.sleep(10_000_000);
>                         writef("doJob thread id: %d, wakeup now\n",
>                                 pthread_self());
>                 }
>                 foreach (i; 0 .. 3)
>                         workers.append(&task);
>         }
> 
>         installInitHook("new thread was ok now");
>         installExitHook("thread exited now");
> 
>         queueJobs("hello world");
> 
>         /* keep the test alive long enough for the workers to print */
>         Thread.sleep(100_000_000);
> }

Sweet! Does the code want a license?


More information about the Digitalmars-d-announce mailing list