std.concurrency thread communication problem
Ali Çehreli via Digitalmars-d-learn
digitalmars-d-learn at puremagic.com
Sat May 17 12:59:22 PDT 2014
On 05/17/2014 12:33 PM, John Colvin wrote:
> On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via
> Digitalmars-d-learn wrote:
>> I'm building a program which I intend to have many threads that can each send
>> messages to (and receive messages from) each other. The obvious way to do
>> this would be to have a shared array of Tids, but this seems to not work. I'm
>> continually fighting the system to get it to compile, and this makes me think
>> it should probably be done some other way...but what?
>>
>> One possibility is to have each thread maintain a separate array that contains
>> all the threads, which would mean that they would need to be initialized after
>> they were created. This would avoid the problems of shared Tids, but each Tid
>> contains a private mailbox, so this would be being duplicated, and that
>> bothers me...it seems like a poor idea. (Maybe I'm wrong about that...but I
>> don't know.)
>
> If my understanding is correct, each Tid contains a reference to the
> corresponding thread's MessageBox (implemented by way of MessageBox
> being a class), not an independent instance. You should be fine to just
> have an array of the relevant Tids in each thread.
>
> Alternatively, a single __gshared array of threads should work, given
> you are sufficiently careful with it. Remember, if no-one is doing any
> writing then you don't need to do any synchronisation of reads.
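
On the mailbox question, here is a minimal sketch of what John describes: copying a Tid copies a handle, so a message sent through the copy should still arrive at the same thread. The names echo and roundTrip are made up for this illustration.

```d
import std.concurrency;

// Hypothetical worker: echoes one int back to its owner
void echo()
{
    auto n = receiveOnly!int();
    ownerTid.send(n);
}

// Hypothetical helper: sends through a *copy* of the Tid
int roundTrip(int value)
{
    Tid original = spawn(&echo);
    Tid copy = original;    // copies the handle, not the mailbox
    copy.send(value);
    return receiveOnly!int();
}

void main()
{
    // The worker spawned through 'original' receives what was sent
    // through 'copy' and echoes it back.
    assert(roundTrip(42) == 42);
}
```

If that holds, keeping a separate array of Tids in every thread does not duplicate any mailbox.
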
The following is what I've come up with. I had to use a number of
shared-related casts.

import std.stdio;
import std.concurrency;
import std.datetime;
import std.random;
import core.thread;

enum threadCount = 5;
enum messagePerThread = 3;

// Represents messages sent to threads to start their tasks
struct Start
{}

// Receives the number (id) of this thread and the workers to send
// messages to
void workerFunc(size_t id, shared(Tid)[] workers)
{
    // Wait for the Start message. receive() is used instead of
    // receiveOnly!Start() because another worker may send us a size_t
    // before our Start arrives; receiveOnly would throw MessageMismatch,
    // whereas receive leaves non-matching messages queued.
    receive((Start s) {});

    // A local function to reduce code duplication
    bool checkMessageForMe(Duration timeout)
    {
        return receiveTimeout(
            timeout,
            (size_t from) {
                writefln("%s received from %s", id, from);
            });
    }

    // My main task is to send messages to others:
    size_t totalSent = 0;
    while (totalSent < messagePerThread) {
        auto to = uniform(0, workers.length);

        // Only send to others; not to self
        if (to != id) {
            auto chosen = cast(Tid)workers[to];
            writefln("%s sending to %s", id, to);
            chosen.send(id);
            ++totalSent;
        }

        checkMessageForMe(0.seconds);
    }

    // Process trailing messages sent to me
    bool received = false;
    do {
        received = checkMessageForMe(10.msecs);
    } while (received);
}

void main()
{
    auto workers = new shared(Tid)[threadCount];

    foreach (id; 0 .. threadCount) {
        auto worker = spawn(&workerFunc, id, workers);
        workers[id] = cast(shared(Tid))worker;
    }

    foreach (sharedWorker; workers) {
        auto worker = cast(Tid)sharedWorker;
        worker.send(Start());
    }

    thread_joinAll();
}

Sample output:
0 sending to 2
4 sending to 3
4 sending to 2
1 sending to 4
3 received from 4
3 sending to 2
0 sending to 1
4 received from 1
1 received from 0
1 sending to 0
0 received from 1
0 sending to 1
1 received from 0
1 sending to 0
0 received from 1
3 sending to 2
4 sending to 2
2 sending to 0
2 received from 0
2 received from 4
3 sending to 1
2 sending to 3
0 received from 2
1 received from 3
2 received from 3
2 sending to 0
3 received from 2
0 received from 2
2 received from 3
2 received from 4
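
For comparison, John's __gshared suggestion might look roughly like the following. This is only a sketch under one assumption: the array is written by main() alone, before any Start message goes out, and is only read afterwards, so no locking is used. The helper nextPeer and the fixed ring-style send pattern are made up to keep the example short; they are not part of the program above.

```d
import std.concurrency;
import std.stdio;
import core.thread;

enum threadCount = 3;

// Assumption: filled in by main() before any Start is sent and
// never modified afterwards, so readers do not synchronize.
__gshared Tid[] workers;

struct Start
{}

// Hypothetical helper: index of the next worker, wrapping around
size_t nextPeer(size_t id, size_t count)
{
    return (id + 1) % count;
}

void workerFunc(size_t id)
{
    // Messages of other types that arrive early stay queued
    receive((Start s) {});

    // No casts are needed: the array element is a plain Tid
    workers[nextPeer(id, workers.length)].send(id);

    // Exactly one message is addressed to each worker
    auto from = receiveOnly!size_t();
    writefln("%s received from %s", id, from);
}

void main()
{
    workers = new Tid[threadCount];

    foreach (size_t id; 0 .. threadCount) {
        workers[id] = spawn(&workerFunc, id);
    }

    foreach (worker; workers) {
        worker.send(Start());
    }

    thread_joinAll();
}
```

The casts disappear, but the compiler no longer helps: nothing stops a thread from writing to the array later, so the "write once before Start" rule has to be kept by hand.
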
Ali