How to create Multi Producer-Single Consumer concurrency
Ali Çehreli
acehreli at yahoo.com
Tue Jun 16 09:10:09 UTC 2020
On 6/12/20 3:02 PM, adnan338 wrote:
> So there are multiple "download finished" message producers, and one
> consumer of those messages. Furthermore, that producer has a callback
> that triggers an UI object.
That's almost exactly what I do in some of my programs. I use
std.concurrency and the following is a working sketch of what I do.
I assumed you get finer individual granularity of progress as opposed to
the binary 0% -> 100%.
import std.stdio;
import std.concurrency;
import std.algorithm;
import std.range;
import std.exception;
import std.format;
import core.thread;
struct Progress {
Tid tid; // The id of the reporting thread
size_t amount; // The amount of progress so far
size_t total; // Total progress (can be file size)
}
void display(Progress[Tid] progresses) {
const amount = progresses.byValue.map!(p => p.amount).sum;
const total = progresses.byValue.map!(p => p.total).sum;
writefln!"%6.2f%%"(100.0 * amount / total);
}
// The worker thread function
void download(string url) {
writefln!"Worker %s downloading %s."(thisTid, url);
enum total = 20;
foreach (i; 0 .. total) {
// Imitate some progress
Thread.sleep(100.msecs);
// Report progress to owner
ownerTid.send(Progress(thisTid, i + 1, total));
}
}
void main() {
auto list = [ "dlang.org", "ddili.org" ];
auto downloaders = list.length
.iota
.map!(i => spawnLinked(&download, list[i]))
.array;
Progress[Tid] progresses;
size_t finished = 0;
while (finished != list.length) {
receive(
(LinkTerminated arg) {
++finished;
// Check whether this thread is exiting prematurely
enforce((arg.tid in progresses) &&
(progresses[arg.tid].amount == progresses[arg.tid].total),
format!"Thread %s exited unexpectedly"(arg.tid));
},
(Progress progress) {
progresses[progress.tid] = progress;
progresses.display();
}
);
}
writeln("Processing the downloaded files.");
}
Ali
More information about the Digitalmars-d-learn
mailing list