Structured Concurrency vs P2300R7
Sebastiaan Koppe
mail at skoppe.eu
Sun Apr 7 17:48:43 UTC 2024
On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:
> What the way to perform the classic D example:
>
> ```
> auto logs = new double[1_000_000];
>
> foreach (i, ref elem; parallel(logs))
> {
> elem = log(i + 1.0);
> }
> ```
>
> Are Streams involved?
>
> Thank you
Contrary to P2300, the `whenAll` implementation in the
concurrency library accepts an array as well.
```
import concurrency.thread : stdTaskPool;
import concurrency.operations : whenAll;
import concurrency : syncWait;
import std.algorithm : map;
import std.array : array;
auto pool = stdTaskPool(32);
auto scheduler = pool.getScheduler();
auto logs = new double[1_000_000]
logs
.map!((i) =>
just(i)
.then((double i) => log(i + 1.0))
.on(scheduler)
)
.array
.whenAll
.syncWait
.value;
```
`whenAll` should be able to work with ranges that provide a size,
but it doesn't yet, therefor a call to `.array` is required.
Note this is not quite equivalent to your code, since it doesn't
mutate the array in-place. You can do that using:
```
.map!((ref i) =>
just(&i)
.then((double* i) => *i = log(*i + 1.0))
.on(scheduler)
)
```
Unfortunately `then` isn't smart enough to allow `ref double i`,
so you have to deal with ugly pointers. Something to improve.
---
As for streams, they are actually something that I am going to
deprecate for the newer Sequence concept.
While that code hasn't been written, the api would allow you to
do the following:
```
auto result = logs
.sequence()
.transform((double i) => log(i + 1.0))
.parallelize(pool.getScheduler)
.toList
.syncWait
.value
```
For the in-place mutation I suppose we could add a `refSequence`
(then we don't need `.toList` anymore).
Yet another way would be to use an `asyncScope` and spawn
individual tasks:
```
import concurrency.thread : stdTaskPool;
import concurrency.operations : whenAll;
import concurrency : syncWait;
import std.algorithm : map;
import std.array : array;
import concurrency.asyncscope;
auto pool = stdTaskPool(32);
auto scheduler = pool.getScheduler();
auto scp = asyncScope();
foreach (i, ref elem; logs) {
scp.spawn(
just(&elem)
.then((double* elem)) => *elem = log(i + 1.0))
.on(scheduler)
);
}
scp.cleanup.syncWait();
```
More information about the Digitalmars-d
mailing list