the semi-resident thread pool

zsxxsz zhengshuxin at hexun.com
Fri May 29 21:14:08 PDT 2009


Hi, I written one thread pool in which each thread is semi-resident. The
thread-pool is different from the Tango's one. Any thread of the thread-pool
will exit when it is idle for the timeout. That is to say, all threads for
jobs, and no job no thread. The thread-pool was from my C version. With D, I
wrote it more easier with the delegate function.
Below is the source code:

module adl.thread_pool;

import core.sys.posix.pthread;  // just for pthread_self()
import core.thread;
import core.sync.mutex;
import core.sync.condition;
import std.c.time;

private struct Job
{
        Job *next;
        void function() fn;
        void delegate() dg;
        void *arg;
        int   call;
}

/**
 * semi-daemon thread of thread pool
 */
class CThreadPool
{
public:
        /**
         * Constructs a CThreadPool
         * @param nMaxThread {int} the max number threads in thread pool
         * @param idleTimeout {int} when > 0, the idle thread will
         *  exit after idleTimeout seconds, if == 0, the idle thread
         *  will not exit
         * @param sz {size_t} when > 0, the thread will be created which
         *  stack size is sz.
         */
        this(int nMaxThread, int idleTimeout, size_t sz = 0)
        {
                m_nMaxThread = nMaxThread;
                m_idleTimeout = idleTimeout;
                m_stackSize = sz;
                m_mutex = new Mutex;
                m_cond = new Condition(m_mutex);
        }

        /**
         * Append one task into the thread pool's task queue
         * @param fn {void function()}
         */
        void append(void function() fn)
        {
                Job *job;
                char  buf[256];

                if (fn == null)
                        throw new Exception("fn null");

                job = new Job;
                job.fn = fn;
                job.next = null;
                job.call = Call.FN;

                m_mutex.lock();
                append(job);
                m_mutex.unlock();
        }

        /**
         * Append one task into the thread pool's task queue
         * @param dg {void delegate()}
         */
        void append(void delegate() dg)
        {
                Job *job;
                char  buf[256];

                if (dg == null)
                        throw new Exception("dg null");

                job = new Job;
                job.dg = dg;
                job.next = null;
                job.call = Call.DG;

                m_mutex.lock();
                append(job);
                m_mutex.unlock();
        }

        /**
         * If dg not null, when one new thread is created, dg will be called.
         * @param dg {void delegate()}
         */
        void onThreadInit(void delegate() dg)
        {
                m_onThreadInit = dg;
        }

        /**
         * If dg not null, before one thread exits, db will be called.
         * @param dg {void delegate()}
         */
        void onThreadExit(void delegate() dg)
        {
                m_onThreadExit = dg;
        }

private:
        enum Call { NO, FN, DG }

        Mutex m_mutex;
        Condition m_cond;
        size_t m_stackSize = 0;

        Job* m_jobHead = null, m_jobTail = null;
        int m_nJob = 0;
        bool m_isQuit = false;
        int m_nThread = 0;
        int m_nMaxThread;
        int m_nIdleThread = 0;
        int m_overloadTimeWait = 0;
        int m_idleTimeout;
        time_t m_lastWarn;

        void delegate() m_onThreadInit;
        void delegate() m_onThreadExit;
        void append(Job *job)
        {
                if (m_jobHead == null)
                        m_jobHead = job;
                else
                        m_jobTail.next = job;
                m_jobTail = job;

                m_nJob++;

                if (m_nIdleThread > 0) {
                        m_cond.notify();
                } else if (m_nThread < m_nMaxThread) {
                        Thread thread = new Thread(&doJob);
                        thread.isDaemon = true;
                        thread.start();
                        m_nThread++;
                } else if (m_nJob > 10 * m_nMaxThread) {
                        time_t now = time(null);
                        if (now - m_lastWarn >= 2) {
                                m_lastWarn = now;
                        }
                        if (m_overloadTimeWait > 0) {
                                Thread.sleep(m_overloadTimeWait);
                        }
                }
        }
        void doJob()
        {
                Job *job;
                int status;
                bool timedout;
                long period = m_idleTimeout * 10_000_000;

                if (m_onThreadInit != null)
                        m_onThreadInit();

                m_mutex.lock();
                for (;;) {
                        timedout = false;

                        while (m_jobHead == null && !m_isQuit) {
                                m_nIdleThread++;
                                if (period > 0) {
                                        try {
                                                if (m_cond.wait(period) ==
false) {
                                                        timedout = true;
                                                        break;
                                                }
                                        } catch (SyncException e) {
                                                m_nIdleThread--;
                                                m_nThread--;
                                                m_mutex.unlock();
                                                if (m_onThreadExit != null)
                                                        m_onThreadExit();
                                                throw e;
                                        }
                                } else {
                                        m_cond.wait();
                                }
                                m_nIdleThread--;
                        }  /* end while */
                        job = m_jobHead;

                        if (job != null) {
                                m_jobHead = job.next;
                                m_nJob--;
                                if (m_jobTail == job)
                                        m_jobTail = null;
                                /* the lock shuld be unlocked before enter
working processs */
                                m_mutex.unlock();
                                switch (job.call) {
                                case Call.FN:
                                        job.fn();
                                        break;
                                case Call.DG:
                                        job.dg();
                                        break;
                                default:
                                        break;
                                }

                                /* lock again */
                                m_mutex.lock();
                        }
                        if (m_jobHead == null && m_isQuit) {
                                m_nThread--;
                                if (m_nThread == 0)
                                        m_cond.notifyAll();
                                break;
                        }
                        if (m_jobHead == null && timedout) {
                                m_nThread--;
                                break;
                        }
                }

                m_mutex.unlock();

                writefln("Thread(%d) of ThreadPool exit now", pthread_self());
                if (m_onThreadExit != null)
                        m_onThreadExit();
        }
}

import std.stdio;
unittest
{
        CThreadPool pool = new CThreadPool(10, 10);

        void testThreadInit(string s)
        {
                void onThreadInit()
                {
                        writefln("thread(%d) was created now, s: %s",
pthread_self(), s);
                }
                pool.onThreadInit(&onThreadInit);
        }

        void testThreadExit(string s)
        {
                void onThreadExit()
                {
                        writefln("thread(%d) was to exit now, s: %s",
pthread_self(), s);
                }
                pool.onThreadExit(&onThreadExit);
        }

        void testAddJobs(string s)
        {
                void threadFun()
                {
                        writef("doJob thread id: %d, str: %s\n",
pthread_self(), s);
                        Thread.sleep(10_000_000);
                        writef("doJob thread id: %d, wakeup now\n",
pthread_self());
                }
                pool.append(&threadFun);
                pool.append(&threadFun);
                pool.append(&threadFun);
        }

        string s = "hello world";
        string s1 = "new thread was ok now";
        string s2 = "thread exited now";
        testThreadInit(s1);
        testThreadExit(s2);

        testAddJobs(s);

        Thread.sleep(100_000_000);
}


More information about the Digitalmars-d-announce mailing list