std.concurrency thread communication problem

Ali Çehreli via Digitalmars-d-learn digitalmars-d-learn at puremagic.com
Sat May 17 12:59:22 PDT 2014


On 05/17/2014 12:33 PM, John Colvin wrote:
> On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via
> Digitalmars-d-learn wrote:
>> I'm building a program which I intend to have many threads that can
>> each send messages to (and receive messages from) each other.  The
>> obvious way to do this would be to have a shared array of Tids, but
>> this seems to not work.  I'm continually fighting the system to get
>> it to compile, and this makes me think it should probably be done
>> some other way...but what?
>>
>> One possibility is to have each thread maintain a separate array that
>> contains all the threads, which would mean that they would need to be
>> initialized after they were created.  This would avoid the problems of
>> shared Tids, but each Tid contains a private mailbox, so this would be
>> being duplicated, and that bothers me...it seems like a poor idea.
>> (Maybe I'm wrong about that...but I don't know.)
>
> If my understanding is correct, each Tid contains a reference to the
> corresponding thread's MessageBox (implemented by way of MessageBox
> being a class), not an independent instance. You should be fine to just
> have an array of the relevant Tids in each thread.
>
> Alternatively, a single __gshared array of threads should work, given
> you are sufficiently careful with it. Remember, if no-one is doing any
> writing then you don't need to do any synchronisation of reads.

The following is what I've come up with. I had to use a number of 
shared-related casts.

import std.stdio;
import std.concurrency;
import std.datetime;
import std.random;
import core.thread;

enum threadCount = 5;
enum messagePerThread = 3;

// Represents messages sent to threads to start their tasks
struct Start
{}

// Receives the number (id) of this thread and the workers to send
// messages to
void workerFunc(size_t id, shared(Tid)[] workers)
{
     receiveOnly!Start();

     // A local function to reduce code duplication
     bool checkMessageForMe(Duration timeout)
     {
         return receiveTimeout(
             timeout,
             (size_t from) {
                 writefln("%s received from %s", id, from);
             });
     }

     // My main task is to send messages to others:
     size_t totalSent = 0;
     while (totalSent < messagePerThread) {
         auto to = uniform(0, workers.length);

         // Only send to others; not to self
         if (to != id) {
             auto chosen = cast(Tid)workers[to];
             writefln("%s sending to %s", id, to);
             chosen.send(id);
             ++totalSent;
         }

         checkMessageForMe(0.seconds);
     }

     // Process trailing messages sent to me
     bool received = false;
     do {
         received = checkMessageForMe(10.msecs);
     } while (received);
}

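void main()
{
     // Allocate the array up front so that every worker can be handed
     // the same slice, even though it is still being filled in.
     auto workers = new shared(Tid)[threadCount];

     foreach (id; 0 .. threadCount) {
         auto worker = spawn(&workerFunc, id, workers);
         workers[id] = cast(shared(Tid))worker;
     }

     // Every slot is filled by now; workers do not touch the array
     // until they receive Start.
     foreach (sharedWorker; workers) {
         auto worker = cast(Tid)sharedWorker;
         worker.send(Start());
     }

     // Keep the owner thread alive until all workers have finished
     thread_joinAll();
}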

Sample output:

0 sending to 2
4 sending to 3
4 sending to 2
1 sending to 4
3 received from 4
3 sending to 2
0 sending to 1
4 received from 1
1 received from 0
1 sending to 0
0 received from 1
0 sending to 1
1 received from 0
1 sending to 0
0 received from 1
3 sending to 2
4 sending to 2
2 sending to 0
2 received from 0
2 received from 4
3 sending to 1
2 sending to 3
0 received from 2
1 received from 3
2 received from 3
2 sending to 0
3 received from 2
0 received from 2
2 received from 3
2 received from 4
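
For comparison, the other option raised in the thread (each thread keeping its own thread-local array of Tids) might look roughly like the sketch below. It is only an illustration under some assumptions: the PeersDone marker struct and the function name peerWorker are made up for this example, and every worker simply sends one message to each peer instead of picking random targets. Tids are passed as ordinary messages, so no shared-related casts should be needed.

```d
import std.stdio;
import std.concurrency;
import core.thread;

enum threadCount = 3;

// Hypothetical marker message: no more peer Tids will follow
struct PeersDone
{}

void peerWorker(size_t id)
{
    // Thread-local list of the other workers; not shared
    Tid[] peers;

    // Collect peer Tids until the owner says the list is complete.
    // Messages of other types stay queued in the mailbox meanwhile.
    bool done = false;
    while (!done) {
        receive(
            (Tid peer) { peers ~= peer; },
            (PeersDone _) { done = true; });
    }

    // Send my id to every peer
    foreach (peer; peers) {
        peer.send(id);
    }

    // Every peer sends exactly one message back to me
    foreach (i; 0 .. peers.length) {
        receive(
            (size_t from) {
                writefln("%s received from %s", id, from);
            });
    }
}

void main()
{
    Tid[] workers;

    foreach (size_t id; 0 .. threadCount) {
        workers ~= spawn(&peerWorker, id);
    }

    // Tell each worker about all the others, then mark the end
    foreach (i, worker; workers) {
        foreach (j, peer; workers) {
            if (i != j) {
                worker.send(peer);
            }
        }
        worker.send(PeersDone());
    }

    // Keep the owner thread alive until all workers have finished
    thread_joinAll();
}
```

Each copied Tid refers to the same mailbox as the original, as noted earlier in the thread, so keeping a separate array per thread does not duplicate mailboxes. The order of the output lines varies from run to run.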

Ali


