std.concurrency.receive() and event demultiplexer
JR
zorael at gmail.com
Fri Aug 16 17:02:17 PDT 2013
On Friday, 9 August 2013 at 17:59:33 UTC, Ruslan Mullakhmetov
wrote:
> Now we could go further and overcome it with complication of
> logic:
> - timed wait for event on socket
> - when timeout occur check receive for incoming messages
> again with timeout
> - switch back to events waiting
>
> The drawbacks are obvious:
> - complicated logic
> - artificial switching between two demultiplexers: event loop
> and std.concurrency.receive()
> - need to choose good timeout to meet both: responsiveness
> and cpu load
>
> Alternatively it is possible to take away all blocking
> operations to another child thread, but this does not eliminate
> problem of resource freeing. with socket example this is
> dangerous with hanging socket descriptor and (1) not telling
> peer socket to shut up conversation (2) overflow of number of
> open file descriptors
I ended going that way with my small toy IRC bot; one thread to
*read* from the connected stream and pass on incoming lines, only
briefly checking for messages inbetween (short) stream read
timeouts; another thread to *write* to the same stream,
indefinitely blocking in std.concurrency.receive() until a string
comes along.
Somewhat dumbed-down excerpt with added clarifying comments;
/* --8<----8<----8<----8<----8<----8<----8<----8<----8<----8<--*/
__gshared Socket __gsocket; // *right* in the pride D:
__gshared SocketStream __gstream;
void serverRead() {
bool halt;
char[512] buf;
char[] slice;
Tid broker = locateTid("broker");
register("reader"); // thread string identifier
__gsocket.setOption(SocketOptionLevel.SOCKET,
SocketOption.RCVTIMEO, 5.seconds); // arbitrary value
// some template mixins to reduce duplicate code
mixin MessageActionLocal!(halt,true,Imperative.Abort)
killswitch;
mixin MessageActionLocal!(halt,true,OwnerTerminated)
ownerTerm;
mixin MessageAction!Variant variant; // for debugging
while (!halt) {
slice = __gstream.readLine(buf);
if (slice.length)
broker.send(slice.idup);
// we just want to check for queued messages,
// not block waiting for new ones, so timeout immediately
receiveTimeout(0.seconds,
&killswitch.trigger, // these set halt = true
&ownerTerm.trigger, // ^
&variant.doPrint // this just prints what it
received
);
}
}
void serverWrite() {
bool halt;
long prev;
register("writer");
// even so, ugh for the boilerplate that remains
mixin MessageActionLocal!(halt,true,Imperative.Abort)
killswitch;
mixin MessageActionLocal!(halt,true,OwnerTerminated)
ownerTerm;
mixin MessageAction!Variant variant; // likewise
while (!halt) {
receive(
(string text) {
__gstream.sendLine(text);
// heavily abbreviated; with only this you'll soon
// get kicked due to spam
},
&killswitch.trigger,
&ownerTerm.trigger,
&variant.doPrint
);
}
}
/* --8<----8<----8<----8<----8<----8<----8<----8<----8<----8<--*/
In particular I'm not happy about the __gshared resources, but
this works well enough for my purposes (again, IRC bot). I
initialize said socket and stream in the thread that spawns these
two, so while used by both, neither will close them when exiting
scope.
But yes, the reader keeps switching around. Both need to be able
to catch OwnerTerminated and some other choice imperatives of
import.
The longest stall will naturally be when it's sent a message
while blocked reading from the stream, but in this context 5
seconds is not that big a deal. Still, I wish I knew some other
way -- to salvage my pride, if nothing else.
More information about the Digitalmars-d
mailing list