Various shared bugs
Jason House
jason.james.house at gmail.com
Wed Dec 23 18:13:14 PST 2009
Graham St Jack wrote:
> On Mon, 07 Dec 2009 22:45:17 -0500, Jason House wrote:
>> I've backed out most of my pro-shared changes and will try again in a
>> few months :(
>
> I have also given up on shared and am also adopting a waiting strategy.
Yeah, it's sad. I have been successful in converting part of my code base
to use shared, and suspect I can probably get another chunk working if I try
really hard. I partially suspect that the viral nature of shared makes any
conversion of a code base tough. It may be much easier to get something
working if shared is used from the beginning while developing.
> I would love to get some tips from anyone (like Walter, for example) who
> thinks they have a way of using shared successfully.
Here's the one module that I was able to completely convert to use shared
(and converted all uses of it to use shared). Maybe it'd help you figure
out how to get things to work? It's a relatively brain dead message queue.
It can't hold more than one message at a time (adding a second message will
block until the first message has been received by all threads). It's
intended for one master thread to send a message to all recipients in a
thread group.
module hb.io.ipc;
import std.cstream;
import core.thread;
import tango.core.Atomic;
template broadcastMessageQueue(target, double sleepSec=0.1){
private enum int sleepTicks = cast(int) (sleepSec*100_000_000);
/// Broadcasts a delegate to a bunch of identical recipients
class sender{
alias void delegate(target) messageType;
private int id;
private int max;
private int pending;
private messageType msg;
this(int numberOfRecipients){ max = numberOfRecipients; }
/// Only blocks if queue is full
void send(shared messageType message) shared{
waitForQueueToEmpty;
id++;
msg = message;
pending = max;
}
/// Blocks until every recipient got the message
void push(shared messageType message) shared{
send(message);
waitForQueueToEmpty;
}
private bool receive(int messageId, target t) shared{
if (pending == 0 || id < messageId)
return false;
msg(t);
atomicDecrement!(msync.raw)(pending);
return true;
}
private void waitForQueueToEmpty() shared{
while(pending > 0)
Thread.sleep(sleepTicks);
}
}
/// Receives delegates from the specified sender. Never blocks.
class receiver{
private target parent;
private shared sender source;
private int nextMessageId = 1;
this(target t, shared sender s){ parent = t; source = s; }
bool receive(){
// Cast is hack to circumvent bugzilla issue #3089
if (source.receive(nextMessageId, parent)){
nextMessageId++;
return true;
}
return false;
}
}
}
version(test)
unittest{
derr.writefln("Testing broadcast message queue");
class dummy{ int x; }
auto foo = new dummy;
auto bar = new dummy;
// Extra parenthesis as hack to circumvent dmd bugzilla issue #3091
auto sender = new shared(broadcastMessageQueue!(dummy).sender)(2);
auto rx1 = new broadcastMessageQueue!(dummy).receiver(foo, sender);
auto rx2 = new broadcastMessageQueue!(dummy).receiver(bar, sender);
assert(rx1.receive == false);
assert(rx2.receive == false);
sender.send( cast(shared void delegate(dummy)) (dummy d){d.x++;});
assert(rx1.receive);
assert(rx2.receive);
assert(rx1.receive == false);
assert(rx2.receive == false);
assert(foo.x == 1);
assert(bar.x == 1);
}
More information about the Digitalmars-d
mailing list