Type-safe channels (Re: One year of Go)
Graham Fawcett
fawcett at uwindsor.ca
Thu Nov 18 07:15:53 PST 2010
Hi Sean,
On Tue, 16 Nov 2010 10:45:37 -0500, Sean Kelly wrote:
> Graham Fawcett Wrote:
>
>> On Fri, 12 Nov 2010 18:15:15 -0500, Sean Kelly wrote:
>> >
>> > To be honest, I haven't spent much time with Go because my
>> > cursory exposure to the language hasn't shown it to solve the
>> > problems I care about better than the languages I currently use.
>> > I think Go is in the right ballpark with channels though, and the
>> > rest is passable. I'm trying to withhold my opinion about the
>> > lack of generics and such and evaluate the language based on
>> > what's there.
>>
>> Since you brought up channels...
>>
>> One thing I very much like about Go (and Haskell, etc.) that I
>> don't think has a counterpart in Phobos is type-safe message
>> passing. I like the flexibility of the current "mailbox" design.
>> But I'd really like an alternative that could enforce type-safety
>> on messages. I don't think per-thread mailboxes could accommodate
>> this, but Channel objects could; e.g.:
>>
>> struct Channel(T) {
>> T receive() { ... }
>> void send(T val) { ... }
>> }
>
> It shouldn't be too difficult to wrap send/receive with such a
> design to provide type safety. In fact, it's one reason I wanted to
> use the flexible model. Of course, using channels would be a matter
> of convention, since both would be available.
I don't have code at hand, but I remember getting frustrated trying to
put a type-safe wrapper around send/receive. I think the compiler was
failing to correctly type-check the delegates I was using to process
the messages against the types of messages I was trying to enforce.
But I was probably just doing it wrong; I'm still a D newbie.
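For reference, here is a minimal sketch of what such a wrapper might look like. TypedTid, put, worker and roundTrip are hypothetical names for illustration only and are not part of Phobos; spawn, send, receiveOnly and thisTid are the std.concurrency calls. It only shows the idea of pinning a Tid to one message type; it is not the wrapper either poster had in mind.

```d
import std.concurrency : Tid, receiveOnly, send, spawn, thisTid;

// Hypothetical wrapper (not part of Phobos): a Tid that only accepts T.
struct TypedTid(T) {
    Tid tid;

    // Only values implicitly convertible to T get past the compiler.
    void put(T val) {
        send(tid, val);
    }
}

// Worker: takes one int from its mailbox and replies to the owner.
void worker(Tid owner) {
    int n = receiveOnly!int();
    send(owner, n * 100);
}

// Sends n through the typed wrapper and returns the worker's reply.
int roundTrip(int n) {
    auto w = TypedTid!int(spawn(&worker, thisTid));
    w.put(n);
    // w.put("seven"); // rejected at compile time: a string is not an int
    return receiveOnly!int();
}

void main() {
    assert(roundTrip(7) == 700);
}
```

Only the sending side is checked at compile time here. The receiving side still relies on receiveOnly, which throws MessageMismatch at run time if some other type arrives, and anyone holding the raw Tid can bypass the wrapper, so this remains "a matter of convention" as Sean says.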
>> (I am guessing that tricks might have to be played to pass Channel
>> objects around as pure values across threads, but I'm also guessing
>> this is surmountable...)
>
> It should be possible to pass shared classes as well. If the Channel
> is non-shared then you'll have to do some casting in the
> implementation for send/receive to allow it though.
For fun, I whipped up a little type-safe channel implementation. It's
not stdlib quality, but it's fun to play with. :) As written below,
Chans can't be passed via spawn() or send(), although I suspect that
Chan could be turned into a shared class pretty easily. Ruthless
critiques are welcome!
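
On the casting point, here is a minimal sketch of handing a lock-protected object to spawn() by casting it to shared and back. Counter, worker and demo are hypothetical names; Counter is only a small stand-in for a Chan-like class, and the cast is safe here only on the assumption that every access goes through the mutex.

```d
import core.sync.mutex : Mutex;
import std.concurrency : Tid, receiveOnly, send, spawn, thisTid;

// Hypothetical stand-in for a Chan-like class: a mutex-guarded counter.
class Counter {
    private Mutex lock;
    private int total;

    this() { lock = new Mutex; }

    void add(int n) { synchronized (lock) { total += n; } }
    int get() { synchronized (lock) { return total; } }
}

// spawn() refuses a plain Counter reference, so the parameter is shared.
void worker(shared Counter sc, Tid owner) {
    // Strip shared again; acceptable only because Counter locks internally.
    auto c = cast(Counter) sc;
    c.add(5);
    send(owner, true); // tell the owner the update is done
}

int demo() {
    auto c = new Counter;
    spawn(&worker, cast(shared) c, thisTid);
    receiveOnly!bool(); // wait for the worker's signal
    return c.get();
}

void main() {
    assert(demo() == 5);
}
```

The casts sit at the call site and inside the worker rather than inside the class, which keeps the class itself unchanged; making the class shared throughout would move that burden into its implementation instead.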
Best,
Graham
/**
A type-safe channel implementation. It's undoubtedly flawed; use with
care.
Author: Graham Fawcett <fawcett at uwindsor.ca>
License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License
1.0</a>.
*/
module chans;
import core.sync.condition;
import core.sync.mutex;
import core.thread : Thread;
class Chan(T) {
    alias immutable(T) IT;

    Mutex lock;
    Condition hasAny;
    IT[] queue;

    this() {
        lock = new Mutex;
        hasAny = new Condition(lock);
    }

    void send(IT val) {
        synchronized (lock) {
            queue ~= val; // note, there's no upper bound on queue size.
            hasAny.notify();
        }
    }

    IT receive() {
        synchronized (lock) {
            while (true) {
                if (queue.length > 0) {
                    IT val = queue[0];
                    queue = queue[1..$];
                    return val;
                } else {
                    hasAny.wait();
                }
            }
        }
    }

    // a return type for receive(timeout).
    immutable struct maybeResult {
        bool timedOut;
        IT value;
    }

    maybeResult receive(long timeout) {
        // tip: timeout = seconds * 10_000_000
        synchronized (lock) {
            while (true) {
                if (queue.length > 0) {
                    IT val = queue[0];
                    queue = queue[1..$];
                    return maybeResult(false, val);
                } else {
                    bool timedOut = !hasAny.wait(timeout);
                    if (timedOut) {
                        return maybeResult(true);
                    }
                }
            }
        }
    }
}
/* Testing: Start up a worker pool of N threads, all listening on a
common request channel. Each worker takes one request, calculates a
response (ooh, multiplication!) and sends the result to a common
response channel; then it exits. The main thread collects the results
and prints the sum of the answers. */
import std.stdio;
import std.string; // needed for the std.string.format call in test1()
struct request { int payload; }
struct response { int result; }

void multiplier(Chan!request chan, Chan!response outChan) {
    auto req = chan.receive();
    writef("(%s<-%d) ", Thread.getThis().name, req.payload);
    stdout.flush();
    outChan.send(response(100 * req.payload));
}

void test1() {
    enum NUM_WORKERS = 100;
    auto chan = new Chan!request;
    auto outChan = new Chan!response;

    foreach(x; 0..NUM_WORKERS) {
        Thread t = new Thread(() { multiplier(chan, outChan); });
        t.name = std.string.format("T%d", x);
        t.isDaemon = true;
        t.start();
    }

    foreach(x; 0..NUM_WORKERS) {
        chan.send(request(x));
    }

    auto sum = 0;
    foreach(x; 0..NUM_WORKERS) {
        auto r = outChan.receive();
        sum += r.result;
    }
    writefln("\nsum of responses: %s", sum);
}

void main() {
    test1();
}
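
Since critiques were invited: receive(timeout) above starts a fresh full-length wait every time it wakes up and finds the queue empty (for example when another consumer took the item first), so the total wait can exceed the requested timeout. Below is a minimal sketch of a deadline-based variant. It assumes a druntime in which Condition.wait accepts a core.time.Duration; TimedQueue and tryReceive are hypothetical names and not part of the module above.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.time : Duration, MonoTime, msecs;

// Hypothetical cut-down queue, only to illustrate the timed receive.
class TimedQueue(T) {
    private Mutex lock;
    private Condition hasAny;
    private T[] queue;

    this() {
        lock = new Mutex;
        hasAny = new Condition(lock);
    }

    void send(T val) {
        synchronized (lock) {
            queue ~= val;
            hasAny.notify();
        }
    }

    // Returns true and fills `val` if an item arrives before the deadline.
    bool tryReceive(Duration timeout, out T val) {
        // Fix the deadline once, so repeated wake-ups cannot extend it.
        immutable deadline = MonoTime.currTime + timeout;
        synchronized (lock) {
            while (queue.length == 0) {
                immutable left = deadline - MonoTime.currTime;
                if (left <= Duration.zero)
                    return false;  // deadline passed with nothing queued
                hasAny.wait(left); // wait only for the remaining time
            }
            val = queue[0];
            queue = queue[1 .. $];
            return true;
        }
    }
}

void main() {
    auto q = new TimedQueue!int;
    int v;
    assert(!q.tryReceive(10.msecs, v)); // empty queue: times out
    q.send(3);
    assert(q.tryReceive(10.msecs, v) && v == 3);
}
```

Returning a bool with an out parameter is just one choice; a result struct like maybeResult would work equally well with the same deadline logic.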