std.concurrency.receive() and event demultiplexer

JR zorael at gmail.com
Fri Aug 16 17:02:17 PDT 2013


On Friday, 9 August 2013 at 17:59:33 UTC, Ruslan Mullakhmetov 
wrote:
> Now we could go further and overcome it by complicating the 
> logic:
>   - timed wait for an event on the socket
>   - when the timeout occurs, check receive() for incoming 
> messages, again with a timeout
>   - switch back to waiting for events
>
> The drawbacks are obvious:
>   - complicated logic
>   - artificial switching between two demultiplexers: event loop 
> and std.concurrency.receive()
>   - need to choose a good timeout that balances responsiveness 
> and CPU load
>
> Alternatively, it is possible to move all blocking operations 
> to another child thread, but this does not eliminate the 
> problem of freeing resources. In the socket example this is 
> dangerous: the socket descriptor is left hanging, so (1) the 
> peer is never told to shut down the conversation and (2) the 
> number of open file descriptors can overflow

I ended up going that way with my small toy IRC bot: one thread to 
*read* from the connected stream and pass on incoming lines, only 
briefly checking for messages in between (short) stream read 
timeouts; another thread to *write* to the same stream, 
blocking indefinitely in std.concurrency.receive() until a string 
comes along.

Somewhat dumbed-down excerpt with added clarifying comments:

/* --8<----8<----8<----8<----8<----8<----8<----8<----8<----8<--*/

__gshared Socket __gsocket;  // *right* in the pride D:
__gshared SocketStream __gstream;


void serverRead() {
     bool halt;
     char[512] buf;
     char[] slice;
     Tid broker = locate("broker");  // std.concurrency lookup by name

     register("reader", thisTid);  // thread string identifier

     __gsocket.setOption(SocketOptionLevel.SOCKET,
         SocketOption.RCVTIMEO, 5.seconds);  // arbitrary value

     // some template mixins to reduce duplicate code
     mixin MessageActionLocal!(halt,true,Imperative.Abort) killswitch;
     mixin MessageActionLocal!(halt,true,OwnerTerminated) ownerTerm;
     mixin MessageAction!Variant variant;  // for debugging

     while (!halt) {
         slice = __gstream.readLine(buf);
         if (slice.length)
             broker.send(slice.idup);

         // we just want to check for queued messages,
         // not block waiting for new ones, so timeout immediately

         receiveTimeout(0.seconds,
             &killswitch.trigger,  // these set halt = true
             &ownerTerm.trigger,   // ^
             &variant.doPrint      // this just prints what it received
         );
     }
}


void serverWrite() {
     bool halt;
     long prev;

     register("writer", thisTid);

     // even so, ugh for the boilerplate that remains
     mixin MessageActionLocal!(halt,true,Imperative.Abort) killswitch;
     mixin MessageActionLocal!(halt,true,OwnerTerminated) ownerTerm;
     mixin MessageAction!Variant variant;  // likewise

     while (!halt) {
         receive(
             (string text) {
                 __gstream.sendLine(text);
                 // heavily abbreviated; with only this you'll soon
                 // get kicked due to spam
             },
             &killswitch.trigger,
             &ownerTerm.trigger,
             &variant.doPrint
         );
     }
}

/* --8<----8<----8<----8<----8<----8<----8<----8<----8<----8<--*/


In particular I'm not happy about the __gshared resources, but 
this works well enough for my purposes (again, IRC bot). I 
initialize said socket and stream in the thread that spawns these 
two, so while used by both, neither will close them when exiting 
scope.

But yes, the reader keeps switching around. Both need to be able 
to catch OwnerTerminated and some other choice imperatives of 
import.

The longest stall will naturally be when the reader is sent a 
message while blocked reading from the stream, but in this context 
5 seconds is not that big a deal. Still, I wish I knew some other 
way -- to salvage my pride, if nothing else.
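
For anyone who wants to try the two loop shapes without my mixins or a 
real socket, here is a minimal sketch. It is not the bot's actual code: 
Abort, Counts, poller, blocker and runDemo are names made up for this 
example, Abort stands in for Imperative.Abort, and a short Thread.sleep 
stands in for the stream read with its timeout. The handlers are plain 
delegate literals instead of the mixin-generated ones.

```d
import core.thread : Thread;
import core.time : msecs;
import std.concurrency;
import std.variant : Variant;

struct Abort {}                        // hypothetical stand-in for Imperative.Abort
struct Counts { int rounds; int lines; }  // hypothetical result holder

// Reader shape: a bounded blocking step, then a non-blocking mailbox check.
void poller(Tid owner) {
    bool halt;
    int rounds;
    while (!halt) {
        Thread.sleep(10.msecs);  // assumption: stands in for a timed stream read
        ++rounds;
        // Zero timeout: handle what is already queued, never wait for more.
        receiveTimeout(0.msecs,
            (Abort _) { halt = true; },
            (OwnerTerminated _) { halt = true; },
            (Variant v) { /* unexpected message, ignored here */ }
        );
    }
    owner.send("poller", rounds);
}

// Writer shape: block in receive() until a string or a stop signal arrives.
void blocker(Tid owner) {
    bool halt;
    int lines;
    while (!halt) {
        receive(
            (string text) { ++lines; },  // the real writer would send the line
            (Abort _) { halt = true; },
            (OwnerTerminated _) { halt = true; }
        );
    }
    owner.send("blocker", lines);
}

Counts runDemo() {
    auto p = spawn(&poller, thisTid);
    auto b = spawn(&blocker, thisTid);
    b.send("NICK bot");
    b.send("JOIN #d");
    Thread.sleep(50.msecs);  // let the poller go around a few times
    p.send(Abort());
    b.send(Abort());

    Counts c;
    foreach (i; 0 .. 2) {    // one tagged reply from each thread
        auto reply = receiveOnly!(string, int)();
        if (reply[0] == "poller") c.rounds = reply[1];
        else c.lines = reply[1];
    }
    return c;
}

void main() {
    auto c = runDemo();
    assert(c.lines == 2);    // both strings were handled before Abort
    assert(c.rounds >= 1);   // the poller completed at least one cycle
}
```

The trade-off is the same as in the excerpt: the poller only notices 
Abort after its current blocking step finishes, so its worst-case 
reaction time is the length of that step, while the blocker reacts as 
soon as the message is delivered.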

