Multi-processing : processes, mutexes and shared data

Luhrel lucien.perregaux at
Fri Jul 15 17:11:19 UTC 2022

Hello there,

I wanted to create a multi-process program, and share some data 
between the processes (for a heavy task, so a thread wouldn't be 
efficient). However, I didn't see any way to multi-process a 
function in the druntime/phobos, so I created a simple Process 
class :
module process;

import core.sys.posix.unistd : fork, _exit;
import core.sys.posix.sys.wait;
import std.process : ProcessException;
import std.functional : toDelegate;

class Process
     pid_t pid;

     this(T, Args...)(T function(Args) fn, Args args)
         this(fn.toDelegate, args);

     this(T, Args...)(T delegate(Args) dg, Args args)
         if (is(T == int) || is(T == void))
         pid_t pid = fork();
         if (pid < 0)
             throw ProcessException.newFromErrno("Failed to spawn 
new process");
         else if (pid == 0)
             // child
             static if (is(T == int))
             // parent
    = pid;

     int wait()
         int status = void;
         waitpid(pid, &status, 0);
         return WEXITSTATUS(status);

Nice. Now, let's use it :

import process;
import std.stdio;
import core.thread.osthread : Thread;
import core.time;

void job(int i)

void main()
     enum NB_PROCESS = 15;

     Process[NB_PROCESS] processes;
     foreach(i ; 0 .. NB_PROCESS)
         processes[i] = new Process(&job, i);

         foreach(i ; 0 .. NB_PROCESS)
[x] create multiple processes.

Ok, now we need to share data between all the processes.

Maybe the `synchronized` keyword ? 
[no](, that's only for threads.
Let's use a Mutex from 
then. Wait, nope, it does not support multi-processes, 
`pthread_mutexattr_getpshared` is not set to 

So, I need to modify it. Let's copy-pasta `core.sync.mutex` and 
add the following modifications to the [line 
92]( :
                 abort("Error: pthread_mutexattr_setpshared 
Rename the module to avoid conflicts, and voilà !

[x] Mutex for multiple processes.

Now let's give it a try. But how to check if it correctly works ? 
We need to create a shared memory space. Maybe a `shared` 
variable ? 
[No](, same 
problem as before ; it's only for multi-threading. Arg, got it. D 
isn't made for multi-processing, I'll code it.

*3 hours later* :
module shared_memory;

import core.sys.posix.sys.mman;
import core.sys.posix.unistd;
import core.sys.posix.fcntl;
import std.conv : emplace;

class SharedMemory(T, Args...)
     static if (is(T == class))
         T cl;
     T* ptr;
     string ident;

     this(string identifier, Args args)
         ident = identifier;

         static if (is(T == class))
             size_t size = __traits(classInstanceSize, T);
             size_t size = T.sizeof;

         int shm_fd = shm_open(ident.ptr, O_CREAT | O_RDWR, 
0x1b6); // 0x1b6 = 0666 (octal)
         ftruncate(shm_fd, size);
         void[] memory = mmap(null, size, PROT_READ | PROT_WRITE, 
MAP_SHARED, shm_fd, 0)[0..size];

         static if (is(T == class))
             cl = emplace!(T, Args)(memory, args);
             ptr = &cl;
             ptr = emplace!(T, Args)(memory, args);

     void unlink()
         static if (is(T == class))
         shm_unlink(ident.ptr); // shm_unlink also closes the file 

     T* data()
         return ptr;

Seems to work, now let's create a test program :
import process;
import mutex;
import shared_memory;

import std.stdio;
import core.thread.osthread : Thread;
import core.time;

void job(int i, SharedMemory!Mutex mutex, SharedMemory!ulong sm)
     auto m = *;
     Thread.sleep(dur!"msecs"(1000)); // force the scheduler to 
change the execution context
     Thread.sleep(dur!"msecs"(100)); // ditto
     foreach (j; 1 .. 100)
         * += i * j - i + i*i*i*i; // "random" calculations

void main()
     enum NB_PROCESS = 100;

     auto sum = new SharedMemory!ulong("sum");
     auto mutex = new SharedMemory!Mutex("mutex");

     * = 0;

     Process[NB_PROCESS] processes;
     foreach(i ; 0 .. NB_PROCESS)
         processes[i] = new Process(&job, i, mutex, sum);

     foreach(i ; 0 .. NB_PROCESS)

     if (* != 193107012120)
         writeln("failed ! sum: ", *;

     import core.memory : GC;
     GC.collect(); // force collection

Let's run our program a hundred times : `for i in 0..100; do 
./mulproc; done`. No output, so everything worked properly ! Yaay 

[x] Share data between processes

*Achievement get: multi-processing in D.*


Fine. This was fun to do.

However, I have two questions :
- Why wasn't this implemented in druntime/phobos ?
- Can we implement this (also for Non-POSIX systems) in 
druntime/phobos ? If yes, how to do it properly (i.e. by not 
using my code)


More information about the Digitalmars-d mailing list