dmd-concurrency

Shammah Chancellor anonymous at coward.com
Fri Nov 22 14:34:19 PST 2013


On 2013-11-20 07:34:36 +0000, Chris Williams said:

> On Wednesday, 20 November 2013 at 04:24:14 UTC, Daniel Murphy wrote:
>> This is the correct forum to post phobos proposals on.
> 
> Well then, here's what I had written:
> 
> A few applications I've considered implementing seem like they would be 
> easier if there were a channel-based messaging system in 
> std.concurrency. I'm happy to do this implementation, but I thought I 
> would try to get some sort of sign-off before doing so. Below, I lay 
> out my argument for the addition, and then the API that I am 
> considering.
> 
> ---
> 
> One fairly common task is thread-pooling. With the standard 
> send/receive model currently implemented, you have to choose a specific 
> thread to target when you send a task. While it's true that you can 
> simply iterate through your list of threads over and over, to spread 
> the load evenly over them, that presumes that all tasks take equal 
> processing time. It makes more sense to be able to push data into a 
> shared channel (secretly a work queue), and the first thread that 
> finishes its previous task will be able to immediately pull the task 
> before everyone else. This also means that the necessity of passing 
> around references to your threads so that they can be looped over goes 
> away.
> 
> I haven't tested it, but it looks like this sort of thing might be 
> quasi-possible using the register/unregister/locate methods. As each 
> thread starts, it can register itself with a named group (i.e. 
> channel), and then anyone who wants to send an item to an arbitrary 
> thread in that group can call locate() to retrieve one thread and call 
> send() against the Tid. The target thread would then need to unregister 
> itself while it is doing work, then re-register itself. My complaint 
> against this is the need to unregister and re-register. If the thread 
> issuing commands sends a large number of tasks all at once, they will 
> all go to the same thread (if coded poorly) or the caller will need to 
> use yield() or sleep() to allow the target thread to receive the task 
> and unregister, so that locate() can find a different thread. That's 
> not terribly efficient. I am also concerned that there's the chance 
> that all threads will be unregistered when we call locate(), whereas a 
> channeling system would be able to expand the mailbox during the times 
> that all threads are busy.
> 
> The actual implementation within concurrency.d also concerns me as (if 
> I read it correctly), the most recent item to register() will be the 
> one which locate() finds, rather than the thread which has been 
> registered the longest. While I suppose it's probably not too large of 
> an issue if the same two threads keep taking all the tasks - that means 
> that your load can't exceed two threads' worth of processing power - it 
> still seems like a FIFO system would be better. The registry is also 
> based on an array rather than a set, which can make removal an O(n) 
> operation, if the contents of the registry have to be shifted left, to 
> fill an empty spot.
> 
> Overall, I think that adding a shared message box system would be a 
> straightforward way to improve the handling of thread pooling via the 
> actor model.
> 
> ---
> 
> A less common use case: I was also considering some world simulators 
> (e.g. for studying economics or building a game map), and here the 
> ability to broadcast messages to a large set of other actors, based on 
> location, interest, etc. seems useful. In this case, messages would 
> need to be copied out to each subscriber in the channel rather than 
> having an existence as a point to point connection. For a networked 
> game, most likely you would want to break each channel into two, where 
> locally all senders on a channel push to a single listener that pipes 
> the messages over the network, and then remotely the messages would be 
> broadcast to many listeners again, but that's a reasonably 
> straightforward task for someone to implement on top of the channel 
> functionality. I don't think that such functionality is needed in 
> Phobos itself. Mostly, the presence of the broadcasting functionality 
> in the standard library would let users apply the easy and safe actor 
> model to more creative uses than a straight one-to-one pipe.
> 
> ---
> 
> Overall, my hope would be to develop something that is conceptually no 
> more difficult to deal with than the current send()/receive() model, 
> but also able to be used in a wide variety of ways. The API that I 
> would propose to develop is:
> 
> interface Channel {
> 	void send(T...)(T vals);
> 	void prioritySend(T...)(T vals);
> 	void receive(T...)(out Tid sender, T ops);
> 	receiveOnlyRet!(T) receiveOnly(T...)();
> 	bool receiveTimeout(T...)(Duration d, T ops);
> 
> 	void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
> 	void setMaxMailboxSize(Tid tid, size_t messages,
> 	                       bool function(Tid) doThisFunc);
> }
> 
> // Send inserts a message into a shared message box. Receive removes
> // the message.
> class SingleChannel : Channel {}
> 
> // Send inserts the message into a message box per-recipient. Receive
> // removes the message in the calling thread's channel message box. If
> // echo is false, messages will not be sent back to the sender, even if
> // they are a registered listener.
> class DuplicateChannel(bool echo = true) : Channel {}
> 
> // Used by function sendAll(). Channel can be of either type.
> void registerSend(Channel c, Tid tid = thisTid);
> void unregisterSend(Channel c, Tid tid = thisTid);
> // Used by function receiveAll(). Channel can be of either type.
> void registerReceive(Channel c, Tid tid = thisTid);
> void unregisterReceive(Channel c, Tid tid = thisTid);
> 
> // Sends a copy of the message to all channels this thread has
> // registered for.
> void sendAll(T...)(T ops);
> // Receives a message of type T from any channel that we are registered
> // for. Returns channel and sender.
> void receiveAll(T...)(out Channel c, out Tid sender, T ops);
> 
> I believe that the look and feel stays fairly consistent with the 
> current set of functions in std.concurrency. I've added the ability for 
> the recipient to infer information about the sender since, in the 
> duplication model, I believe there are quite a few cases where this 
> would be important information. And of course, I've added the option to 
> register/unregister threads other than ourselves to allow a greater 
> range of code layouts, though it's possible that the lack of this sort 
> of thing in the original code is due to some sort of safety concern?
> 
> The most straightforward way to implement the DuplicateChannel would be 
> to use the individual threads' message boxes, but this would mean that 
> data put into a channel could be pulled out via the traditional 
> receive() method. Currently, my intention would be to partition these 
> two systems (the direct send()/receive() model and the channel model), 
> unless anyone has any reason to think they should be merged into a 
> single whole?
> 
> Those are my thoughts, anyways. Comments? Complaints?

How does one receive from multiple channels out of order? I would 
rather the channel delivered each message to the subscribed Tid via 
send(), rather than keeping an additional queue. It could possibly send 
a ChannelMessage which has a reference to the sending channel and the 
message. I understand this is a different model from what Go and others 
use, but I think it's more practical in some circumstances. Maybe 
supporting both ways would be good? I personally use this method in my 
vibe-d server.
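
To make the idea concrete, here is a rough sketch of what I mean. It is 
only an illustration on top of the existing std.concurrency send() and 
receive(); the ChannelMessage and Channel types, the subscribe() and 
publish() methods, and the demo() helper are names made up for this 
example, not anything in Phobos or in the proposal above. To keep it 
short, the envelope carries the channel's name instead of a reference to 
the channel object, and the payload is fixed to string.

```d
import std.concurrency;

// Hypothetical envelope: a payload tagged with the channel it came through.
struct ChannelMessage
{
    string channel;
    string payload;
}

// Hypothetical channel owned by the publishing thread: a name plus the
// Tids of its subscribers. There is no separate queue.
struct Channel
{
    string name;
    Tid[] subscribers;

    void subscribe(Tid tid) { subscribers ~= tid; }

    // Deliver a copy to each subscriber's ordinary mailbox via send().
    void publish(string payload)
    {
        foreach (tid; subscribers)
            tid.send(ChannelMessage(name, payload));
    }
}

// One receive() loop handles every channel the thread is subscribed to,
// in whatever order the messages arrive.
void subscriber()
{
    bool running = true;
    while (running)
    {
        receive(
            (ChannelMessage m) { ownerTid.send(m.channel ~ ": " ~ m.payload); },
            (bool stop) { running = false; }
        );
    }
}

string[] demo()
{
    auto news = Channel("news");
    auto chat = Channel("chat");
    auto worker = spawn(&subscriber);
    news.subscribe(worker);
    chat.subscribe(worker);

    chat.publish("hello");
    news.publish("release");

    string[] seen;
    seen ~= receiveOnly!string();
    seen ~= receiveOnly!string();
    worker.send(true); // ask the subscriber to exit
    return seen;
}

void main()
{
    import std.stdio : writeln;
    writeln(demo());
}
```

Because everything lands in the thread's normal mailbox, the subscriber 
can mix channel traffic with direct messages in the same receive() call 
and dispatch on m.channel. The cost is the one Chris already mentioned: 
channel data can be pulled out by any plain receive() in that thread.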
