module alt.concurrency; //public import alt.io; import std.algorithm; import std.contracts; import std.stdio; import std.conv; import std.traits; import std.typecons; import std.typetuple; import core.sync.condition; import core.sync.mutex; import core.thread; import core.stdc.stdlib; // // Provides an alternative message-passing concurrency implementation // to std.concurrency. It is similar to std.concurrency, but is different in that: // * Messages cannot contain references to mutable data. // * Message queues (Channels) are explicitly created and passed to threads as // arguments to spawn(). // * Channels are synchronized (and thus shared). // * The types of messages a Channel supports are well-defined. // * Messages are processed in order. // * Multiple threads can receive from the same Channel. // * Out of band messages are not supported directly, but can be implemented via // additional Channels. // * Channels have an internal pipe that can be used in a Selector to allow // a thread to receive input from multiple Channels and file-descriptor devices // such as sockets and pipes. // * There is no automatic thread-termination behaviour. // // The motivation for this module is to stimulate some discussion about // how D's standard message-passing support should behave. // It is also a learning experience for the author. // The std.concurrency issues of most interest to the author are: // * Messages are not named - they are inferred from the types. // * Lack of compile-time checking. // * It allows mutable references to be sent in messages. // // Graham St Jack, July 2010. // This is free software - do with it as you will. // // Currently only Linux is supported. // // TODO - prevent non-shareable data from being passed to spawn(). // TODO - handling of signals. // TODO - print demangled stack trace on unexpected thread death. // TODO - provide Windows and Mac support in alt.io, tidy up alt.io // and debug it properly. // // // A Channel for sending messages between threads. // The internal pipe is used to block removal until a message is available, // and to provide a file-descriptor to select on. Multiple threads // may add and remove messages from the queue, but usually one thread // adds and one other thread removes. // public class ChannelFull : Exception { this() { super("channel full"); } } public class ChannelFinalized: Exception { this() { super("channel finalized"); } } // A SelectableReader returned from a Channel for use in a Selector // The Channel itself cannot be used because it is shared. /* class Selectable : ISelectableReader { private int mFd; this(int fd) { mFd = fd; } // ISelectableReader implementation: override uint read(void * buffer, uint count) { assert(false, "read is not supported"); } override int read_descriptor() { return mFd; } } */ // FIXME - class should be synchronized, but druntime.core.sync's // Condition class doesn't work with synchronized classes or methods yet. class Channel(T) { private { struct Node { T payload; Node* next; this(T payload) { this.payload = payload; } } Node* mFront; Node* mBack; uint mCapacity; uint mCount; //ubyte mZero; //Pipe mPipe; bool mFinalized; Mutex mMutex; Condition mReadCondition; Condition mWriteCondition; } this(uint capacity = 0) { mCapacity = capacity; //mPipe = new Pipe(); mMutex = new Mutex(); mReadCondition = new Condition(mMutex); mWriteCondition = new Condition(mMutex); } // Return a new Selectable that can be used in a Selector to discover // when the Channel is not empty. /* Selectable new_selectable() { synchronized(mMutex) { return new Selectable(mPipe.read_descriptor); } } */ // Finalise the channel, causing it to throw ChannelFinalised on remove() when empty. final void finalize() { synchronized(mMutex) { mFinalized = true; if (!mCount) { //mPipe.write(&mZero, mZero.sizeof); mReadCondition.notifyAll; } } } // Add a message to the back of the channel, throwing if full final void add(T msg) { synchronized(mMutex) { if (mCapacity && mCount == mCapacity) { throw new ChannelFull(); } auto n = new Node(msg); n.next = null; if (!mFront) mFront = n; if (!mBack) mBack = n; else mBack.next = n; mBack = n; if (!mCount) { //mPipe.write(&mZero, mZero.sizeof); mReadCondition.notifyAll; } ++mCount; } } // Blocking version of add final void add_with_block(T msg) { synchronized(mMutex) { while (mCapacity && mCount == mCapacity) { mWriteCondition.wait; } add(msg); } } // Remove and return the front message, blocking until one is available // or throwing if finalized and empty T remove() { synchronized(mMutex) { while (!mCount) { if (mFinalized) { throw new ChannelFinalized(); } else { mReadCondition.wait; } } if (mCapacity && mCount == mCapacity) { mWriteCondition.notifyAll; } T msg = mFront.payload; assert(mFront); mFront = mFront.next; if (!mFront) mBack = null; --mCount; if (!mCount) { //mPipe.read(&mZero, mZero.sizeof); } return msg; } } } // // Spawn and start a thread on the given run function. // FIXME - insist on arguments being shared or !hasAliasing. // void spawn(T...)(void function(T) run, T args) { void exec() { try { run(args); } catch (Throwable ex) { writeln("Thread died with: " ~ ex.msg); exit(1); } } auto t = new Thread(&exec); t.start(); } // // Protocol - defines: // * a family of messages, // * a Channel type to send them through, // * send methods to send the messages, // * a process method to dispatch received messages, and // * an interface to process them with. // // Synopsis: // // alias Protocol!(Message!("message1", string, "action"), // Message!("message2", int, "val1", int, "val2")) workProtocol; // // class Worker : workProtocol.IHandler { // override void message1(string action) {...} // override void message2(int val1, int val2) {...} // } // // void do_work(workProtocol.Chan channel, string name) nothrow { // writefln("%s starting", name); // auto worker = new Worker(); // try { // for (;;) { // workProtocol.receive(channel, worker); // } // } // catch (ChannelFinalized ex) { normal_exit_stuff; } // catch (Exception ex) { abnormal_exit_stuff; } // } // // void main() { // auto channel = new workProtocol.Chan; // spawn(&do_work, channel); // workProtocol.message1(channel, "fred"); // workProtocol.message2(channel, 1, 2); // channel.finalize; // } // // // // Notes: // * A worker function can be passed any number of channels, and may receive // input from any number of other file-decriptor-based event sources, // using a Selector to avoid making blocking I/O calls. // * Message is a cut-down version of Tuple that is intended for use only // in composing protocols. It deliberately omits operators, etc that might // tempt its use elsewhere. // // // Template that resolves to various string representations of the Message. // Expects a message name followed by pairs of parameter type and parameter name. // template Message(string name, T...) { static assert(T.length > 0); static assert(!hasAliasing!(T[0]), "Cannot use " ~ T[0].stringof ~ " in a message"); alias name msgName; static if (is(typeof(T[1]) : string)) { static if (T.length > 2) { alias Message!(name, T[2..$]) Next; enum string fieldStr = T[0].stringof ~ " " ~ T[1] ~ "; " ~ Next.fieldStr; enum string paramStr = T[0].stringof ~ " " ~ T[1] ~ ", " ~ Next.paramStr; enum string nameStr = T[1] ~ ", " ~ Next.nameStr; enum string callStr = "message." ~ name ~ "." ~ T[1] ~ ", " ~ Next.callStr; } else { enum string fieldStr = T[0].stringof ~ " " ~ T[1] ~";"; enum string paramStr = T[0].stringof ~ " " ~ T[1]; enum string nameStr = T[1]; enum string callStr = "message." ~ name ~ "." ~ T[1]; } } else { static assert(false, "Message parameters must be named"); } } // // Helper to stamp out recursive string-defining templates // private template Paste(string name) { enum string Paste = "static if (T.length > 1) { enum " ~ name ~ " = _" ~ name ~ " ~ Next." ~ name ~ "; } else { enum string " ~ name ~ " = _" ~ name ~ "; }; "; } // // Helper to return strings for use in mixins by Protocol template. // All the strings roll out to include an entry for each name/Params pair in T... // private template ProtocolStrings(uint index, T...) { static if (is(typeof(T[0].msgName) : string) && is(typeof(T[0].fieldStr) : string) && is(typeof(T[0].paramStr) : string) && is(typeof(T[0].nameStr) : string) && is(typeof(T[0].callStr) : string)) { static if (T.length > 1) { alias ProtocolStrings!(index+1, T[1..$]) Next; } // message struct: struct name { fields } enum string _msgStr = "\nstruct " ~ T[0].msgName ~ "Msg { " ~ T[0].fieldStr ~ " }"; enum string _unionStr = "\n " ~ T[0].msgName ~ "Msg " ~ T[0].msgName ~ ";"; enum string _thisStr = "\n this(ref " ~ T[0].msgName ~ "Msg msg) {" ~ "\n kind = " ~ to!string(index) ~ ";" ~ "\n " ~ T[0].msgName ~ " = msg;" ~ "\n }"; enum string _interfaceStr = "\n void " ~ T[0].msgName ~ "(" ~ T[0].paramStr ~ ");"; enum string _sendStr = "\nvoid " ~ T[0].msgName ~ "(Chan channel, " ~ T[0].paramStr ~ ") {" ~ "\n channel.add(Message(" ~ T[0].msgName ~ "Msg(" ~ T[0].nameStr ~ ")));" ~ "\n}"; enum string _caseStr = "\n case " ~ to!string(index) ~ ":" ~ "\n handler." ~ T[0].msgName ~ "(" ~ T[0].callStr ~ ");" ~ "\n break;"; mixin(Paste!("msgStr")); mixin(Paste!("unionStr")); mixin(Paste!("thisStr")); mixin(Paste!("interfaceStr")); mixin(Paste!("sendStr")); mixin(Paste!("caseStr")); } else { static assert(false, "protocols must have pairs of names and Params"); } } // // Protocol - expects pairs of strings and Params, representing // message names and message parameters. // The strings used in mixins are available for inspection should you need them. // // Params do not need to be different from each other. // The types used in Params fields must not refer to non-invariant data. // template Protocol(T...) { alias ProtocolStrings!(0, T) Strings; // Define the message types enum string msgStr = Strings.msgStr; // Define the discriminated union to hold any message // struct Message { // uint kind; // union { // messages // } // this(params) { // kind = index; // 0 for first message type, 1 for second, etc // message = { params }; // } // ... // } enum string unionStr = "\nstruct Message {" ~ "\n uint kind;" ~ "\n union { " ~ Strings.unionStr ~ "\n }" ~ Strings.thisStr ~ "\n}"; // Define the channel enum string channelStr = "alias Channel!(Message) Chan;"; // Define the interface type: interface { void msg1_name(msg1_params); ... } enum string interfaceStr = "\ninterface IHandler { " ~ Strings.interfaceStr ~ "\n}"; // Define the send functions: // void message_name(Chan channel, params) { // channel.add(invariant(Message)(params)) // } // ... enum string sendStr = Strings.sendStr; // Define the receive function: // void receive(Chan channel, IHandler handler) { // auto message = channel.receive(); // switch (message.kind) { // case 0: // break; // ... // default // assert(false, "unsupported message kind"); // } // } enum string receiveStr = "\nvoid receive(Chan channel, IHandler handler) {" ~ "\n auto message = channel.remove();" ~ "\n switch (message.kind) {" ~ Strings.caseStr ~ "\n default:" ~ "\n assert(0, \"Received unsupported message kind\");" ~ "\n }" ~ "\n}"; enum string codeStr = msgStr ~ unionStr ~ channelStr ~ interfaceStr ~ sendStr ~ receiveStr; mixin(codeStr); } unittest { alias Protocol!(Message!("message1", string, "action"), Message!("message2", int, "val1", int, "val2")) workProtocol; writefln("%s", workProtocol.codeStr); // static necessary to move to module scope so do_work can access static class Worker : workProtocol.IHandler { override void message1(string action) { writefln("got message1: action=%s", action); } override void message2(int val1, int val2) { writefln("got message2: val1=%s val2=%s", val1, val2); } } // static necessary to move to module scope so function ptr can be taken static void do_work(workProtocol.Chan channel, string name) { try { writefln("%s starting", name); scope worker = new Worker(); for (;;) { workProtocol.receive(channel, worker); } } catch (ChannelFinalized ex) {} catch (Exception ex) { assert(0, "unexpected exception"); } } auto channel = new workProtocol.Chan; spawn(&do_work, channel, "Sam"); workProtocol.message1(channel, "Fred"); workProtocol.message2(channel, 1, 2); channel.finalize; }