dmd-concurrency

Chris Williams yoreanon-chrisw at yahoo.co.jp
Tue Nov 19 23:34:36 PST 2013


On Wednesday, 20 November 2013 at 04:24:14 UTC, Daniel Murphy 
wrote:
> This is the correct forum to post phobos proposals on.

Well then, here's what I had written:

A few applications I've considered implementing seem like they 
would be easier if there were a channel-based messaging system in 
std.concurrency. I'm happy to do this implementation, but I 
thought I would try to get some sort of sign-off before doing so. 
Below, I lay out my argument for the addition, and then the API 
that I am considering.

---

One fairly common task is thread-pooling. With the standard 
send/receive model currently implemented, you have to choose a 
specific thread to target when you send a task. While it's true 
that you can simply iterate through your list of threads over and 
over, to spread the load evenly over them, that presumes that all 
tasks take even processing time. It makes more sense to be able 
to push data into a shared channel (secretly a work queue), and 
the first thread that finishes its previous task will be able to 
immediately pull the task before everyone else. This also means 
that the necessity of passing around references to your threads 
so that they can be looped over goes away.
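
A minimal sketch of the shared work-queue idea described above. 
WorkQueue, makeWorker and the summing workload are hypothetical 
names chosen for illustration; they are not part of Phobos or of 
the proposed API. The sketch uses druntime's Mutex and Condition 
directly, which is only one way such a channel might be backed.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

// Hypothetical helper: a blocking queue shared by all workers.
final class WorkQueue(T)
{
    private T[] items;
    private bool closed;
    private Mutex mtx;
    private Condition cond;

    this()
    {
        mtx = new Mutex;
        cond = new Condition(mtx);
    }

    // Add one task and wake a single waiting worker.
    void put(T item)
    {
        mtx.lock();
        scope (exit) mtx.unlock();
        items ~= item;
        cond.notify();
    }

    // Signal that no more tasks will arrive.
    void close()
    {
        mtx.lock();
        scope (exit) mtx.unlock();
        closed = true;
        cond.notifyAll();
    }

    // Blocks until a task is available. Returns false once the
    // queue has been closed and fully drained.
    bool take(out T item)
    {
        mtx.lock();
        scope (exit) mtx.unlock();
        while (items.length == 0 && !closed)
            cond.wait();
        if (items.length == 0)
            return false;
        item = items[0];
        items = items[1 .. $];
        return true;
    }
}

// Builds a worker body; whichever worker is idle pulls the next task.
void delegate() makeWorker(WorkQueue!int q, int* total)
{
    return () {
        int task;
        while (q.take(task))
            *total += task;
    };
}

void main()
{
    auto queue = new WorkQueue!int;
    auto totals = new int[](3); // one result slot per worker
    Thread[] workers;
    foreach (i; 0 .. 3)
    {
        workers ~= new Thread(makeWorker(queue, &totals[i]));
        workers[$ - 1].start();
    }
    foreach (n; 1 .. 101)
        queue.put(n); // the sender never picks a target thread
    queue.close();
    foreach (w; workers)
        w.join();

    int sum = 0;
    foreach (t; totals)
        sum += t;
    assert(sum == 5050);
}
```

The sender only ever touches the queue, so it needs no list of 
worker threads, and how the tasks end up divided between workers 
depends on which one happens to be free.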

I haven't tested it, but it looks like this sort of thing might 
be quasi-possible using the register/unregister/locate methods. 
As each thread starts, it can register itself with a named group 
(i.e. channel), and then anyone who wants to send an item to an 
arbitrary thread in that group can call locate() to retrieve one 
thread and call send() against the Tid. The target thread would 
then need to unregister itself while it is doing work, then 
re-register itself. My complaint against this is the need to 
unregister and re-register. If the thread issuing commands sends 
a large number of tasks all at once, they will all go to the same 
thread (if coded poorly) or the caller will need to use yield() 
or sleep() to allow the target thread to receive the task and 
unregister, so that locate() can find a different thread. That's 
not terribly efficient. I am also concerned that there's the 
chance that all threads will be unregistered when we call 
locate(), whereas a channeling system would be able to expand the 
mailbox during the times that all threads are busy.
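
For reference, a minimal single-threaded sketch of the registry 
calls mentioned above. The name "pool" is a placeholder. The 
sketch relies on the documented behaviour that register() returns 
false when the name is already taken, which suggests that a name 
maps to one Tid at a time rather than to a group of threads.

```d
import std.concurrency;

void main()
{
    // Nothing is registered under the name yet.
    assert(locate("pool") == Tid.init);

    // The first registration succeeds and locate() now finds us.
    assert(register("pool", thisTid));
    assert(locate("pool") == thisTid);

    // A second registration under the same name is refused.
    assert(!register("pool", thisTid));

    // While "busy", the thread removes itself from the registry,
    // so locate() comes back empty until it registers again.
    assert(unregister("pool"));
    assert(locate("pool") == Tid.init);
}
```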

The actual implementation within concurrency.d also concerns me 
as (if I read it correctly), the most recent item to register() 
will be the one which locate() finds, rather than the thread 
which has been registered the longest. I suppose it's probably 
not too large of an issue if the same two threads keep taking all 
the tasks, since that means that your load doesn't exceed two 
threads' worth of processing power, but it still seems like a 
FIFO system would be better. The registry is also based on an 
array rather than a set, which can make removal an O(n) 
operation if the contents of the registry have to be shifted left 
to fill an empty spot.

Overall, I think that adding a shared message box system would be 
a straightforward way to improve the handling of thread pooling 
via the actor model.

---

A less common use case: I was also considering some 
world-simulators (e.g. for studying economics or building a game 
map), and here the ability to broadcast messages to a large set 
of other actors, based on location, interest, etc. seems useful. 
In this case, messages would need to be copied out to each 
subscriber in the channel rather than travelling over a single 
point-to-point connection. For a networked game, most likely you 
would want to break each channel into two, where locally all 
senders on a channel push to a single listener that pipes the 
messages over the network, and then remotely the messages would 
be broadcast to many listeners again, but that's a reasonably 
straightforward task for someone to implement on top of the 
channel functionality. I don't think that such networking 
functionality is needed in Phobos itself. Mostly, the presence of 
the broadcasting functionality in the standard library allows 
users to apply the easy and safe actor model to more creative 
uses than a straight one-to-one pipe.
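
A minimal sketch of the copy-to-every-subscriber behaviour 
described above, built on the existing send()/receiveOnly() 
calls. The broadcast() and subscriber() functions and the "tick" 
message are hypothetical; a real duplicating channel would manage 
the subscriber list itself.

```d
import std.concurrency;

// Each subscriber waits for one broadcast and reports its length back.
void subscriber(Tid owner)
{
    auto msg = receiveOnly!string();
    owner.send(msg.length);
}

// Hypothetical helper: every subscriber gets the message in its
// own mailbox.
void broadcast(Tid[] subscribers, string msg)
{
    foreach (tid; subscribers)
        tid.send(msg);
}

void main()
{
    Tid[] subs;
    foreach (i; 0 .. 3)
        subs ~= spawn(&subscriber, thisTid);

    broadcast(subs, "tick");

    // Collect one reply per subscriber.
    size_t total;
    foreach (i; 0 .. 3)
        total += receiveOnly!size_t();
    assert(total == 12); // 3 subscribers * "tick".length
}
```

Here the sender must carry the list of Tids around, which is the 
bookkeeping a channel would take over.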

---

Overall, my hope would be to develop something that is 
conceptually no more difficult to deal with than the current 
send()/receive() model, but also able to be used in a wide 
variety of ways. The API that I would propose to develop is:

interface Channel {
	void send(T...)(T vals);
	void prioritySend(T...)(T vals);
	void receive(T...)(out Tid sender, T ops);
	receiveOnlyRet!(T) receiveOnly(T...)();
	bool receiveTimeout(T...)(Duration d, T ops);

	void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
	void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) doThisFunc);
}

// Send inserts a message into a shared message box. Receive
// removes message
class SingleChannel : Channel {}

// Send inserts the message into a message box per-recipient.
// Receive removes message in the calling thread's channel message
// box. If echo is false, messages will not be sent back to the
// sender, even if they are a registered listener
class DuplicateChannel(bool echo = true) : Channel {}

// used by function sendAll(). Channel can be of either type
void registerSend(Channel c, Tid tid = thisTid);
void unregisterSend(Channel c, Tid tid = thisTid);
// used by function receiveAll(). Channel can be of either type
void registerReceive(Channel c, Tid tid = thisTid);
void unregisterReceive(Channel c, Tid tid = thisTid);

// Sends a copy of message to all channels this thread has
// registered for.
void sendAll(T...)(T ops);
// Receives a message of type T from any channel that we are
// registered for. Returns channel and sender
void receiveAll(T...)(out Channel c, out Tid sender, T ops);

I believe that the look and feel stays fairly consistent with the 
current set of functions in std.concurrency. I've added the 
ability for the recipient to infer information about the sender 
since, in the duplication model, I believe there are quite a few 
cases where this would be important information. And of course, 
I've added the option to register/unregister threads other than 
ourselves to allow a greater range of code layouts, though it's 
possible that the lack of this sort of thing in the original code 
is due to some sort of safety concern?

The most straightforward way to implement the DuplicateChannel 
would be to use the individual threads' message boxes, but this 
would mean that data put into a channel could be pulled out via 
the traditional receive() method. Currently, my intention would 
be to partition these two systems (the direct send()/receive() 
model and the channel model), unless anyone has any reason to 
think they should be merged into a single whole?

Those are my thoughts, anyways. Comments? Complaints?

