Tracing/Profiling D Applications

Christian Köstlin christian.koestlin at gmail.com
Sun May 29 20:47:38 UTC 2022


On 2022-05-29 20:52, Ali Çehreli wrote:
> On 5/27/22 06:55, Christian Köstlin wrote:
> 
>  > I wonder how I can synchronize the "dumping" and the
>  > collection of the threads. Would be cool to have an efficient lockless
>  > implementation of appender ...
> 
> That turned out to be nontrivial.
> 
> The following is a draft I played with. Collector collects and Dumper 
> dumps. They use a SpinLock, an unpublished feature of core.internal for 
> locking. The implementation of spinlock (e.g. at 
> /usr/include/dlang/dmd/core/internal/spinlock.d) has a reference to 
> "test and test-and-set (TTAS)":
> 
>    https://en.wikipedia.org/wiki/Test_and_test-and-set
> 
> I learned about TTAS from Rikki Cattermole yesterday at TeaConf. :)
> 
> The code is attached and works on my system.
> 
> Ali
> 
> import std;
> import std.datetime.stopwatch;
> import core.thread;
> import core.atomic;
> import core.internal.spinlock;
> 
> enum workerCount = 8;
> enum threadRunTime = 4.seconds;
> enum mainRunTime = threadRunTime + 1.seconds;
> 
> shared struct ScopeLock {
>    @disable this(this);
>    @disable void opAssign(ref const(typeof(this)));
> 
>    SpinLock * lock;
> 
>    this(shared(SpinLock) * lock) {
>      this.lock = lock;
>      lock.lock();
>    }
> 
>    ~this() {
>      lock.unlock();
>    }
> }
> 
> struct Collector {
>    long[] data;
> 
>    shared(SpinLock) lock;
> 
>    auto scopeLock() shared {
>      return ScopeLock(&lock);
>    }
> 
>    // Adds a data point to this collector.
>    void add(long i) shared {
>      auto sl = scopeLock();
> 
>      /// Some crazy way of adding data points. Real code should
>      // make more sense.
>      data ~= i;
>    }
> 
>    // Adds the data of this collector to the specified
>    // array. Again, real code should use a more sophisticated
>    // method.
>    void aggregate(ref long[] where) shared {
>      auto sl = scopeLock();
> 
>      where ~= data.sum;
>      data.length = 0;
>      (cast(long[])data).assumeSafeAppend();
>    }
> }
> 
> // A variable to help us trust the code. We will print this at
> // the end of main.
> long allThatHasBeenDumped = 0;
> // Used only for validating the code.
> shared long allCollectedByThreads;
> 
> synchronized class Dumper {
> private:
>    shared(Collector)*[] collectors;
> 
>    void register(shared(Collector) * collector) shared {
>      writeln("registering ", collector);
>      collectors ~= collector;
>    }
> 
>    // Dumps current results.
>    void dump(File output) shared {
>      long[] data;
> 
>      foreach (collector; collectors) {
>        collector.aggregate(data);
>      }
> 
>      const allData = data.sum;
> 
>      if (allData != 0) {
>        stdout.writefln!"Just collected:%-(\n  %,s%)"(data);
>        allThatHasBeenDumped += allData;
>      }
>    }
> }
> 
> shared(Dumper) dumper;
> 
> shared static this() {
>    writeln("Making a Dumper");
>    dumper = new Dumper();
> }
> 
> shared(Collector) * collector;
> 
> static this() {
>    writeln("Making a Collector");
>    collector = new shared(Collector)();
>    dumper.register(cast(shared)collector);
> }
> 
> // Main thread function
> void doWork() {
>    try {
>      doWorkImpl();
> 
>    } catch (Throwable exc) {
>      stderr.writeln("Caught Throwable: ", exc.msg);
>    }
> }
> 
> // The implementation of each thread.
> void doWorkImpl() {
>    auto sw = StopWatch();
>    sw.start();
> 
>    long i = 0;
>    while (sw.peek < threadRunTime) {
>      (cast(shared)collector).add(i);
>      ++i;
>    }
> 
>    --i;
>    auto total = i * (i + 1) / 2;
>    writefln("Thread collected %,s items equaling %,s with %s",
>             i, total, collector);
> 
>    atomicOp!"+="(allCollectedByThreads, total);
> }
> 
> void main() {
>    writeln("main started");
>    iota(workerCount).each!(_ => spawn(&doWork));
> 
>    auto sw = StopWatch();
>    sw.start();
> 
>    while (sw.peek < mainRunTime) {
>      dumper.dump(stdout);
>      Thread.sleep(100.msecs);
>    }
> 
>    // One final collection (and dump):
>    dumper.dump(stdout);
> 
>    assert(allThatHasBeenDumped == allCollectedByThreads);
> }
> 

Hi Ali,

thanks a lot for that, I will first have to digest that.
Just one first question: didn't our discussion about using TLS for the
collectors suggest that the collector's add method would not need any
lock, because it is thread-local and therefore thread-safe?

Kind regards,
Christian


More information about the Digitalmars-d-learn mailing list