How to create Multi Producer-Single Consumer concurrency

Ali Çehreli acehreli at yahoo.com
Tue Jun 16 09:10:09 UTC 2020


On 6/12/20 3:02 PM, adnan338 wrote:

 > So there are multiple "download finished" message producers, and one
 > consumer of those messages. Furthermore, that producer has a callback
 > that triggers an UI object.

That's almost exactly what I do in some of my programs. I use 
std.concurrency and the following is a working sketch of what I do.

I assumed you get finer individual granularity of progress as opposed to 
the binary 0% -> 100%.

import std.stdio;
import std.concurrency;
import std.algorithm;
import std.range;
import std.exception;
import std.format;
import core.thread;

struct Progress {
   Tid tid;          // The id of the reporting thread
   size_t amount;    // The amount of progress so far
   size_t total;     // Total progress (can be file size)
}

void display(Progress[Tid] progresses) {
   const amount = progresses.byValue.map!(p => p.amount).sum;
   const total = progresses.byValue.map!(p => p.total).sum;
   writefln!"%6.2f%%"(100.0 * amount / total);
}

// The worker thread function
void download(string url) {
   writefln!"Worker %s downloading %s."(thisTid, url);
   enum total = 20;
   foreach (i; 0 .. total) {
     // Imitate some progress
     Thread.sleep(100.msecs);

     // Report progress to owner
     ownerTid.send(Progress(thisTid, i + 1, total));
   }
}

void main() {
   auto list = [ "dlang.org", "ddili.org" ];
   auto downloaders = list.length
                      .iota
                      .map!(i => spawnLinked(&download, list[i]))
                      .array;

   Progress[Tid] progresses;
   size_t finished = 0;

   while (finished != list.length) {
     receive(
       (LinkTerminated arg) {
         ++finished;

         // Check whether this thread is exiting prematurely
         enforce((arg.tid in progresses) &&
                 (progresses[arg.tid].amount ==  progresses[arg.tid].total),
                 format!"Thread %s exited unexpectedly"(arg.tid));
       },

       (Progress progress) {
         progresses[progress.tid] = progress;
         progresses.display();
       }
     );
   }

   writeln("Processing the downloaded files.");
}

Ali



More information about the Digitalmars-d-learn mailing list