module bedrock.maxim.concurrency; public import bedrock.maxim.io; public import bedrock.maxim.stream; import bedrock.maxim.logging; import std.algorithm; import std.stdio; import std.conv; import std.traits; import std.typecons; import std.typetuple; import core.thread; import core.stdc.stdlib; import core.stdc.errno; import core.stdc.config; import core.sys.posix.pthread; import core.sys.posix.signal; static import linux = std.c.linux.linux; // // Provides a message-passing concurrency implementation. // * Messages cannot contain references to mutable data. // * Message queues (Channels) are explicitly created and passed to threads as // arguments to spawn(). // * Channels are synchronized (and thus shared). // * The types of messages a Channel supports are well-defined. // * Messages are processed in order. // * Multiple threads can receive from the same Channel. // * Channels have an internal pipe that can be used in a Selector to allow // a thread to receive input from multiple Channels and file-descriptor devices // such as sockets and pipes. // * There is no automatic thread-termination behaviour. // // TODO - prevent non-shared aliased data from being passed to spawn(). // TODO - gathering non-fatal signals via signalfd, putting info on a Channel. // //--------------------------------------------------------------------- // Condition - Adapted from core.sync.condition to be linux-specific, // use an object's monitor and to be shared. 
//---------------------------------------------------------------------

// Thrown when a pthread condition-variable operation reports an error.
public class ConditionException : Exception {
    this(string text) {
        super(text);
    }
}

// clock_gettime definitions copied from commented-out and scattered
// definitions in druntime
private {
    enum int CLOCK_REALTIME = 0;
    enum int TIMER_ABSTIME  = 0x01;
    alias int clockid_t;
    extern (C) int clock_gettime(clockid_t, timespec*);
}

// A condition variable that uses the monitor mutex of a supplied shared
// Object, so that it can be waited on from inside that object's
// synchronized code.
shared class Condition {
    private {
        pthread_cond_t _handle;   // underlying pthread condition variable
        shared Object _obj;       // object whose monitor supplies the mutex
    }

    // Initialise the Condition, which uses the supplied Object's monitor
    // for its mutex.
    this(shared Object o) {
        assert(o, "Object must not be null");
        _obj = o;
        int rc = pthread_cond_init(cast(pthread_cond_t*)&_handle, null);
        assert(!rc, "Unable to create condition");
    }

    // Destroy the underlying pthread condition variable.
    ~this() {
        pthread_cond_destroy(cast(pthread_cond_t*)&_handle);
    }

    // Wait until someone calls notify() or notifyAll()
    // Note that signals do not interrupt a wait.
    // Throws ConditionException if pthread_cond_wait reports an error
    // (for example EPERM when the calling thread does not own the mutex).
    void wait() {
        int rc = pthread_cond_wait(cast(pthread_cond_t*)&_handle, mutexPtr);
        if (rc == 0)      return;
        if (rc == EINVAL) throw new ConditionException("Invalid parameters");
        if (rc == EPERM)  throw new ConditionException("Mutex not owned by calling thread");
        throw new ConditionException("Unknown error=" ~ to!string(rc));
    }

    // Wait with a timeout in microseconds, returning false on timeout, and true otherwise
    // Note that signals do not interrupt a wait.
    // A negative timeout is treated as zero (the wait times out promptly).
    // Throws ConditionException if the clock cannot be read or if
    // pthread_cond_timedwait reports an error other than ETIMEDOUT.
    bool wait(long microseconds) {
        enum long MICROS_PER_SEC  = 1_000_000;
        enum long NANOS_PER_SEC   = 1_000_000_000;
        enum long NANOS_PER_MICRO = 1_000;

        // A deadline in the past and a deadline of "now" both time out
        // straight away, so clamping keeps the arithmetic below non-negative.
        if (microseconds < 0) microseconds = 0;

        timespec t;
        if (clock_gettime(CLOCK_REALTIME, &t) != 0) {
            int err = errno;
            throw new ConditionException("Unable to read clock, errno=" ~ to!string(err));
        }

        // Add seconds and nanoseconds separately: folding everything into a
        // single nanosecond count overflows a long for very large timeouts.
        long secs  = t.tv_sec  + microseconds / MICROS_PER_SEC;
        long nanos = t.tv_nsec + (microseconds % MICROS_PER_SEC) * NANOS_PER_MICRO;
        if (nanos >= NANOS_PER_SEC) {
            // Both addends are below one second, so a single carry suffices.
            ++secs;
            nanos -= NANOS_PER_SEC;
        }
        t.tv_sec  = cast(time_t) secs;
        t.tv_nsec = cast(c_long) nanos;

        int rc = pthread_cond_timedwait(cast(pthread_cond_t*)&_handle, mutexPtr, &t);
        if (rc == 0)         return true;
        if (rc == ETIMEDOUT) return false;
        if (rc == EINVAL)    throw new ConditionException("Invalid parameters");
        if (rc == EPERM)     throw new ConditionException("Mutex not owned by calling thread");
        throw new ConditionException("Unknown error=" ~ to!string(rc));
    }

    // Notify one waiter
    void notify() {
        pthread_cond_signal(cast(pthread_cond_t*)&_handle);
    }

    // Notify all waiters
    void notifyAll() {
        pthread_cond_broadcast(cast(pthread_cond_t*)&_handle);
    }

    // Return a pointer to the object's monitor's mutex.
    // Note - can't stash it on construction because the monitor isn't
    // created until it is needed for a synchronized call.
    private pthread_mutex_t* mutexPtr() {
        // Mirror of the runtime's monitor layout; must stay in step with it.
        struct Monitor { // from monitor.c
            void* impl;
            size_t length;
            void* ptr;
            size_t refs; // added in dmd 2.048
            pthread_mutex_t mon;
        }
        // The second pointer-sized slot of an object holds its monitor.
        Monitor * monitor = cast(Monitor*) (cast(void**)_obj)[1]; // from object_.d
        return cast(pthread_mutex_t*)&monitor.mon;
    }
}

//-----------------------------------------------------------------------------------
// A Pipe for transferring raw data between threads. Usually it is better to use
// a Channel with a defined protocol (see below), however Pipes can be useful
// for tests and simulations where you want to emulate (say) a socket or serial port.
//-----------------------------------------------------------------------------------

// Read end of a pipe, usable in a Selector.
class PipeReadable : ISelectableReadable {
    private int _fd;

    // Takes a duplicate of fd; the caller keeps ownership of the fd it passed in.
    this(int fd) {
        _fd = linux.dup(fd);
    }

    // Closes the duplicated descriptor.
    ~this() {
        linux.close(_fd);
    }

    // Reads via fdRead on the descriptor and returns its result.
    override uint read(void* buffer, uint count) {
        return fdRead(_fd, buffer, count);
    }

    override protected int read_descriptor() {
        return _fd;
    }
}

// Write end of a pipe, usable in a Selector.
class PipeWriteable : ISelectableWriteable {
    private int _fd;

    // Stores fd as-is (no duplicate); the destructor closes it.
    this(int fd) {
        _fd = fd;
    }

    ~this() {
        linux.close(_fd);
    }

    // Writes via fdWrite on the descriptor and returns its result.
    override uint write(in void* buffer, uint count) {
        return fdWrite(_fd, buffer, count);
    }

    override protected int write_descriptor() {
        return _fd;
    }
}

// Creates a pipe and hands out each end exactly once.
// A value of -1 in readFd/writeFd marks an end that has been handed out.
synchronized class Pipe {
    private {
        int readFd, writeFd;
    }

    this() {
        int[2] fds;
        int rc = linux.pipe(fds);
        assert(!rc);
        readFd = fds[0];
        writeFd = fds[1];
    }

    // Closes whichever ends were never handed out.
    ~this() {
        if (readFd != -1)  linux.close(readFd);
        if (writeFd != -1) linux.close(writeFd);
    }

    // one-shot return of a new reader
    PipeReadable readable() {
        assert(readFd != -1);
        auto r = new PipeReadable(readFd);
        // PipeReadable works on its own duplicate, so the original must be
        // closed here; otherwise the read end of the pipe stays open for the
        // life of the process even after the reader is destroyed.
        linux.close(readFd);
        readFd = -1;
        return r;
    }

    // one-shot return of a new writer
    PipeWriteable writeable() {
        assert(writeFd != -1);
        // PipeWriteable stores the descriptor itself and closes it later.
        auto w = new PipeWriteable(writeFd);
        writeFd = -1;
        return w;
    }
}

//---------------------------------------------------------------------------
// A Channel for sending messages between threads.
// An internal eventfd provides a file-descriptor to select on for removal.
// Multiple threads may add and remove messages from the queue, but usually
// one thread adds and one other thread removes.
//---------------------------------------------------------------------------

// Thrown by Channel.add when a bounded channel is at capacity.
public class ChannelFull : Exception {
    this() {
        super("channel full");
    }
}

// Thrown by Channel.remove when the channel is finalized and empty.
public class ChannelFinalized: Exception {
    this() {
        super("channel finalized");
    }
}

// An ISelectableReadable returned from a Channel for use in a Selector
// The Channel itself cannot be used because it is shared.
class ChannelSelectable : ISelectableReadable {
    // Same descriptor number as the Channel's eventfd (not a duplicate);
    // this class never closes it.
    private int _fd;

    this(int fd) {
        _fd = fd;
    }

    // ISelectableReadable implementation:

    // Only the descriptor is meant to be used; reading is not supported.
    override uint read(void * buffer, uint count) {
        assert(false, "read is not supported");
    }

    override protected int read_descriptor() {
        return _fd;
    }
}

extern (C) int eventfd(int initval, int flags);

// A synchronized FIFO queue of T, restricted to types without aliasing.
// The eventfd is written when the queue goes from empty to non-empty (or is
// finalized while empty) and read back when the last message is removed from
// a non-finalized channel.
synchronized class Channel(T) if (!hasAliasing!T) {
    private {
        // Singly-linked list node
        struct Node {
            T payload;
            Node* next;
            this(T payload_) { payload = payload_; }
        }

        Node* _front;               // next node to remove, null when empty
        Node* _back;                // most recently added node, null when empty
        uint _capacity;             // maximum message count; 0 means unbounded
        uint _count;                // number of queued messages
        int _fd;                    // eventfd descriptor
        bool _finalized;            // set by finalize()
        Condition _readCondition;   // notified when an empty channel gains a message or is finalized
        Condition _writeCondition;  // notified when a full channel is about to lose a message
    }

    // Create a channel holding at most capacity messages (0 = no limit).
    this(uint capacity = 0) {
        // capacity is unsigned, so there is no negative value to reject.
        _capacity = capacity;
        _fd = eventfd(0, 0);
        assert(_fd != -1, "Failed to open eventfd");
        _readCondition = new shared(Condition)(this);
        _writeCondition = new shared(Condition)(this);
    }

    ~this() {
        linux.close(_fd);
    }

    // Return a new Selectable that can be used in a Selector to discover
    // when the Channel is not empty.
    ChannelSelectable newSelectable() {
        return new ChannelSelectable(_fd);
    }

    // Finalise the channel, causing it to throw ChannelFinalized on remove() when empty.
    final void finalize() {
        _finalized = true;
        if (!_count) {
            // Make the eventfd readable and wake any waiting readers.
            ulong one = 1;
            linux.write(_fd, &one, one.sizeof);
            _readCondition.notifyAll;
        }
    }

    // Add a message to the back of the channel, throwing if full
    final void add(T msg) {
        if (_capacity && _count == _capacity) {
            throw new ChannelFull();
        }

        auto n = cast(shared) new Node(msg);
        n.next = null;
        if (!_front) _front = n;
        if (!_back) _back = n;
        else        _back.next = n;
        _back = n;

        if (!_count) {
            // Empty -> non-empty: make the eventfd readable and wake readers.
            ulong one = 1;
            linux.write(_fd, &one, one.sizeof);
            _readCondition.notifyAll;
        }
        ++_count;
    }

    // Blocking version of add
    final void addBlocking(T msg) {
        while (_capacity && _count == _capacity) {
            _writeCondition.wait;
        }
        add(msg);
    }

    // Remove and return the front message, blocking until one is available
    // or throwing if finalized and empty
    T remove() {
        while (!_count) {
            if (_finalized) {
                //log_trace("throwing");
                throw new ChannelFinalized();
            }
            else {
                //log_trace("waiting");
                _readCondition.wait;
            }
        }

        if (_capacity && _count == _capacity) {
            // The channel is about to stop being full.
            _writeCondition.notifyAll;
        }

        // Check the invariant before dereferencing, not after.
        assert(_front);
        T msg = _front.payload;
        _front = _front.next;
        if (!_front) _back = null;
        --_count;

        if (!_count && !_finalized) {
            // Now empty: read the eventfd so it stops reporting readable.
            // A finalized channel is left readable.
            ulong val = void;
            linux.read(_fd, &val, val.sizeof);
        }
        return msg;
    }
}

//
// Spawn and start a thread on the given run function.
// FIXME - insist on arguments being shared or !hasAliasing.
//
// If run throws, the Throwable is logged and the whole process exits with
// status 1.
//
void spawn(T...)(void function(T) run, string name, T args) {
    void exec() {
        myName = name; // myName is defined in logging
        try {
            run(args);
        }
        catch (Throwable ex) {
            log_error("Died with: %s", ex);
            exit(1);
        }
    }
    auto t = new Thread(&exec);
    t.start();
}

//=========== Code-generating templates ================

//
// Helper template to stamp out recursive string-defining templates
//
// The generated code expects to be mixed into a template that defines
// `more`, `_<name>` and, when `more` is true, `Next`. It defines `<name>`
// as `_<name>`, followed by joiner and `Next.<name>` when `more` is true.
//
private template Paste(string name, string joiner) {
    enum string Paste =
        "\nstatic if (more) {" ~
        "\n enum string " ~ name ~ " = _" ~ name ~ " ~ \"" ~ joiner ~ "\" ~ Next." ~ name ~ ";" ~
        "\n}" ~
        "\nelse {" ~
        "\n enum string " ~ name ~ " = _" ~ name ~ ";" ~
        "\n}";
}

//-------------------------------------------------------------------------------------
// Message - code-generating template that defines a streamable struct
// and a bunch of strings used by higher-level templates.
//
// Expects a message name followed by pairs of parameter type and parameter name.
// The parameter types must not have aliasing - ie cannot refer to non-immutable data.
//-------------------------------------------------------------------------------------

// generate strings used for Message code
private template MessageStrings(string msgName, T...) {
    static assert(T.length > 1, "Message parameters must be in pairs");

    enum more = T.length > 2;
    static if (more) {
        // Recurse on the remaining (type, name) pairs.
        alias MessageStrings!(msgName, T[2..$]) Next;
    }

    static if (T.length > 0) {
        static assert(!hasAliasing!(T[0]), "Cannot use type " ~ T[0].stringof ~ " in a message");
        static assert(is(typeof(T[1]) : string), "Message parameters must be named");

        enum string _fieldStr = T[0].stringof ~ " " ~ T[1] ~ ";";
        enum string _initStr  = "this." ~ T[1] ~ " = " ~ T[1] ~ ";";
        // The type is parenthesized so that multi-token type names (for
        // example "int[3]") form a single template argument.
        enum string _readStr  = T[1] ~ " = stream.get!(" ~ T[0].stringof ~ ");";
        enum string _writeStr = "(" ~ T[1] ~ ")";
        enum string _paramStr = T[0].stringof ~ " " ~ T[1];
        enum string _nameStr  = T[1];
        enum string _callStr  = "message." ~ msgName ~ "." ~ T[1];

        mixin(Paste!("fieldStr", "\n ")); // field types and names in definition
        mixin(Paste!("initStr", "\n "));  // field assignments in constructor
        mixin(Paste!("readStr", "\n "));  // fields read from an InStream
        mixin(Paste!("writeStr", ""));    // fields written to an OutStream
        mixin(Paste!("paramStr", ", "));  // field types and names, comma-separated
        mixin(Paste!("nameStr", ", "));   // field names, comma separated
        mixin(Paste!("callStr", ", "));   // field values accessed in an enclosing union, comma separated
    }
}

// Message - exposes `msgName`, the per-field `strings`, and `code`, the
// source text of a struct named <name>Msg with a constructor, read(InStream)
// and write(OutStream).
template Message(string name, T...) {
    static assert(T.length > 0, "Messages have to contain fields");

    alias name msgName;
    alias MessageStrings!(name, T) strings;

    enum string code =
        "\n struct " ~ name ~ "Msg {" ~
        "\n " ~ strings.fieldStr ~
        "\n this(" ~ strings.paramStr ~ ") {" ~
        "\n " ~ strings.initStr ~
        "\n }" ~
        "\n void read(InStream stream) {" ~
        "\n " ~ strings.readStr ~
        "\n }" ~
        "\n void write(OutStream stream) {" ~
        "\n stream" ~ strings.writeStr ~ ";" ~
        "\n }" ~
        "\n }";
}

//----------------------------------------------------------------------------------
// Protocol - defines, for the given Messages:
// * a discriminated union to hold any of them,
// * a function that returns a channel to send them through,
// * functions to send the messages,
// * an interface to process dispatched messages with, and
// * a function to receive and dispatch incoming messages.
//--------------------------------------------------------------------------------

//
// Helper to return strings for use in mixins by Protocol template.
// All the strings roll out to include an entry for each Message in T...
// index is the discriminator value given to T[0]; later messages get
// index+1, index+2, ...
//
private template ProtocolStrings(uint index, T...) {
    static assert(is(typeof(T[0].msgName) : string), "Protocols must comprise messages");
    static assert(is(typeof(T[0].code) : string), "Trivial messages not supported yet");

    enum more = T.length > 1;
    static if (more) {
        // Recurse on the remaining messages with the next discriminator.
        alias ProtocolStrings!(index+1, T[1..$]) Next;
    }

    enum string _msgStr = T[0].code;

    enum string _unionStr =
        "\n " ~ T[0].msgName ~ "Msg " ~ T[0].msgName ~ ";";

    enum string _thisStr =
        "\n this(ref " ~ T[0].msgName ~ "Msg msg) {" ~
        "\n kind = " ~ to!string(index) ~ ";" ~
        "\n " ~ T[0].msgName ~ " = msg;" ~
        "\n }";

    enum string _readStr =
        "\n case " ~ to!string(index) ~ ": " ~ T[0].msgName ~ ".read(stream); break;";

    enum string _writeStr =
        "\n case " ~ to!string(index) ~ ": " ~ T[0].msgName ~ ".write(stream); break;";

    enum string _interfaceStr =
        "\n void " ~ T[0].msgName ~ "(" ~ T[0].strings.paramStr ~ ");";

    enum string _sendStr =
        "\n void " ~ T[0].msgName ~ "(" ~ T[0].strings.paramStr ~ ") {" ~
        "\n channel.add(Message(" ~ T[0].msgName ~ "Msg(" ~ T[0].strings.nameStr ~ ")));" ~
        "\n }";

    enum string _caseStr =
        "\n case " ~ to!string(index) ~ ":" ~
        " handler." ~ T[0].msgName ~ "(" ~ T[0].strings.callStr ~ ");" ~
        " break;";

    mixin(Paste!("msgStr", ""));
    mixin(Paste!("unionStr", ""));
    mixin(Paste!("thisStr", ""));
    mixin(Paste!("readStr", ""));
    mixin(Paste!("writeStr", ""));
    mixin(Paste!("interfaceStr", ""));
    mixin(Paste!("sendStr", ""));
    mixin(Paste!("caseStr", ""));
}

//
// Protocol - expects Messages as its template parameters.
// The strings used in mixins are available for inspection should you need them.
// `code` is the source text of a class called `name` assembled from them.
//
template Protocol(string name, T...) {
    alias ProtocolStrings!(0, T) strings;

    alias strings.msgStr msgStr;

    // Define the discriminated union to hold any message
    enum string unionStr =
        "\n struct Message {" ~
        "\n uint kind;" ~
        "\n union { " ~ strings.unionStr ~
        "\n }" ~
        strings.thisStr ~
        "\n this(InStream stream) {" ~
        "\n kind = stream.get!uint;" ~
        "\n switch(kind) {" ~
        strings.readStr ~
        "\n default: assert(0, \"Cannot read unsupported message kind\");" ~
        "\n }" ~
        "\n }" ~
        "\n void write(OutStream stream) {" ~
        "\n stream(kind);" ~
        "\n switch(kind) {" ~
        strings.writeStr ~
        "\n default: assert(0, \"Cannot write unsupported message kind\");" ~
        "\n }" ~
        "\n }" ~
        "\n }";

    // Define the channel
    enum string channelStr =
        "\n private alias Channel!(Message) _Chan;" ~
        "\n private alias shared _Chan Chan;" ~
        "\n" ~
        "\n private Chan channel;" ~
        "\n this() { channel = new Chan(); }" ~
        "\n ChannelSelectable newSelectable() { return channel.newSelectable(); }" ~
        "\n void finalize() { channel.finalize; }" ~
        "\n";

    // Define the interface type: interface { void msg1_name(msg1_params); ... }
    enum string interfaceStr =
        "\n interface IHandler { " ~
        strings.interfaceStr ~
        "\n }";

    // Define the send functions
    alias strings.sendStr sendStr;

    // Define the receive function
    enum string receiveStr =
        "\n void receive(IHandler handler) {" ~
        "\n auto message = channel.remove;" ~
        "\n switch (message.kind) {" ~
        strings.caseStr ~
        "\n default: assert(0, \"Cannot dispatch unsupported message kind\");" ~
        "\n }" ~
        "\n }";

    enum string code =
        "\nclass " ~ name ~ " {" ~
        msgStr ~
        unionStr ~
        channelStr ~
        interfaceStr ~
        sendStr ~
        receiveStr ~
        "\n}";
}