dmd-concurrency
Shammah Chancellor
anonymous at coward.com
Fri Nov 22 14:34:19 PST 2013
On 2013-11-20 07:34:36 +0000, Chris Williams said:
> On Wednesday, 20 November 2013 at 04:24:14 UTC, Daniel Murphy wrote:
>> This is the correct forum to post phobos proposals on.
>
> Well then, here's what I had written:
>
> A few applications I've considered implementing seem like they would be
> easier if there was a channel-based messaging system in
> std.concurrency. I'm happy to do this implementation, but I thought I
> would try to get some sort of sign-off before doing so. Following, I
> will lay out my argument for the addition, and then the API that I am
> considering.
>
> ---
>
> One fairly common task is thread-pooling. With the standard
> send/receive model currently implemented, you have to choose a specific
> thread to target when you send a task. While it's true that you can
> simply iterate through your list of threads over and over, to spread
> the load evenly over them, that presumes that all tasks take even
> processing time. It makes more sense to be able to push data into a
> shared channel (secretly a work queue), and the first thread that
> finishes its previous task will be able to immediately pull the task
> before everyone else. This also means that the necessity of passing
> around references to your threads so that they can be looped over goes
> away.
>
> I haven't tested it, but it looks like this sort of thing might be
> quasi-possible using the register/unregister/locate methods. As each
> thread starts, it can register itself with a named group (i.e.
> channel), and then anyone who wants to send an item to an arbitrary
> thread in that group can call locate() to retrieve one thread and call
> send() against the Tid. The target thread would then need to unregister
> itself while it is doing work, then re-register itself. My complaint
> against this is the need to unregister and re-register. If the thread
> issuing commands sends a large number of tasks all at once, they will
> all go to the same thread (if coded poorly) or the caller will need to
> use yield() or sleep() to allow the target thread to receive the task
> and unregister, so that locate() can find a different thread. That's
> not terribly efficient. I am also concerned that there's the chance
> that all threads will be unregistered when we call locate(), whereas a
> channeling system would be able to expand the mailbox during the times
> that all threads are busy.
>
> The actual implementation within concurrency.d also concerns me as (if
> I read it correctly), the most recent item to register() will be the
> one which locate() finds, rather than the thread which has been
> registered the longest. While I suppose it's probably not too large of
> an issue if the same two threads keep taking all the tasks - that means
> that your load can't exceed two threads worth of processing power - it
> still seems like a FIFO system would be better. The registry is also
> based on an array rather than a set, which can make removal an O(n)
> operation, if the contents of the registry have to be shifted left, to
> fill an empty spot.
>
> Overall, I think that adding a shared message box system would be a
> straightforward way to improve the handling of thread pooling via the
> actor model.
>
> ---
>
> A less common use case: I was also considering some world simulators
> (e.g. for studying economics or building a game map) and here the
> ability to broadcast messages to a large set of other actors, based on
> location, interest, etc. seems useful. In this case, messages would
> need to be copied out to each subscriber in the channel rather than
> having an existence as a point to point connection. For a networked
> game, most likely you would want to break each channel into two, where
> locally all senders on a channel push to a single listener that pipes
> the messages over the network, and then remotely the messages would be
> broadcast to many listeners again, but that's a reasonably
> straightforward task for someone to implement on top of the channel
> functionality. I don't think that such functionality is needed in
> Phobos itself. Mostly, the presence of broadcasting functionality in
> the standard library would let users apply the easy and safe actor
> model to more creative uses than a straight one-to-one pipe.
>
> ---
>
> Overall, my hope would be to develop something that is conceptually no
> more difficult to deal with than the current send()/receive() model,
> but also able to be used in a wide variety of ways. The API that I
> would propose to develop is:
>
> interface Channel {
>     void send(T...)(T vals);
>     void prioritySend(T...)(T vals);
>     void receive(T...)(out Tid sender, T ops);
>     receiveOnlyRet!(T) receiveOnly(T...)();
>     bool receiveTimeout(T...)(Duration d, T ops);
>
>     void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
>     void setMaxMailboxSize(Tid tid, size_t messages,
>                            bool function(Tid) doThisFunc);
> }
>
> // Send inserts a message into a shared message box. Receive removes
> // the message.
> class SingleChannel : Channel {}
>
> // Send inserts the message into a message box per recipient. Receive
> // removes the message from the calling thread's channel message box.
> // If echo is false, messages will not be sent back to the sender, even
> // if it is a registered listener.
> class DuplicateChannel(bool echo = true) : Channel {}
>
> // Used by sendAll(). The channel can be of either type.
> void registerSend(Channel c, Tid tid = thisTid);
> void unregisterSend(Channel c, Tid tid = thisTid);
> // Used by receiveAll(). The channel can be of either type.
> void registerReceive(Channel c, Tid tid = thisTid);
> void unregisterReceive(Channel c, Tid tid = thisTid);
>
> // Sends a copy of the message to all channels this thread has
> // registered for.
> void sendAll(T...)(T ops);
> // Receives a message of type T from any channel that we are registered
> // for. Returns the channel and the sender.
> void receiveAll(T...)(out Channel c, out Tid sender, T ops);
>
> I believe that the look and feel stays fairly consistent with the
> current set of functions in std.concurrency. I've added the ability for
> the recipient to infer information about the sender since, in the
> duplication model, I believe there are quite a few cases where this
> would be important information. And of course, I've added the option to
> register/unregister threads other than ourselves to allow a greater
> range of code layouts, though it's possible that the lack of this sort
> of thing in the original code is due to some sort of safety concern?
>
> The most straightforward way to implement the DuplicateChannel would be
> to use the individual threads' message boxes, but this would mean that
> data put into a channel could be pulled out via the traditional
> receive() method. Currently, my intention would be to partition these
> two systems (the direct send()/receive() model and the channel model),
> unless anyone has any reason to think they should be merged into a
> single whole?
>
> Those are my thoughts, anyways. Comments? Complaints?
How does one receive from multiple channels out of order? I would
rather the channel sent each message to the subscribed Tid via send(),
rather than keeping an additional queue. It could possibly send a
ChannelMessage which has a reference to the sending channel and the
message. I understand this is a different model from what Go and others
use, but I think it's more practical in some circumstances. Maybe both
ways would be good? I personally use this method in my vibe-d server.
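
Roughly what I mean, as a minimal sketch rather than a proposed API. The
names ChannelMessage, BroadcastChannel, subscribe, publish, listener and
demo are hypothetical and not part of std.concurrency; the only library
calls used are spawn, send, receive, receiveOnly and thisTid. The channel
is identified by a plain string name here (an assumption), and the
subscriber list is only touched by the thread that owns the channel.

```d
import std.concurrency;
import std.stdio;

// Hypothetical envelope: tags the payload with the channel it came from.
struct ChannelMessage(T)
{
    string channel; // name of the originating channel (assumption)
    T payload;
}

// Hypothetical channel: no queue of its own, it only forwards each
// message to the ordinary mailbox of every subscribed Tid.
struct BroadcastChannel
{
    string name;
    Tid[] subscribers;

    void subscribe(Tid tid) { subscribers ~= tid; }

    void publish(T)(T value)
    {
        foreach (tid; subscribers)
            tid.send(ChannelMessage!T(name, value));
    }
}

// A subscriber uses plain receive() and dispatches on the channel name,
// so messages from several channels are handled in arrival order.
void listener(Tid owner)
{
    int priceTotal, tradeTotal;
    foreach (_; 0 .. 2)
    {
        receive((ChannelMessage!int m) {
            if (m.channel == "prices") priceTotal += m.payload;
            else if (m.channel == "trades") tradeTotal += m.payload;
        });
    }
    owner.send(priceTotal, tradeTotal);
}

auto demo()
{
    auto prices = BroadcastChannel("prices");
    auto trades = BroadcastChannel("trades");
    auto worker = spawn(&listener, thisTid);
    prices.subscribe(worker);
    trades.subscribe(worker);
    prices.publish(40);
    trades.publish(2);
    return receiveOnly!(int, int)(); // totals reported by the worker
}

void main()
{
    auto totals = demo();
    writeln("prices=", totals[0], " trades=", totals[1]);
}
```

The trade-off is that channel traffic shares the thread's normal mailbox,
so a plain receive() elsewhere in the same thread can also see these
messages; whether that is a feature or a problem depends on the use.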