Tracing/Profiling D Applications

Ali Çehreli acehreli at yahoo.com
Sun May 29 18:52:37 UTC 2022


On 5/27/22 06:55, Christian Köstlin wrote:

 > I wonder how I can synchronize the "dumping" and the
 > collection of the threads. Would be cool to have an efficient lockless
 > implementation of appender ...

That turned out to be nontrivial.

The following is a draft I played with. Collector collects and Dumper 
dumps. They use a SpinLock, an unpublished feature of core.internal for 
locking. The implementation of spinlock (e.g. at 
/usr/include/dlang/dmd/core/internal/spinlock.d) has a reference to 
"test and test-and-set (TTAS)":

   https://en.wikipedia.org/wiki/Test_and_test-and-set

I learned about TTAS from Rikki Cattermole yesterday at TeaConf. :)

The code is attached and works on my system.

Ali

import std;
import std.datetime.stopwatch;
import core.thread;
import core.atomic;
import core.internal.spinlock;

enum workerCount = 8;
enum threadRunTime = 4.seconds;
enum mainRunTime = threadRunTime + 1.seconds;

shared struct ScopeLock {
   @disable this(this);
   @disable void opAssign(ref const(typeof(this)));

   SpinLock * lock;

   this(shared(SpinLock) * lock) {
     this.lock = lock;
     lock.lock();
   }

   ~this() {
     lock.unlock();
   }
}

struct Collector {
   long[] data;

   shared(SpinLock) lock;

   auto scopeLock() shared {
     return ScopeLock(&lock);
   }

   // Adds a data point to this collector.
   void add(long i) shared {
     auto sl = scopeLock();

     /// Some crazy way of adding data points. Real code should
     // make more sense.
     data ~= i;
   }

   // Adds the data of this collector to the specified array
   // array. Again, real code should use a more sophisticated
   // method.
   void aggregate(ref long[] where) shared {
     auto sl = scopeLock();

     where ~= data.sum;
     data.length = 0;
     (cast(long[])data).assumeSafeAppend();
   }
}

// A variable to help us trust the code. We will print this at
// the end of main.
long allThatHasBeenDumped = 0;
// Used only for validating the code.
shared long allCollectedByThreads;

synchronized class Dumper {
private:
   shared(Collector)*[] collectors;

   void register(shared(Collector) * collector) shared {
     writeln("registering ", collector);
     collectors ~= collector;
   }

   // Dumps current results.
   void dump(File output) shared {
     long[] data;

     foreach (collector; collectors) {
       collector.aggregate(data);
     }

     const allData = data.sum;

     if (allData != 0) {
       stdout.writefln!"Just collected:%-(\n  %,s%)"(data);
       allThatHasBeenDumped += allData;
     }
   }
}

shared(Dumper) dumper;

shared static this() {
   writeln("Making a Dumper");
   dumper = new Dumper();
}

shared(Collector) * collector;

static this() {
   writeln("Making a Collector");
   collector = new shared(Collector)();
   dumper.register(cast(shared)collector);
}

// Main thread function
void doWork() {
   try {
     doWorkImpl();

   } catch (Throwable exc) {
     stderr.writeln("Caught Throwable: ", exc.msg);
   }
}

// The implementation of each thread.
void doWorkImpl() {
   auto sw = StopWatch();
   sw.start();

   long i = 0;
   while (sw.peek < threadRunTime) {
     (cast(shared)collector).add(i);
     ++i;
   }

   --i;
   auto total = i * (i + 1) / 2;
   writefln("Thread collected %,s items equaling %,s with %s",
            i, total, collector);

   atomicOp!"+="(allCollectedByThreads, total);
}

void main() {
   writeln("main started");
   iota(workerCount).each!(_ => spawn(&doWork));

   auto sw = StopWatch();
   sw.start();

   while (sw.peek < mainRunTime) {
     dumper.dump(stdout);
     Thread.sleep(100.msecs);
   }

   // One final collection (and dump):
   dumper.dump(stdout);

   assert(allThatHasBeenDumped == allCollectedByThreads);
}



More information about the Digitalmars-d-learn mailing list