Tracing/Profiling D Applications
Ali Çehreli
acehreli at yahoo.com
Sun May 29 18:52:37 UTC 2022
On 5/27/22 06:55, Christian Köstlin wrote:
> I wonder how I can synchronize the "dumping" and the
> collection of the threads. Would be cool to have an efficient lockless
> implementation of appender ...
That turned out to be nontrivial.
The following is a draft I played with. Collector collects and Dumper
dumps. They synchronize with SpinLock, an unpublished locking primitive
in core.internal. The spinlock implementation (e.g. at
/usr/include/dlang/dmd/core/internal/spinlock.d) refers to
"test and test-and-set" (TTAS):
https://en.wikipedia.org/wiki/Test_and_test-and-set
I learned about TTAS from Rikki Cattermole yesterday at TeaConf. :)
The code is attached and works on my system.
Ali
import std;
import std.datetime.stopwatch;
import core.thread;
import core.atomic;
import core.internal.spinlock;
enum workerCount = 8;
enum threadRunTime = 4.seconds;
enum mainRunTime = threadRunTime + 1.seconds;
// RAII guard for a SpinLock: acquires the lock on construction and
// releases it when the guard goes out of scope.
shared struct ScopeLock {
    // Non-copyable and non-assignable so the lock is released exactly once.
    @disable this(this);
    @disable void opAssign(ref const(typeof(this)));

    SpinLock * lock;

    // Acquires '*lock' immediately; the destructor releases it.
    this(shared(SpinLock) * lock) {
        this.lock = lock;
        lock.lock();
    }

    ~this() {
        // Fix: a default-initialized ScopeLock (constructor never ran) has
        // a null 'lock'; unlocking through it would crash. Guard first.
        if (lock !is null) {
            lock.unlock();
        }
    }
}
// Per-thread data sink guarded by a spin lock. Each worker thread gets its
// own Collector (see the module-level 'static this'); the Dumper
// periodically drains all of them via aggregate().
struct Collector {
long[] data;
shared(SpinLock) lock;
// Returns an RAII guard that holds 'lock' for the caller's scope.
auto scopeLock() shared {
return ScopeLock(&lock);
}
// Adds a data point to this collector. (Deliberately naive; real code
// would record something more meaningful.)
void add(long i) shared {
auto sl = scopeLock();
data ~= i;
}
// Appends the sum of this collector's data to 'where' and resets the
// buffer. Again, real code should use a more sophisticated method.
void aggregate(ref long[] where) shared {
auto sl = scopeLock();
where ~= data.sum;
// Reuse the existing allocation: shrink to zero, then mark the slice
// safe for in-place appends. The cast drops 'shared' — presumably
// because assumeSafeAppend does not accept a shared slice; safe here
// since the spin lock is held. TODO confirm.
data.length = 0;
(cast(long[])data).assumeSafeAppend();
}
}
// Grand total of everything dump() has reported. Thread-local, which is
// safe here because only the main thread calls dump(); asserted against
// allCollectedByThreads at the end of main.
long allThatHasBeenDumped = 0;
// Grand total each worker adds (atomically) when it finishes.
// Used only for validating the code.
shared long allCollectedByThreads;
// Aggregates and reports the data of all registered collectors.
// 'synchronized' serializes register() against dump().
synchronized class Dumper {
private:
    shared(Collector)*[] collectors;

    // Registers a per-thread collector so dump() can drain it later.
    void register(shared(Collector) * collector) shared {
        writeln("registering ", collector);
        collectors ~= collector;
    }

    // Dumps current results to 'output'.
    //
    // Fix: the original ignored the 'output' parameter and always wrote to
    // stdout; honor the parameter. (All existing callers pass stdout, so
    // observable behavior at current call sites is unchanged.)
    void dump(File output) shared {
        long[] data;    // one aggregated sum per registered collector
        foreach (collector; collectors) {
            collector.aggregate(data);
        }
        const allData = data.sum;
        if (allData != 0) {
            output.writefln!"Just collected:%-(\n %,s%)"(data);
            allThatHasBeenDumped += allData;    // for end-of-run validation
        }
    }
}
// The single global dump coordinator, shared by all threads.
shared(Dumper) dumper;
// Runs once in the main thread before any per-thread 'static this' below,
// so 'dumper' exists by the time collectors register themselves.
shared static this() {
writeln("Making a Dumper");
dumper = new Dumper();
}
// Thread-local pointer to this thread's own shared Collector, so workers
// never contend with each other — only with the dumping thread.
shared(Collector) * collector;
// Per-thread module constructor: allocates this thread's collector and
// registers it with the global dumper.
static this() {
writeln("Making a Collector");
collector = new shared(Collector)();
// NOTE(review): 'collector' is already shared(Collector)*, so this
// cast looks redundant — confirm before removing.
dumper.register(cast(shared)collector);
}
// Entry point of each worker thread. Wraps the real work so that no
// Throwable ever escapes the thread; failures are reported on stderr.
void doWork() {
    try {
        doWorkImpl();
    } catch (Throwable caught) {
        stderr.writeln("Caught Throwable: ", caught.msg);
    }
}
// The implementation of each thread.
//
// Feeds an increasing counter into this thread's collector until
// threadRunTime elapses, then atomically records the expected grand total
// for the validation assert at the end of main().
void doWorkImpl() {
    auto timer = StopWatch();
    timer.start();

    long next = 0;
    for (; timer.peek < threadRunTime; ++next) {
        (cast(shared)collector).add(next);
    }

    // The values 0 .. next-1 were added; they sum to (next-1)*next/2.
    const last = next - 1;
    const total = last * (last + 1) / 2;
    writefln("Thread collected %,s items equaling %,s with %s",
             last, total, collector);
    atomicOp!"+="(allCollectedByThreads, total);
}
// Spawns the workers, dumps periodically while they run, then performs
// one final dump and validates that everything collected was reported.
void main() {
    writeln("main started");

    // Launch the worker threads.
    foreach (_; 0 .. workerCount) {
        spawn(&doWork);
    }

    // mainRunTime outlasts threadRunTime, so the workers finish before
    // the final dump below.
    auto timer = StopWatch();
    timer.start();
    while (timer.peek < mainRunTime) {
        dumper.dump(stdout);
        Thread.sleep(100.msecs);
    }

    // One final collection (and dump):
    dumper.dump(stdout);

    assert(allThatHasBeenDumped == allCollectedByThreads);
}
More information about the Digitalmars-d-learn
mailing list