Structured Concurrency vs P2300R7

Sebastiaan Koppe mail at skoppe.eu
Sun Apr 7 17:48:43 UTC 2024


On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:
> What the way to perform the classic D example:
>
> ```
> auto logs = new double[1_000_000];
>
> foreach (i, ref elem; parallel(logs))
> {
>     elem = log(i + 1.0);
> }
> ```
>
> Are Streams involved?
>
> Thank you

Contrary to P2300, the `whenAll` implementation in the 
concurrency library accepts an array as well.

```
import concurrency.thread : stdTaskPool;
import concurrency.operations : whenAll;
import concurrency : syncWait;
import std.algorithm : map;
import std.array : array;

auto pool = stdTaskPool(32);
auto scheduler = pool.getScheduler();
auto logs = new double[1_000_000]

logs
     .map!((i) =>
         just(i)
             .then((double i) => log(i + 1.0))
             .on(scheduler)
         )
     .array
     .whenAll
     .syncWait
     .value;
```

`whenAll` should be able to work with ranges that provide a size, 
but it doesn't yet, therefor a call to `.array` is required.

Note this is not quite equivalent to your code, since it doesn't 
mutate the array in-place. You can do that using:

```
     .map!((ref i) =>
         just(&i)
             .then((double* i) => *i = log(*i + 1.0))
             .on(scheduler)
         )
```

Unfortunately `then` isn't smart enough to allow `ref double i`, 
so you have to deal with ugly pointers. Something to improve.

---

As for streams, they are actually something that I am going to 
deprecate for the newer Sequence concept.

While that code hasn't been written, the api would allow you to 
do the following:

```
auto result = logs
     .sequence()
     .transform((double i) => log(i + 1.0))
     .parallelize(pool.getScheduler)
     .toList
     .syncWait
     .value
```

For the in-place mutation I suppose we could add a `refSequence` 
(then we don't need `.toList` anymore).

Yet another way would be to use an `asyncScope` and spawn 
individual tasks:

```
import concurrency.thread : stdTaskPool;
import concurrency.operations : whenAll;
import concurrency : syncWait;
import std.algorithm : map;
import std.array : array;
import concurrency.asyncscope;

auto pool = stdTaskPool(32);
auto scheduler = pool.getScheduler();
auto scp = asyncScope();

foreach (i, ref elem; logs) {
     scp.spawn(
         just(&elem)
             .then((double* elem)) => *elem = log(i + 1.0))
             .on(scheduler)
     );
}

scp.cleanup.syncWait();
```


More information about the Digitalmars-d mailing list