Type-safe channels (Re: One year of Go)

Graham Fawcett fawcett at uwindsor.ca
Thu Nov 18 07:15:53 PST 2010


Hi Sean,

On Tue, 16 Nov 2010 10:45:37 -0500, Sean Kelly wrote:

> Graham Fawcett Wrote:
> 
>> On Fri, 12 Nov 2010 18:15:15 -0500, Sean Kelly wrote:
>> > 
>> > To be honest, I haven't spent much time with Go because my
>> > cursory exposure to the language hasn't shown it to solve the
>> > problems I care about better than the languages I currently use.
>> > I think Go is in the right ballpark with channels though, and the
>> > rest is passable. I'm trying to withhold my opinion about the
>> > lack of generics and such and evaluate the language based on
>> > what's there.
>> 
>> Since you brought up channels...
>> 
>> One thing I very much like about Go (and Haskell, etc.) that I
>> don't think has a counterpart in Phobos is type-safe message
>> passing. I like the flexibility of the current "mailbox" design.
>> But I'd really like an alternative that could enforce type-safety
>> on messages. I don't think per-thread mailboxes could accommodate
>> this, but Channel objects could; e.g.:
>> 
>> struct Channel(T) {
>>     T receive() { ... }
>>     void send(T val) { ... }
>> }
> 
> It shouldn't be too difficult to wrap send/receive with such a
> design to provide type safety. In fact, it's one reason I wanted to
> use the flexible model. Of course, using channels would be a matter
> of convention, since both would be available.

I don't have code at hand, but I remember getting frustrated trying to
put a type-safe wrapper around send/receive. I think the compiler was
failing to correctly type-check the delegates I was using to process
the messages against the types of messages I was trying to enforce.
But I was probably just doing it wrong; I'm still a D newbie.
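
For illustration, here is a minimal sketch of the kind of wrapper discussed above, built on std.concurrency's send() and receiveOnly(). The names TypedTid, put and take are hypothetical, not part of Phobos. It only type-checks the sending side at compile time; the receiving side still shares one per-thread mailbox, so a mismatched message is caught at run time at best.

```d
import std.concurrency : Tid, send, receiveOnly, spawn, thisTid;

// Hypothetical wrapper: a thread handle that only accepts messages of type T.
struct TypedTid(T) {
    private Tid tid;

    // Only values convertible to T can be sent through this handle.
    void put(T val) { send(tid, val); }
}

// Takes the next message from the current thread's mailbox.
// receiveOnly throws at run time if that message is not a T.
T take(T)() { return receiveOnly!T(); }

// Worker: receives one int and sends its double back to the owner.
void doubler(Tid owner) {
    int n = take!int();
    send(owner, n * 2);
}

void main() {
    auto worker = TypedTid!int(spawn(&doubler, thisTid));
    worker.put(21);
    // worker.put("hello");  // would be rejected at compile time
    assert(take!int() == 42);
}
```

Since nothing stops other code from calling send() on the underlying Tid directly, this is type safety by convention only, which is why a separate channel object looks attractive.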

>> (I am guessing that tricks might have to be played to pass Channel
>> objects around as pure values across threads, but I'm also guessing
>> this is surmountable...)
> 
> It should be possible to pass shared classes as well. If the Channel
> is non-shared then you'll have to do some casting in the
> implementation for send/receive to allow it though.

For fun, I whipped up a little type-safe channel implementation. It's
not stdlib quality, but it's fun to play with. :) As written below,
Chans can't be passed via spawn() or send(), although I suspect that
Chan could be turned into a shared class pretty easily. Ruthless
critiques are welcome!

Best,
Graham


/**
   A type-safe channel implementation. It's undoubtedly flawed, use with care.
   Author: Graham Fawcett <fawcett at uwindsor.ca>
   License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
*/

module chans;

import core.sync.condition;
import core.sync.mutex;
import core.thread : Thread;

class Chan(T) {

  alias immutable(T) IT;

  Mutex lock;
  Condition hasAny;
  IT[] queue;

  this() {
    lock = new Mutex;
    hasAny = new Condition(lock);
  }

  void send(IT val) {
    synchronized (lock) {
      queue ~= val;     // note, there's no upper bound on queue size.
      hasAny.notify();
    }
  }

  IT receive() {
    synchronized (lock) {
      while (true) {
        if (queue.length > 0) {
          IT val = queue[0];
          queue = queue[1..$];
          return val;
        } else {
          hasAny.wait();
        }
      }
    }
  }

  // a return type for receive(timeout).
  immutable struct maybeResult {
    bool timedOut;
    IT value;
  }

  maybeResult receive(long timeout) {
    // timeout is in 100-nanosecond ticks: timeout = seconds * 10_000_000
    synchronized (lock) {
      while (true) {
        if (queue.length > 0) {
          IT val = queue[0];
          queue = queue[1..$];
          return maybeResult(false, val);
        } else {
          bool timedOut = !hasAny.wait(timeout);
          if (timedOut) {
            return maybeResult(true);
          }
        }
      }
    }
  }
}


/* Testing: Start up a worker pool of N threads, all listening on a
common request channel. Each worker takes one request, calculates a
response (ooh, multiplication!) and sends the result to a common
response channel; then it exits. The main thread collects the results
and prints the sum of the answers. */

import std.stdio;
import std.string;   // for std.string.format, used in test1()

struct request { int payload; }
struct response { int result; }

void multiplier(Chan!request chan, Chan!response outChan) {
  auto req = chan.receive();
  writef("(%s<-%d) ", Thread.getThis().name, req.payload);
  stdout.flush();
  outChan.send(response(100 * req.payload));
}

void test1() {
  enum NUM_WORKERS = 100;
  auto chan = new Chan!request;
  auto outChan = new Chan!response;

  foreach(x; 0..NUM_WORKERS) {
    Thread t = new Thread(() { multiplier(chan, outChan); });
    t.name = std.string.format("T%d", x);
    t.isDaemon = true;
    t.start();
  }

  foreach(x; 0..NUM_WORKERS) {
    chan.send(request(x));
  }

  auto sum = 0;
  foreach(x; 0..NUM_WORKERS) {
    auto r = outChan.receive();
    sum += r.result;
  }
  writefln("\nsum of responses: %s", sum);
}

void main() {
  test1();
}
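

The timed receive in Chan is not exercised by the test above, and it has one wrinkle: a receiver that is woken but loses the race for the message goes back to waiting for the full timeout again. Below is a minimal sketch of one way to bound the total wait with a deadline. It assumes a druntime where Condition.wait accepts a core.time.Duration; TimedChan and tryReceive are hypothetical names used only for this illustration.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import core.time : Duration, MonoTime, msecs;

// Hypothetical trimmed-down channel, only to illustrate a deadline-based receive.
class TimedChan(T) {
    private Mutex lock;
    private Condition hasAny;
    private T[] queue;

    this() {
        lock = new Mutex;
        hasAny = new Condition(lock);
    }

    void send(T val) {
        synchronized (lock) {
            queue ~= val;
            hasAny.notify();
        }
    }

    // Returns true and fills `val` if a message arrives before the deadline.
    bool tryReceive(out T val, Duration timeout) {
        immutable deadline = MonoTime.currTime + timeout;
        synchronized (lock) {
            while (queue.length == 0) {
                immutable remaining = deadline - MonoTime.currTime;
                if (remaining <= Duration.zero)
                    return false;
                // Result ignored: the loop re-checks the queue and the deadline.
                hasAny.wait(remaining);
            }
            val = queue[0];
            queue = queue[1 .. $];
            return true;
        }
    }
}

void main() {
    auto ch = new TimedChan!int;
    int v;
    assert(!ch.tryReceive(v, 50.msecs));   // nothing queued, so this times out

    auto t = new Thread({ ch.send(42); });
    t.start();
    assert(ch.tryReceive(v, 5000.msecs) && v == 42);
    t.join();
}
```

Returning a bool with an out parameter is just one design choice; a small result struct like maybeResult would work equally well.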

