dmd-concurrency
Chris Williams
yoreanon-chrisw at yahoo.co.jp
Tue Nov 19 23:34:36 PST 2013
On Wednesday, 20 November 2013 at 04:24:14 UTC, Daniel Murphy
wrote:
> This is the correct forum to post phobos proposals on.
Well then, here's what I had written:
A few applications I've considered implementing seem like they
would be easier if there was a channel-based messaging system in
std.concurrency. I'm happy to do this implementation, but I
thought I would try to get some sort of sign-off before doing so.
Below, I lay out my argument for the addition, followed by the
API that I am considering.
---
One fairly common task is thread-pooling. With the standard
send/receive model currently implemented, you have to choose a
specific thread to target when you send a task. While it's true
that you can simply iterate through your list of threads over and
over, to spread the load evenly over them, that presumes that all
tasks take equal processing time. It makes more sense to be able
to push data into a shared channel (secretly a work queue), so
that the first thread to finish its previous task can immediately
pull the next one before everyone else. This also removes the need
to pass around references to your threads so that they can be
looped over.
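For reference, here is a minimal sketch of the round-robin approach with the current send()/receive() functions. The Task, Done and Stop structs and the roundRobin() helper are names I made up for illustration; only spawn, send, receive, receiveOnly and ownerTid come from std.concurrency.

```d
import std.concurrency;

// Hypothetical message types for this sketch.
struct Task { int id; }
struct Done { int id; }
struct Stop {}

void worker()
{
    bool running = true;
    while (running)
    {
        receive(
            (Task t) { ownerTid.send(Done(t.id)); }, // report completion
            (Stop s) { running = false; }
        );
    }
}

// Distributes taskCount tasks over workerCount threads in strict rotation
// and returns how many completion messages came back.
int roundRobin(int workerCount, int taskCount)
{
    Tid[] workers;
    foreach (i; 0 .. workerCount)
        workers ~= spawn(&worker);

    // The sender has to pick a concrete Tid for every single task.
    foreach (id; 0 .. taskCount)
        workers[id % workers.length].send(Task(id));

    int completed;
    foreach (i; 0 .. taskCount)
    {
        receiveOnly!Done();
        ++completed;
    }

    foreach (w; workers)
        w.send(Stop());
    return completed;
}

void main()
{
    assert(roundRobin(3, 10) == 10);
}
```

Note that the sender owns the Tid array and decides the target up front, so a slow task on one worker delays everything queued behind it in that worker's mailbox.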
I haven't tested it, but it looks like this sort of thing might
be quasi-possible using the register/unregister/locate methods.
As each thread starts, it can register itself with a named group
(i.e. channel), and then anyone who wants to send an item to an
arbitrary thread in that group can call locate() to retrieve one
thread and call send() against the Tid. The target thread would
then need to unregister itself while it is doing work, then
re-register itself. My complaint against this is the need to
unregister and re-register. If the thread issuing commands sends
a large number of tasks all at once, they will all go to the same
thread (if coded poorly) or the caller will need to use yield()
or sleep() to allow the target thread to receive the task and
unregister, so that locate() can find a different thread. That's
not terribly efficient. I am also concerned that there's the
chance that all threads will be unregistered when we call
locate(), whereas a channeling system would be able to expand the
mailbox during the times that all threads are busy.
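To make the unregister/re-register window concrete, here is a small single-threaded sketch. The name "workers" is just a placeholder I picked; register, unregister, locate and thisTid are the existing std.concurrency functions, and as far as I understand the documentation, locate() hands back Tid.init when nothing is registered under the name.

```d
import std.concurrency;

void main()
{
    // Hypothetical group name used only for this sketch.
    enum group = "workers";

    // Idle: the thread is visible to locate().
    assert(register(group, thisTid));
    assert(locate(group) == thisTid);

    // Busy: the thread hides itself while it works.
    assert(unregister(group));

    // A sender that calls locate() now finds nobody to send to.
    assert(locate(group) == Tid.init);

    // Idle again: the thread re-registers.
    assert(register(group, thisTid));
    assert(locate(group) == thisTid);
}
```

Any task that arrives during the middle step has nowhere to go unless the sender retries, which is the inefficiency I am worried about.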
The actual implementation within concurrency.d also concerns me
as (if I read it correctly) the most recently registered thread
will be the one which locate() finds, rather than the thread
which has been registered the longest. I suppose it's probably
not too large of an issue if the same two threads keep taking all
the tasks, since that would just mean your load doesn't exceed
two threads' worth of processing power, but it still seems like a
FIFO system would be better. The registry is also based on an
array rather than a set, which can make removal an O(n) operation
if the contents of the registry have to be shifted left to fill
an empty spot.
Overall, I think that adding a shared message box system would be
a straightforward way to improve the handling of thread pooling
via the actor model.
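The pull-based behavior can be approximated today by having each worker announce when it is idle, at the cost of an extra round trip per task. This is only a sketch of that workaround, not the proposed implementation; Task, Stop and dispatch() are illustrative names of my own.

```d
import std.concurrency;

// Hypothetical message types for this sketch.
struct Task { int id; }
struct Stop {}

void worker()
{
    bool running = true;
    while (running)
    {
        // Announce that this worker is idle, then wait for a reply.
        ownerTid.send(thisTid);
        receive(
            (Task t) { /* process the task here */ },
            (Stop s) { running = false; }
        );
    }
}

// Hands taskCount tasks to whichever worker reports idle first and
// returns the number of tasks dispatched.
int dispatch(int workerCount, int taskCount)
{
    foreach (i; 0 .. workerCount)
        spawn(&worker);

    int dispatched;
    foreach (id; 0 .. taskCount)
    {
        Tid idle = receiveOnly!Tid();
        idle.send(Task(id));
        ++dispatched;
    }

    // Every worker reports idle once more after its last task.
    foreach (i; 0 .. workerCount)
        receiveOnly!Tid().send(Stop());
    return dispatched;
}

void main()
{
    assert(dispatch(3, 10) == 10);
}
```

A shared message box would remove the "I am idle" message entirely: workers would simply block on the channel and the first free one would get the task.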
---
A less common use case: I was also considering some world
simulators (e.g. for studying economics or building a game map),
where the ability to broadcast messages to a large set of other
actors, based on location, interest, etc., seems useful. In this
case, messages would need to be copied out to each subscriber in
the channel rather than travelling over a point-to-point
connection. For a networked game, most likely you would want to
break each channel into two, where locally all senders on a
channel push to a single listener that pipes the messages over
the network, and then remotely the messages would be broadcast to
many listeners again, but that's a reasonably straightforward
task for someone to implement on top of the channel
functionality. I don't think that such networking functionality
is needed in Phobos itself. Mostly, having broadcast
functionality in the standard library would let users apply the
easy and safe actor model to more creative uses than a straight
one-to-one pipe.
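As a rough illustration of the copy-per-subscriber idea, here is how a broadcast looks when written by hand on top of the existing send(). The Event struct and the broadcast() and demo() helpers are hypothetical names for this sketch; the echo flag mirrors the behavior I describe for the duplicating channel in the API below.

```d
import std.concurrency;

// Hypothetical message type for this sketch.
struct Event { string text; }

void subscriber()
{
    // Each subscriber gets its own copy and acknowledges it.
    auto e = receiveOnly!Event();
    ownerTid.send(e.text.length);
}

// Sends a copy of e to every subscriber. When echo is false, the sender
// is skipped even if it is in the list. Returns the number of copies sent.
size_t broadcast(Tid[] subscribers, Tid sender, Event e, bool echo = true)
{
    size_t copies;
    foreach (tid; subscribers)
    {
        if (!echo && tid == sender)
            continue;
        tid.send(e);
        ++copies;
    }
    return copies;
}

// Spawns subscriberCount listeners, broadcasts one event without echo,
// and returns the number of acknowledgements collected.
size_t demo(int subscriberCount)
{
    Tid[] subscribers = [thisTid]; // the sender is a listener too
    foreach (i; 0 .. subscriberCount)
        subscribers ~= spawn(&subscriber);

    size_t copies = broadcast(subscribers, thisTid, Event("tick"), false);
    foreach (i; 0 .. copies)
        receiveOnly!size_t();
    return copies;
}

void main()
{
    assert(demo(3) == 3);
}
```

The point of putting this in the library would be that the subscriber list lives in the channel, so the sender does not have to carry the Tid array around.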
---
Overall, my hope would be to develop something that is
conceptually no more difficult to deal with than the current
send()/receive() model, but also able to be used in a wide
variety of ways. The API that I would propose to develop is:
interface Channel {
    void send(T...)(T vals);
    void prioritySend(T...)(T vals);
    void receive(T...)(out Tid sender, T ops);
    receiveOnlyRet!(T) receiveOnly(T...)();
    bool receiveTimeout(T...)(Duration d, T ops);
    void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
    void setMaxMailboxSize(Tid tid, size_t messages,
                           bool function(Tid) doThisFunc);
}
// Send inserts a message into a shared message box. Receive removes
// the message.
class SingleChannel : Channel {}

// Send inserts the message into a message box per-recipient. Receive
// removes the message from the calling thread's channel message box.
// If echo is false, messages will not be sent back to the sender, even
// if they are a registered listener.
class DuplicateChannel(bool echo = true) : Channel {}
// Used by function sendAll(). Channel can be of either type.
void registerSend(Channel c, Tid tid = thisTid);
void unregisterSend(Channel c, Tid tid = thisTid);

// Used by function receiveAll(). Channel can be of either type.
void registerReceive(Channel c, Tid tid = thisTid);
void unregisterReceive(Channel c, Tid tid = thisTid);

// Sends a copy of message to all channels this thread has registered
// for.
void sendAll(T...)(T ops);

// Receives a message of type T from any channel that we are registered
// for. Returns channel and sender.
void receiveAll(T...)(out Channel c, out Tid sender, T ops);
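To give a feel for the shared message box behind SingleChannel, here is one possible shape using a mutex and a condition variable. It is a deliberately reduced sketch under my own assumptions: SharedBox and sumThroughBox() are hypothetical names, the box only carries ints, and a negative value serves as the stop signal. None of this is the final implementation.

```d
import core.atomic : atomicLoad, atomicOp;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

// Hypothetical stand-in for the shared message box; it only carries ints.
final class SharedBox
{
    private Mutex lock;
    private Condition notEmpty;
    private int[] items;

    this()
    {
        lock = new Mutex;
        notEmpty = new Condition(lock);
    }

    // Append a message and wake one waiting receiver.
    void send(int value)
    {
        synchronized (lock)
        {
            items ~= value;
            notEmpty.notify();
        }
    }

    // Block until a message is available, then remove and return it.
    int receive()
    {
        synchronized (lock)
        {
            while (items.length == 0)
                notEmpty.wait();
            immutable value = items[0];
            items = items[1 .. $];
            return value;
        }
    }
}

// Pushes 1 .. taskCount through the box to workerCount threads and
// returns the sum of everything the workers pulled out.
int sumThroughBox(int workerCount, int taskCount)
{
    auto box = new SharedBox;
    shared int total;

    Thread[] workers;
    foreach (i; 0 .. workerCount)
    {
        workers ~= new Thread({
            // A negative value is this sketch's stop signal.
            for (int v = box.receive(); v >= 0; v = box.receive())
                atomicOp!"+="(total, v);
        }).start();
    }

    foreach (v; 1 .. taskCount + 1)
        box.send(v);
    foreach (i; 0 .. workerCount)
        box.send(-1); // one stop signal per worker
    foreach (w; workers)
        w.join();
    return atomicLoad(total);
}

void main()
{
    assert(sumThroughBox(3, 10) == 55);
}
```

Whichever worker is free first takes the next item, and the box simply grows while all workers are busy, which is the behavior I am after.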
I believe that the look and feel stays fairly consistent with the
current set of functions in std.concurrency. I've added the
ability for the recipient to infer information about the sender
since, in the duplication model, I believe there are quite a few
cases where this would be important information. And of course,
I've added the option to register/unregister threads other than
ourselves to allow a greater range of code layouts, though it's
possible that the lack of this sort of thing in the original code
is due to some sort of safety concern?
The most straightforward way to implement the DuplicateChannel
would be to use the individual threads' message boxes, but this
would mean that data put into a channel could be pulled out via
the traditional receive() method. Currently, my intention would
be to partition these two systems (the direct send()/receive()
model and the channel model), unless anyone has any reason to
think they should be merged into a single whole?
Those are my thoughts, anyways. Comments? Complaints?