the semi-resident thread pool
zsxxsz
zhengshuxin at hexun.com
Fri May 29 21:14:08 PDT 2009
Hi, I have written a thread pool in which each thread is semi-resident. This
thread pool differs from Tango's: any thread in the pool exits once it has
been idle for the timeout period. That is to say, threads exist only while
there are jobs: no jobs, no threads. The thread pool is a port of my C version.
With D, it was easier to write thanks to delegates.
Below is the source code:
module adl.thread_pool;
import core.sys.posix.pthread; // just for pthread_self()
import core.thread;
import core.sync.mutex;
import core.sync.condition;
import std.c.time;
/**
 * One queued task; jobs are chained through `next` to form the pool's
 * singly linked pending-job queue.
 */
private struct Job
{
    Job* next;            // following job in the queue, null for the last one
    void function() fn;   // callable used when the job was queued as a function
    void delegate() dg;   // callable used when the job was queued as a delegate
    void* arg;            // not referenced by the pool code in this module
    int call;             // selector telling the worker whether to run fn or dg
}
/**
 * Semi-resident thread pool.
 *
 * Worker threads are created on demand (up to a fixed maximum) when jobs
 * are appended. A worker that finds the queue empty waits for new work
 * and, when an idle timeout is configured, exits once that timeout
 * elapses without a job arriving.
 */
class CThreadPool
{
public:
    /**
     * Constructs a CThreadPool
     * @param nMaxThread {int} the max number threads in thread pool
     * @param idleTimeout {int} when > 0, the idle thread will
     *  exit after idleTimeout seconds, if == 0, the idle thread
     *  will not exit
     * @param sz {size_t} when > 0, the thread will be created which
     *  stack size is sz.
     */
    this(int nMaxThread, int idleTimeout, size_t sz = 0)
    {
        m_nMaxThread = nMaxThread;
        m_idleTimeout = idleTimeout;
        m_stackSize = sz;
        m_mutex = new Mutex;
        m_cond = new Condition(m_mutex);
    }

    /**
     * Append one task into the thread pool's task queue
     * @param fn {void function()} the function to run; must not be null
     * @throws Exception when fn is null
     */
    void append(void function() fn)
    {
        if (fn is null)
            throw new Exception("fn null");

        Job* job = new Job;
        job.fn = fn;
        job.next = null;
        job.call = Call.FN;

        m_mutex.lock();
        // Release the lock even if queueing (e.g. thread creation) throws.
        scope (exit) m_mutex.unlock();
        append(job);
    }

    /**
     * Append one task into the thread pool's task queue
     * @param dg {void delegate()} the delegate to run; must not be null
     * @throws Exception when dg is null
     */
    void append(void delegate() dg)
    {
        if (dg is null)
            throw new Exception("dg null");

        Job* job = new Job;
        job.dg = dg;
        job.next = null;
        job.call = Call.DG;

        m_mutex.lock();
        // Release the lock even if queueing (e.g. thread creation) throws.
        scope (exit) m_mutex.unlock();
        append(job);
    }

    /**
     * If dg not null, when one new thread is created, dg will be called
     * from that new thread before it starts taking jobs.
     * @param dg {void delegate()}
     */
    void onThreadInit(void delegate() dg)
    {
        m_onThreadInit = dg;
    }

    /**
     * If dg not null, before one thread exits, dg will be called
     * from the exiting thread.
     * @param dg {void delegate()}
     */
    void onThreadExit(void delegate() dg)
    {
        m_onThreadExit = dg;
    }

private:
    /** Which callable of a Job is to be invoked. */
    enum Call { NO, FN, DG }

    Mutex m_mutex;                  // guards every field below
    Condition m_cond;               // signalled when a job is queued
    size_t m_stackSize = 0;         // stack size handed to new worker threads
    Job* m_jobHead = null, m_jobTail = null;    // pending-job queue
    int m_nJob = 0;                 // number of queued jobs
    bool m_isQuit = false;          // when set, workers exit once the queue is empty
    int m_nThread = 0;              // number of live worker threads
    int m_nMaxThread;               // upper bound for m_nThread
    int m_nIdleThread = 0;          // workers currently blocked waiting for a job
    int m_overloadTimeWait = 0;     // optional producer back-off when overloaded
    int m_idleTimeout;              // idle timeout in seconds, 0 = wait forever
    time_t m_lastWarn;              // time the overload state was last recorded
    void delegate() m_onThreadInit;
    void delegate() m_onThreadExit;

    /**
     * Links job at the tail of the queue, then either wakes an idle
     * worker or starts a new one while the pool is below its maximum.
     * The caller must hold m_mutex.
     */
    void append(Job* job)
    {
        if (m_jobHead is null)
            m_jobHead = job;
        else
            m_jobTail.next = job;
        m_jobTail = job;
        m_nJob++;

        if (m_nIdleThread > 0) {
            m_cond.notify();
        } else if (m_nThread < m_nMaxThread) {
            // Honour the stack size requested in the constructor
            // (0 selects the runtime's default).
            Thread thread = new Thread(&doJob, m_stackSize);
            thread.isDaemon = true;
            thread.start();
            m_nThread++;
        } else if (m_nJob > 10 * m_nMaxThread) {
            // Overloaded: every worker is busy and the backlog is large.
            // Only the time of the condition is recorded here; no
            // message is emitted.
            time_t now = time(null);
            if (now - m_lastWarn >= 2) {
                m_lastWarn = now;
            }
            if (m_overloadTimeWait > 0) {
                Thread.sleep(m_overloadTimeWait);
            }
        }
    }

    /**
     * Worker thread body: runs queued jobs until the pool is told to
     * quit or the worker has been idle for the configured timeout.
     */
    void doJob()
    {
        // Widen before multiplying so large timeouts cannot overflow int.
        long period = cast(long) m_idleTimeout * 10_000_000;

        if (m_onThreadInit !is null)
            m_onThreadInit();

        m_mutex.lock();
        for (;;) {
            bool timedout = false;

            while (m_jobHead is null && !m_isQuit) {
                bool signaled = true;

                m_nIdleThread++;
                try {
                    if (period > 0)
                        signaled = m_cond.wait(period);
                    else
                        m_cond.wait();
                } catch (SyncException e) {
                    m_nIdleThread--;
                    m_nThread--;
                    m_mutex.unlock();
                    if (m_onThreadExit !is null)
                        m_onThreadExit();
                    throw e;
                }
                // The idle count must drop on every wake-up, including a
                // timeout; otherwise append() would keep notifying a
                // worker that no longer exists instead of starting one.
                m_nIdleThread--;

                if (!signaled) {
                    timedout = true;
                    break;
                }
            } /* end while */

            Job* job = m_jobHead;
            if (job !is null) {
                m_jobHead = job.next;
                m_nJob--;
                if (m_jobTail is job)
                    m_jobTail = null;

                /* the lock should be released before entering
                   the job so other threads can use the queue */
                m_mutex.unlock();
                try {
                    switch (job.call) {
                    case Call.FN:
                        job.fn();
                        break;
                    case Call.DG:
                        job.dg();
                        break;
                    default:
                        break;
                    }
                } catch (Exception e) {
                    // A failing job ends this worker; give its slot back
                    // so the pool can start a replacement later.
                    m_mutex.lock();
                    m_nThread--;
                    m_mutex.unlock();
                    if (m_onThreadExit !is null)
                        m_onThreadExit();
                    throw e;
                }
                /* lock again */
                m_mutex.lock();
            }

            if (m_jobHead is null && m_isQuit) {
                m_nThread--;
                if (m_nThread == 0)
                    m_cond.notifyAll();
                break;
            }
            if (m_jobHead is null && timedout) {
                m_nThread--;
                break;
            }
        }
        m_mutex.unlock();

        writefln("Thread(%d) of ThreadPool exit now", pthread_self());
        if (m_onThreadExit !is null)
            m_onThreadExit();
    }
}
import std.stdio;
/*
 * Smoke test: installs thread start/exit hooks, queues three jobs and
 * then keeps the test thread asleep while the workers run. Nothing is
 * asserted; the outcome is observed through the printed messages.
 */
unittest
{
    // At most 10 workers; the second argument is the idle timeout.
    auto pool = new CThreadPool(10, 10);

    // Registers a hook that reports each newly started worker thread.
    void installInitHook(string s)
    {
        void reportCreated()
        {
            writefln("thread(%d) was created now, s: %s",
                pthread_self(), s);
        }

        pool.onThreadInit(&reportCreated);
    }

    // Registers a hook that reports each worker thread about to exit.
    void installExitHook(string s)
    {
        void reportExiting()
        {
            writefln("thread(%d) was to exit now, s: %s",
                pthread_self(), s);
        }

        pool.onThreadExit(&reportExiting);
    }

    // Queues the same job three times; each run prints, sleeps, prints.
    void queueJobs(string s)
    {
        void work()
        {
            writef("doJob thread id: %d, str: %s\n",
                pthread_self(), s);
            Thread.sleep(10_000_000);
            writef("doJob thread id: %d, wakeup now\n",
                pthread_self());
        }

        foreach (i; 0 .. 3)
            pool.append(&work);
    }

    installInitHook("new thread was ok now");
    installExitHook("thread exited now");
    queueJobs("hello world");

    // Keep the test thread alive while the daemon workers do their jobs.
    Thread.sleep(100_000_000);
}
More information about the Digitalmars-d-announce
mailing list