Multi-processing: processes, mutexes and shared data
Luhrel
lucien.perregaux at gmail.com
Fri Jul 15 17:11:19 UTC 2022
Hello there,
I wanted to create a multi-process program and share some data
between the processes (for a heavy task, so a thread wouldn't be
efficient). However, I didn't find any way in druntime/phobos to
run a function in a separate process, so I created a simple Process
class:
```
module process;

import core.sys.posix.unistd : fork, _exit;
import core.sys.posix.sys.wait;
import std.process : ProcessException;
import std.functional : toDelegate;

class Process
{
private:
    pid_t pid;

public:
    // Accept plain functions by converting them to delegates.
    this(T, Args...)(T function(Args) fn, Args args)
    {
        this(fn.toDelegate, args);
    }

    this(T, Args...)(T delegate(Args) dg, Args args)
    if (is(T == int) || is(T == void))
    {
        pid_t pid = fork();
        if (pid < 0)
        {
            throw ProcessException.newFromErrno("Failed to spawn new process");
        }
        else if (pid == 0)
        {
            // child: run the job, then exit without ever returning into
            // the parent's code path (also when the job throws)
            scope (failure) _exit(1);
            static if (is(T == int))
            {
                _exit(dg(args)); // the return value becomes the exit status
            }
            else
            {
                dg(args);
                _exit(0);
            }
        }
        else
        {
            // parent: remember the child's PID for wait()
            this.pid = pid;
        }
    }

    // Blocks until the child terminates and returns its exit status.
    int wait()
    {
        int status = void;
        waitpid(pid, &status, 0);
        return WEXITSTATUS(status);
    }
}
```
Nice. Now let's use it:
```
import process;
import std.stdio;
import core.thread.osthread : Thread;
import core.time;

void job(int i)
{
    Thread.sleep(dur!"seconds"(1));
    writeln(i);
    // the child ends with _exit(), which skips stdio cleanup: flush
    // explicitly, otherwise output is lost when stdout is a pipe or file
    stdout.flush();
}

void main()
{
    enum NB_PROCESS = 15;
    Process[NB_PROCESS] processes;
    foreach (i; 0 .. NB_PROCESS)
    {
        processes[i] = new Process(&job, i);
    }
    scope (exit)
    {
        foreach (i; 0 .. NB_PROCESS)
        {
            processes[i].wait();
        }
    }
}
```
[x] create multiple processes.
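The `int` overload of the constructor relies on the child's return value coming back to the parent as its exit status. Here is a minimal standalone sketch of that round trip using only `core.sys.posix` (the helper `exitStatusOf` is hypothetical, not part of the class above). One caveat: POSIX only reports the low 8 bits of the status to `waitpid`, so a job returning 300 is seen as 44.

```d
import core.sys.posix.sys.types : pid_t;
import core.sys.posix.sys.wait : waitpid, WIFEXITED, WEXITSTATUS;
import core.sys.posix.unistd : fork, _exit;
import std.stdio : writeln;

// Hypothetical helper: a forked child exits with `code`, and the parent
// returns the exit status it observes (or -1 on error).
int exitStatusOf(int code)
{
    pid_t pid = fork();
    if (pid < 0)
        return -1;              // fork failed
    if (pid == 0)
        _exit(code);            // child: terminate immediately with `code`
    int status;
    if (waitpid(pid, &status, 0) != pid || !WIFEXITED(status))
        return -1;
    return WEXITSTATUS(status); // only the low 8 bits survive
}

void main()
{
    writeln(exitStatusOf(42));  // 42
    writeln(exitStatusOf(300)); // 44, i.e. 300 & 0xFF
}
```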
Ok, now we need to share data between all the processes.
Maybe the `synchronized` keyword?
[No](https://dlang.org/spec/statement.html#SynchronizedStatement), that's only for threads.
Let's use a Mutex from
[core.sync.mutex](https://dlang.org/phobos/core_sync_mutex.html)
then. Wait, nope, it does not support multiple processes: its
constructor never calls `pthread_mutexattr_setpshared`, so the
process-shared attribute (read back with `pthread_mutexattr_getpshared`)
keeps its default, `PTHREAD_PROCESS_PRIVATE`, instead of
`PTHREAD_PROCESS_SHARED` [[source code](https://github.com/dlang/dmd/blob/master/druntime/src/core/sync/mutex.d#L92=)][[man](https://man7.org/linux/man-pages/man3/pthread_mutexattr_getpshared.3.html)].
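To check that default directly, here is a small sketch that inspects a freshly initialised attribute object. It assumes a glibc-based Linux, where druntime declares `pthread_mutexattr_getpshared` in `core.sys.posix.pthread`; the function name `defaultMutexPshared` is hypothetical.

```d
import core.sys.posix.pthread;
import std.stdio : writeln;

// Returns the process-shared setting that pthread_mutex_init would get
// if nobody calls pthread_mutexattr_setpshared (or -1 on error).
int defaultMutexPshared()
{
    pthread_mutexattr_t attr;
    if (pthread_mutexattr_init(&attr) != 0)
        return -1;
    scope (exit) pthread_mutexattr_destroy(&attr);
    int pshared;
    if (pthread_mutexattr_getpshared(&attr, &pshared) != 0)
        return -1;
    return pshared;
}

void main()
{
    writeln(defaultMutexPshared() == PTHREAD_PROCESS_PRIVATE); // true
}
```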
So I need to modify it. Let's copy-pasta `core.sync.mutex` and
add the following lines at [line
92](https://github.com/dlang/dmd/blob/69ab16a7e81c24bc893851d1fbf68a0ae8baeb53/druntime/src/core/sync/mutex.d#L92=):
```
// make the mutex usable from several processes
!pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) ||
    abort("Error: pthread_mutexattr_setpshared failed.");
```
Rename the module to avoid conflicts, and voilà!
[x] Mutex for multiple processes.
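Copying `core.sync.mutex` is one option. For comparison, here is a minimal sketch of the same idea at the raw pthread level: a `pthread_mutex_t` initialised with `PTHREAD_PROCESS_SHARED` and placed in an anonymous `MAP_SHARED` mapping, which forked children inherit. It assumes a POSIX system where druntime provides `MAP_ANON` (e.g. Linux); `SharedState` and `lockedIncrements` are hypothetical names, and error handling is reduced to asserts.

```d
import core.sys.posix.pthread;
import core.sys.posix.sys.mman;
import core.sys.posix.sys.types : pid_t;
import core.sys.posix.sys.wait : waitpid;
import core.sys.posix.unistd : fork, _exit;
import std.stdio : writeln;

// Hypothetical layout of the shared region: a mutex and the data it guards.
struct SharedState
{
    pthread_mutex_t mutex;
    long counter;
}

// Forks `nChildren` processes that each increment the shared counter
// `perChild` times under the process-shared mutex; returns the final value.
long lockedIncrements(int nChildren, int perChild)
{
    void* mem = mmap(null, SharedState.sizeof, PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_ANON, -1, 0);
    assert(mem != MAP_FAILED, "mmap failed");
    scope (exit) munmap(mem, SharedState.sizeof);
    auto s = cast(SharedState*) mem; // anonymous mappings start zero-filled

    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&s.mutex, &attr);
    pthread_mutexattr_destroy(&attr);
    scope (exit) pthread_mutex_destroy(&s.mutex);

    pid_t[] pids;
    foreach (c; 0 .. nChildren)
    {
        pid_t pid = fork();
        assert(pid >= 0, "fork failed");
        if (pid == 0)
        {
            foreach (k; 0 .. perChild)
            {
                pthread_mutex_lock(&s.mutex);
                s.counter += 1; // read-modify-write guarded by the mutex
                pthread_mutex_unlock(&s.mutex);
            }
            _exit(0);
        }
        pids ~= pid;
    }
    foreach (pid; pids)
    {
        int status;
        waitpid(pid, &status, 0);
    }
    return s.counter;
}

void main()
{
    writeln(lockedIncrements(4, 100_000)); // 400000
}
```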
Now let's give it a try. But how can we check that it works correctly?
We need a shared memory space. Maybe a `shared`
variable?
[No](http://ddili.org/ders/d.en/concurrency_shared.html), same
problem as before; it's only for multi-threading. After `fork()`,
each child gets its own copy-on-write copy of the parent's memory,
so a write in one process is never seen by the others. Argh, got it:
D isn't made for multi-processing, so I'll code it myself.
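A quick sketch of that pitfall, assuming a POSIX system (the names `counter` and `valueSeenByParent` are hypothetical): the child stores 42 into a `shared` global, yet the parent still reads 0 afterwards, because the store landed in the child's private copy of the page.

```d
import core.atomic : atomicLoad, atomicStore;
import core.sys.posix.sys.types : pid_t;
import core.sys.posix.sys.wait : waitpid;
import core.sys.posix.unistd : fork, _exit;
import std.stdio : writeln;

shared int counter; // `shared` only coordinates the threads of one process

// Returns what the parent sees after a forked child stores 42 into `counter`.
int valueSeenByParent()
{
    pid_t pid = fork();
    assert(pid >= 0, "fork failed");
    if (pid == 0)
    {
        atomicStore(counter, 42); // writes the child's private copy only
        _exit(0);
    }
    int status;
    waitpid(pid, &status, 0);
    return atomicLoad(counter);
}

void main()
{
    writeln(valueSeenByParent()); // 0
}
```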
*3 hours later*:
```
module shared_memory;

import core.sys.posix.sys.mman;
import core.sys.posix.unistd : close, ftruncate;
import core.sys.posix.fcntl;
import std.conv : emplace;

class SharedMemory(T, Args...)
{
private:
    static if (is(T == class))
        T cl;
    T* ptr;
    void[] memory;
    string ident;

public:
    // `identifier` must be null-terminated (string literals are) and,
    // for portability, should have the form "/name".
    @nogc
    this(string identifier, Args args)
    {
        ident = identifier;
        static if (is(T == class))
            size_t size = __traits(classInstanceSize, T);
        else
            size_t size = T.sizeof;

        int shm_fd = shm_open(ident.ptr, O_CREAT | O_RDWR,
                              0x1b6); // 0x1b6 = 0666 (octal)
        assert(shm_fd != -1, "shm_open failed");
        ftruncate(shm_fd, size);
        void* p = mmap(null, size, PROT_READ | PROT_WRITE,
                       MAP_SHARED, shm_fd, 0);
        close(shm_fd); // the mapping stays valid without the descriptor
        assert(p != MAP_FAILED, "mmap failed");
        memory = p[0 .. size];

        static if (is(T == class))
        {
            cl = emplace!(T, Args)(memory, args);
            ptr = &cl;
        }
        else
        {
            ptr = emplace!(T, Args)(memory, args);
        }
    }

    //@nogc
    void unlink()
    {
        static if (is(T == class))
            destroy(cl);
        munmap(memory.ptr, memory.length);
        shm_unlink(ident.ptr); // removes the name; it does not close any descriptor
    }

    T* data()
    {
        return ptr;
    }
}
```
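For comparison, a standalone sketch of the POSIX calls the class wraps, with error checks: `ftruncate` sizes the object, the descriptor may be closed once the mapping exists, `munmap` releases the mapping and `shm_unlink` only removes the name. A forked child writes through the mapping and the parent reads the value back. The object name `/dlang_shm_demo` and the function `childWriteParentRead` are hypothetical.

```d
import core.sys.posix.fcntl : O_CREAT, O_RDWR;
import core.sys.posix.sys.mman;
import core.sys.posix.sys.types : pid_t;
import core.sys.posix.sys.wait : waitpid;
import core.sys.posix.unistd : _exit, close, fork, ftruncate;
import std.conv : octal;
import std.exception : errnoEnforce;
import std.stdio : writeln;
import std.string : toStringz;

enum shmName = "/dlang_shm_demo"; // hypothetical object name

// A forked child writes into a named shared memory object; the parent reads it.
ulong childWriteParentRead()
{
    int fd = shm_open(shmName.toStringz, O_CREAT | O_RDWR, octal!"600");
    errnoEnforce(fd != -1, "shm_open failed");
    scope (exit) close(fd);                     // closing never unmaps
    scope (exit) shm_unlink(shmName.toStringz); // removes the name only

    errnoEnforce(ftruncate(fd, ulong.sizeof) == 0, "ftruncate failed");
    void* mem = mmap(null, ulong.sizeof, PROT_READ | PROT_WRITE,
                     MAP_SHARED, fd, 0);
    errnoEnforce(mem != MAP_FAILED, "mmap failed");
    scope (exit) munmap(mem, ulong.sizeof);

    auto value = cast(ulong*) mem;
    *value = 0;
    pid_t pid = fork();
    errnoEnforce(pid >= 0, "fork failed");
    if (pid == 0)
    {
        *value = 42; // the child writes through the shared mapping
        _exit(0);
    }
    int status;
    waitpid(pid, &status, 0);
    return *value;
}

void main()
{
    writeln(childWriteParentRead()); // 42
}
```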
It seems to work; now let's create a test program:
```
import process;
import mutex;
import shared_memory;
import std.stdio;
import core.thread.osthread : Thread;
import core.time;

void job(int i, SharedMemory!Mutex mutex, SharedMemory!ulong sm)
{
    auto m = *mutex.data;
    Thread.sleep(dur!"msecs"(1000)); // let the scheduler switch to other processes
    m.lock();
    scope (exit) m.unlock();
    Thread.sleep(dur!"msecs"(100)); // ditto, while holding the lock
    foreach (j; 1 .. 100)
        *sm.data += i * j - i + i*i*i*i; // "random" calculations
}

void main()
{
    enum NB_PROCESS = 100;
    auto sum = new SharedMemory!ulong("/sum");
    auto mutex = new SharedMemory!Mutex("/mutex");
    *sum.data = 0;

    Process[NB_PROCESS] processes;
    foreach (i; 0 .. NB_PROCESS)
    {
        processes[i] = new Process(&job, i, mutex, sum);
    }
    foreach (i; 0 .. NB_PROCESS)
    {
        processes[i].wait();
    }

    if (*sum.data != 193107012120)
    {
        writeln("failed! sum: ", *sum.data);
    }

    import core.memory : GC;
    GC.collect(); // force collection
    sum.unlink();
    mutex.unlink();
}
```
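The expected constant can be checked without any processes: for each `i` from 0 to 99, the inner loop over `j` from 1 to 99 adds `4851*i + 99*i^4`, so the total is `4851*4950 + 99*1950333330 = 193107012120`. A sequential sketch of the same arithmetic (the helper name `expectedSum` is hypothetical):

```d
import std.stdio : writeln;

// Sequential version of the job() arithmetic: one `i` per process, j = 1 .. 99.
ulong expectedSum(int nbProcess)
{
    ulong total = 0;
    foreach (i; 0 .. nbProcess)
        foreach (j; 1 .. 100)
            total += i * j - i + i * i * i * i; // fits in int for i < 100
    return total;
}

void main()
{
    writeln(expectedSum(100)); // 193107012120
}
```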
Let's run our program a hundred times: `for i in $(seq 100); do
./mulproc; done`. No output, so the sum was correct every time! Yaay!
[x] Share data between processes
*Achievement get: multi-processing in D.*
---
Fine. This was fun to do.
However, I have two questions:
- Why wasn't this implemented in druntime/phobos?
- Can we implement this (also for non-POSIX systems) in
druntime/phobos? If so, how should it be done properly (i.e. not
by using my code)?
Bests,
Luhrel
More information about the Digitalmars-d mailing list