How to implement parallelization well.
Johan Granberg
lijat.meREM at OVEgmail.com
Sat Jul 15 00:06:41 PDT 2006
I managed to get it to the point where it sortof works but then crashes
after about 5 seconds. The crash is always at the same line which
consists of a call to Thread.yield and it appears when the gc is doing a
full collect
//thread 1
#0 0x9002c3d8 in semaphore_wait_trap ()
(gdb) up
#1 0x0001ed54 in _D3std6thread6Thread8pauseAllFZv () at util.d:1
1 module sige.util;
(gdb) up
#2 0x00034510 in _D3gcx3Gcx11fullcollectFPvZk () at util.d:1
1 module sige.util;
(gdb) up
#3 0x00034c8c in _D3gcx3Gcx16fullcollectshellFZk () at util.d:1
1 module sige.util;
//thread 2
Program received signal SIGUSR1, User defined signal 1.
[Switching to process 1614 thread 0x4807]
0x9002c368 in swtch_pri ()
(gdb) up
#1 0x9002c334 in sched_yield ()
(gdb) up
#2 0x00007d94 in _D8parallel10WorkThread4mainFZi (this=@0x503200) at
parallel.d:36
//code
private import std.thread;
private class Signal
{
bool done=false;
}
private class WorkThread:Thread//TODO check if thread safe if not make it
{
Signal signal;
bool free=false;
bool job=false;
void delegate() work;
private int main()
{
while(!free)
if(job)
{
void delegate() dg;
synchronized(this)
{
dg=work;
}
dg();
synchronized(this)
{
work=null;
signal.done=true;
signal=null;
job=false;
}
}
else
yield();//hanged here
return 0;
}
this()
{
super(&main,0);
this.start();
}
Signal execute(void delegate() dg)
{
synchronized(this)
{
work=dg;
job=true;
return signal=new Signal();
}
}
void release()
{
free=true;
}
bool idle()
{
synchronized(this)
{
return !signal;
}
}
}
private class Lock
{
WorkThread[] threads;
private WorkThread expand()
{
synchronized(this)
{
threads.length=threads.length+1;
return threads[$-1]=new WorkThread();
}
}
private WorkThread free_thread()
{
synchronized(this)
{
foreach(t;threads)
if(t.idle())
return t;
}
return expand();
}
}
private class ThreadPool//TODO check so it is thread safe (looks ok. ask?)
{
private static Lock lock;
static this()
{
lock=new Lock();
}
static Signal execute(void delegate() dg)
{
return lock.free_thread().execute(dg);
}
}
private bool running(Signal[] s)
{
foreach(b;s)
if(!b.done)
return true;
return false;
}
///executes functions in parallel with the first function executed in the
///calling thread. returns when all threads have finished
void parallelize(void delegate()[] functions)
{
Signal[] s;
if(!functions)
return;
synchronized
{
if(functions.length>1)
{
s.length=functions.length-1;
foreach(i,f;functions[1..$])
if(f)
s[i]=ThreadPool.execute(f);
}
}
if(functions[0])
functions[0]();
while(running(s))Thread.yield();
}
uint cores()
{
}
More information about the Digitalmars-d
mailing list