How to create Multi Producer-Single Consumer concurrency

Bagomot bagomot at gmail.com
Wed Jul 13 09:25:06 UTC 2022


On Tuesday, 16 June 2020 at 09:10:09 UTC, Ali Çehreli wrote:
> On 6/12/20 3:02 PM, adnan338 wrote:
>
> > So there are multiple "download finished" message producers, and one
> > consumer of those messages. Furthermore, that producer has a callback
> > that triggers an UI object.
>
> That's almost exactly what I do in some of my programs. I use 
> std.concurrency and the following is a working sketch of what I 
> do.
>
> I assumed you get finer individual granularity of progress as 
> opposed to the binary 0% -> 100%.
>
> import std.stdio;
> import std.concurrency;
> import std.algorithm;
> import std.range;
> import std.exception;
> import std.format;
> import core.thread;
>
> struct Progress {
>   Tid tid;          // The id of the reporting thread
>   size_t amount;    // The amount of progress so far
>   size_t total;     // Total progress (can be file size)
> }
>
> void display(Progress[Tid] progresses) {
>   const amount = progresses.byValue.map!(p => p.amount).sum;
>   const total = progresses.byValue.map!(p => p.total).sum;
>   writefln!"%6.2f%%"(100.0 * amount / total);
> }
>
> // The worker thread function
> void download(string url) {
>   writefln!"Worker %s downloading %s."(thisTid, url);
>   enum total = 20;
>   foreach (i; 0 .. total) {
>     // Imitate some progress
>     Thread.sleep(100.msecs);
>
>     // Report progress to owner
>     ownerTid.send(Progress(thisTid, i + 1, total));
>   }
> }
>
> void main() {
>   auto list = [ "dlang.org", "ddili.org" ];
>   auto downloaders = list.length
>                      .iota
>                      .map!(i => spawnLinked(&download, list[i]))
>                      .array;
>
>   Progress[Tid] progresses;
>   size_t finished = 0;
>
>   while (finished != list.length) {
>     receive(
>       (LinkTerminated arg) {
>         ++finished;
>
>         // Check whether this thread is exiting prematurely
>         enforce((arg.tid in progresses) &&
>                 (progresses[arg.tid].amount == progresses[arg.tid].total),
>                 format!"Thread %s exited unexpectedly"(arg.tid));
>       },
>
>       (Progress progress) {
>         progresses[progress.tid] = progress;
>         progresses.display();
>       }
>     );
>   }
>
>   writeln("Processing the downloaded files.");
> }
>
> Ali

How can I do the same with `taskPool` instead of `spawnLinked`?
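
One possible direction, as a rough sketch rather than a confirmed answer: queue each download on the default `taskPool` from std.parallelism and keep using std.concurrency messages for the progress reports. The sketch assumes the owner's `Tid` has to be passed to the task explicitly (pool threads are not started with `spawn`, so `ownerTid` is not set for them) and that the last progress report can stand in for `LinkTerminated`. The `id` field and the `percentDone` helper are hypothetical names introduced only for this illustration.

```d
import std.stdio;
import std.concurrency;
import std.parallelism;
import std.algorithm;
import core.thread;

struct Progress {
  size_t id;        // Index of the download (hypothetical; replaces Tid)
  size_t amount;    // The amount of progress so far
  size_t total;     // Total progress (can be file size)
}

// Overall percentage across all downloads reported so far
double percentDone(Progress[size_t] progresses) {
  const amount = progresses.byValue.map!(p => p.amount).sum;
  const total = progresses.byValue.map!(p => p.total).sum;
  return 100.0 * amount / total;
}

// The task function; the owner Tid is an explicit parameter
// (assumption: ownerTid is not usable in task pool threads)
void download(Tid owner, size_t id, string url) {
  enum total = 20;
  foreach (i; 0 .. total) {
    // Imitate some progress
    Thread.sleep(100.msecs);

    // Report progress to the thread that queued the task
    owner.send(Progress(id, i + 1, total));
  }
}

void main() {
  auto list = [ "dlang.org", "ddili.org" ];

  // Queue one task per URL on the default pool
  foreach (i, url; list) {
    taskPool.put(task!download(thisTid, i, url));
  }

  Progress[size_t] progresses;
  size_t finished = 0;

  while (finished != list.length) {
    receive(
      (Progress progress) {
        progresses[progress.id] = progress;
        writefln!"%6.2f%%"(percentDone(progresses));

        // No LinkTerminated here; treat the last report as completion
        if (progress.amount == progress.total) {
          ++finished;
        }
      }
    );
  }

  writeln("Processing the downloaded files.");
}
```

Two caveats with this sketch: a task that throws never sends its last report, so the loop above would wait forever unless the tasks are also checked with `yieldForce` or wrapped in a try/catch that sends a failure message. Also, the default pool has `totalCPUs - 1` worker threads, so with more downloads than workers the extra tasks wait in the queue, and as far as I know on a single-core machine nothing runs the tasks until they are forced.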

