How to implement parallelization well.

Johan Granberg lijat.meREM at OVEgmail.com
Sat Jul 15 00:06:41 PDT 2006


I managed to get it to the point where it sort of works, but then it crashes
after about 5 seconds. The crash is always at the same line, which
consists of a call to Thread.yield, and it appears when the GC is doing a
full collect.

//thread 1
#0  0x9002c3d8 in semaphore_wait_trap ()
(gdb) up
#1  0x0001ed54 in _D3std6thread6Thread8pauseAllFZv () at util.d:1
1       module sige.util;
(gdb) up
#2  0x00034510 in _D3gcx3Gcx11fullcollectFPvZk () at util.d:1
1       module sige.util;
(gdb) up
#3  0x00034c8c in _D3gcx3Gcx16fullcollectshellFZk () at util.d:1
1       module sige.util;

//thread 2
Program received signal SIGUSR1, User defined signal 1.
[Switching to process 1614 thread 0x4807]
0x9002c368 in swtch_pri ()
(gdb) up
#1  0x9002c334 in sched_yield ()
(gdb) up
#2  0x00007d94 in _D8parallel10WorkThread4mainFZi (this=@0x503200) at 
parallel.d:36

//code
private import std.thread;

private class Signal
{
	bool done=false;
}
private class WorkThread:Thread//TODO check if thread safe if not make it
{
	Signal signal;
	bool free=false;
	bool job=false;
	void delegate() work;
	private int main()
	{
		while(!free)
			if(job)
			{
				void delegate() dg;
				synchronized(this)
				{
					dg=work;
				}
				dg();
				synchronized(this)
				{
					work=null;
					signal.done=true;
					signal=null;
					job=false;
				}
			}
			else
				yield();//hanged here
		return 0;
	}
	this()
	{
		super(&main,0);
		this.start();
	}
	Signal execute(void delegate() dg)
	{
		synchronized(this)
		{
			work=dg;
			job=true;
			return signal=new Signal();
		}
	}
	void release()
	{
		free=true;
	}
	bool idle()
	{
		synchronized(this)
		{
			return !signal;
		}
	}
}
private class Lock
{
	WorkThread[] threads;
	private WorkThread expand()
	{
		synchronized(this)
		{
			threads.length=threads.length+1;
			return threads[$-1]=new WorkThread();
		}
	}
	private WorkThread free_thread()
	{
		synchronized(this)
		{
			foreach(t;threads)
				if(t.idle())
					return t;
		}
		return expand();
	}
}
private class ThreadPool//TODO check so it is thread safe (looks ok. ask?)
{
	
	private static Lock lock;
	static this()
	{
		lock=new Lock();
	}
	static Signal execute(void delegate() dg)
	{
		return lock.free_thread().execute(dg);
	}
}
private bool running(Signal[] s)
{
	foreach(b;s)
		if(!b.done)
			return true;
	return false;
}
///executes functions in parallel with the first function executed in the
///calling thread. returns when all threads have finished
void parallelize(void delegate()[] functions)
{
	Signal[] s;
	if(!functions)
		return;
	synchronized
	{
		if(functions.length>1)
		{
			s.length=functions.length-1;
			foreach(i,f;functions[1..$])
				if(f)
					s[i]=ThreadPool.execute(f);
		}
	}
	if(functions[0])
		functions[0]();
	while(running(s))Thread.yield();
}
uint cores()
{
	
}




More information about the Digitalmars-d mailing list