How to create Multi Producer-Single Consumer concurrency
Bagomot
bagomot at gmail.com
Wed Jul 13 09:25:06 UTC 2022
On Tuesday, 16 June 2020 at 09:10:09 UTC, Ali Çehreli wrote:
> On 6/12/20 3:02 PM, adnan338 wrote:
>
> > So there are multiple "download finished" message producers, and one
> > consumer of those messages. Furthermore, that producer has a callback
> > that triggers an UI object.
>
> That's almost exactly what I do in some of my programs. I use
> std.concurrency and the following is a working sketch of what I
> do.
>
> I assumed you get finer individual granularity of progress as
> opposed to the binary 0% -> 100%.
>
> import std.stdio;
> import std.concurrency;
> import std.algorithm;
> import std.range;
> import std.exception;
> import std.format;
> import core.thread;
>
> struct Progress {
>   Tid tid;        // The id of the reporting thread
>   size_t amount;  // The amount of progress so far
>   size_t total;   // Total progress (can be file size)
> }
>
> void display(Progress[Tid] progresses) {
>   const amount = progresses.byValue.map!(p => p.amount).sum;
>   const total = progresses.byValue.map!(p => p.total).sum;
>   writefln!"%6.2f%%"(100.0 * amount / total);
> }
>
> // The worker thread function
> void download(string url) {
>   writefln!"Worker %s downloading %s."(thisTid, url);
>   enum total = 20;
>   foreach (i; 0 .. total) {
>     // Imitate some progress
>     Thread.sleep(100.msecs);
>
>     // Report progress to owner
>     ownerTid.send(Progress(thisTid, i + 1, total));
>   }
> }
>
> void main() {
>   auto list = [ "dlang.org", "ddili.org" ];
>   auto downloaders = list.length
>                      .iota
>                      .map!(i => spawnLinked(&download, list[i]))
>                      .array;
>
>   Progress[Tid] progresses;
>   size_t finished = 0;
>
>   while (finished != list.length) {
>     receive(
>       (LinkTerminated arg) {
>         ++finished;
>
>         // Check whether this thread is exiting prematurely
>         enforce((arg.tid in progresses) &&
>                 (progresses[arg.tid].amount ==
>                  progresses[arg.tid].total),
>                 format!"Thread %s exited unexpectedly"(arg.tid));
>       },
>
>       (Progress progress) {
>         progresses[progress.tid] = progress;
>         progresses.display();
>       }
>     );
>   }
>
>   writeln("Processing the downloaded files.");
> }
>
> Ali
How can the same be done with `taskPool` instead of `spawnLinked`?
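
One possible starting point is the sketch below. It is a guess at how the quoted program might be adapted, not a confirmed answer. It assumes that pool threads are not started through `spawn`, so `ownerTid` should not be relied on inside a task; the main thread's `thisTid` is passed in explicitly as a hypothetical `owner` parameter instead. Since one pool thread may run several tasks, `Progress` carries a hypothetical numeric `id` rather than a `Tid`, and since there is no `LinkTerminated` for tasks, a download is counted as finished when `amount == total`. The helper name `percent` is also made up for this example.

```d
import std.algorithm : map, sum;
import std.concurrency : Tid, thisTid, send, receive;
import std.parallelism : taskPool, task;
import std.stdio : writefln, writeln;
import core.thread : Thread;
import core.time : msecs;

struct Progress {
  size_t id;      // Index of the download (assumption: replaces Tid)
  size_t amount;  // The amount of progress so far
  size_t total;   // Total progress (can be file size)
}

// Overall progress in percent across all known downloads
double percent(Progress[size_t] progresses) {
  const amount = progresses.byValue.map!(p => p.amount).sum;
  const total = progresses.byValue.map!(p => p.total).sum;
  return 100.0 * amount / total;
}

// The task function; the owner is passed in explicitly
void download(Tid owner, size_t id, string url) {
  enum total = 20;
  foreach (i; 0 .. total) {
    // Imitate some progress
    Thread.sleep(100.msecs);

    // Report progress to the thread that created the task
    owner.send(Progress(id, i + 1, total));
  }
}

void main() {
  auto list = [ "dlang.org", "ddili.org" ];
  auto owner = thisTid;

  // task!download allocates the task on the GC heap; put() queues it
  foreach (i, url; list) {
    taskPool.put(task!download(owner, i, url));
  }

  Progress[size_t] progresses;
  size_t finished = 0;

  while (finished != list.length) {
    receive(
      (Progress progress) {
        progresses[progress.id] = progress;
        writefln!"%6.2f%%"(percent(progresses));

        // No LinkTerminated here: completion is inferred from the data
        if (progress.amount == progress.total) {
          ++finished;
        }
      }
    );
  }

  writeln("Processing the downloaded files.");
}
```

Two caveats with this approach: a task that throws before reaching `total` would leave the loop waiting forever, so a real program would probably need a separate failure message or a `receiveTimeout`. Also, the default `taskPool` has one worker thread fewer than the number of CPUs, so on a single-core machine the queued tasks would not run on their own; a dedicated `new TaskPool(n)` might be the safer choice there.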