Various shared bugs

Jason House jason.james.house at gmail.com
Wed Dec 23 18:13:14 PST 2009


Graham St Jack wrote:

> On Mon, 07 Dec 2009 22:45:17 -0500, Jason House wrote:
>> I've backed out most of my pro-shared changes and will try again in a
>> few months :(
> 
> I have also given up on shared and am also adopting a waiting strategy.

Yeah, it's sad.  I have successfully converted part of my code base to use 
shared, and I suspect I could get another chunk working if I tried really 
hard.  My guess is that the viral nature of shared makes converting an 
existing code base tough.  It may be much easier to get something working 
if shared is used from the start of development.
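
To show what I mean by "viral", here is a minimal sketch.  It assumes a 
current D2 compiler and druntime's core.atomic rather than the 
tango.core.Atomic used in my module; Counter and bumpTwice are made-up 
names for illustration only.

```d
import core.atomic : atomicLoad, atomicOp;

class Counter
{
    private int count;

    // Marked shared, so these are only callable through a shared reference.
    void bump() shared { atomicOp!"+="(count, 1); }
    int read() shared { return atomicLoad(count); }
}

// Any helper that touches a shared Counter must say so in its signature.
int bumpTwice(shared Counter c)
{
    c.bump();
    c.bump();
    return c.read();
}

unittest
{
    auto c = new shared(Counter);
    // shared is transitive: a field seen through a shared reference is shared.
    static assert(is(typeof(c.count) == shared(int)));
    assert(bumpTwice(c) == 2);

    // A thread-local instance can use neither the shared methods nor the helper.
    auto local = new Counter;
    static assert(!__traits(compiles, local.bump()));
    static assert(!__traits(compiles, bumpTwice(local)));
}
```

Once one object becomes shared, every method, field access and helper 
along the call chain has to follow, which is why converting existing code 
piecemeal hurts.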


> I would love to get some tips from anyone (like Walter, for example) who
> thinks they have a way of using shared successfully.

Here's the one module that I was able to completely convert to use shared 
(along with every use of it).  Maybe it will help you figure out how to get 
things to work.  It's a relatively brain-dead message queue: it can't hold 
more than one message at a time (adding a second message blocks until the 
first has been received by all threads).  It's intended for one master 
thread to send a message to all recipients in a thread group.

module hb.io.ipc;

import std.cstream;
import core.thread;
import tango.core.Atomic;

template broadcastMessageQueue(target, double sleepSec=0.1){
	// Thread.sleep takes its period in 100 ns ticks: 10_000_000 ticks per second
	private enum int sleepTicks = cast(int) (sleepSec*10_000_000);
	/// Broadcasts a delegate to a bunch of identical recipients
	class sender{
		alias void delegate(target) messageType;
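		private int id;          // sequence number of the latest message sent
		private int max;         // number of recipients
		private int pending;     // recipients that have yet to receive msg
		private messageType msg; // the message currently in the queue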
		this(int numberOfRecipients){ max = numberOfRecipients; }
		/// Only blocks if queue is full
		void send(shared messageType message) shared{
			waitForQueueToEmpty;
			id++;
			msg = message;
			pending = max;
		}
		/// Blocks until every recipient got the message
		void push(shared messageType message) shared{
			send(message);
			waitForQueueToEmpty;
		}
		private bool receive(int messageId, target t) shared{
			if (pending == 0 || id < messageId)
				return false;
			msg(t);
			atomicDecrement!(msync.raw)(pending);
			return true;
		}
		private void waitForQueueToEmpty() shared{
			while(pending > 0)
				Thread.sleep(sleepTicks);
		}
	}
	
	/// Receives delegates from the specified sender. Never blocks.
	class receiver{
		private target parent;
		private shared sender source;
		private int nextMessageId = 1;
		this(target t, shared sender s){ parent = t; source = s; }
		bool receive(){
			// Cast is hack to circumvent bugzilla issue #3089
			if (source.receive(nextMessageId, parent)){
				nextMessageId++;
				return true;
			}
			return false;
		}
	}
}

version(test)
unittest{
	derr.writefln("Testing broadcast message queue");
	class dummy{ int x; }
	auto foo = new dummy;
	auto bar = new dummy;
	// Extra parentheses as hack to circumvent dmd bugzilla issue #3091
	auto sender = new shared(broadcastMessageQueue!(dummy).sender)(2);
	auto rx1 = new broadcastMessageQueue!(dummy).receiver(foo, sender);
	auto rx2 = new broadcastMessageQueue!(dummy).receiver(bar, sender);

	assert(rx1.receive == false);
	assert(rx2.receive == false);

	sender.send( cast(shared void delegate(dummy)) (dummy d){d.x++;});

	assert(rx1.receive);
	assert(rx2.receive);

	assert(rx1.receive == false);
	assert(rx2.receive == false);


	assert(foo.x == 1);
	assert(bar.x == 1);
}


