Proposal for a MessageQueue (was Re: public MessageBox)

Nathan M. Swan nathanmswan at gmail.com
Thu Mar 22 16:01:46 PDT 2012


On Thursday, 22 March 2012 at 21:27:40 UTC, Sean Kelly wrote:
> On Mar 22, 2012, at 12:06 PM, "Nathan M. Swan" 
> <nathanmswan at gmail.com> wrote:
>
>> On Thursday, 22 March 2012 at 15:53:56 UTC, Sean Kelly wrote:
>>> I can see adapting the API so that each thread has a default 
>>> message queue (keep in mind that we'll be adding interprocess 
>>> messaging at some point via the same routines). I'm not yet 
>>> clear how the existence of alternate message queues could be 
>>> communicated to other portions of the code though. register() 
>>> is one way I suppose. Really what's happening here is that 
>>> Tid is being replaced by a queue ID, not extended with a 
>>> mutable variable.
>> 
>> I think they would be passed as parameters to spawn or 
>> received from the default message queue.
>
> But will either of those solve the problem you outlined where 
> user code is calling receiveOnly and bumping into a message 
> meant for a third-party API?  If the API is spawning threads 
> they typically won't be running user code, or at least would 
> certainly impose restrictions on message queue use by called 
> user code. And in the case of sending the Qid to the default 
> queue, you end up with a race condition where user code might 
> call receiveOnly.
>

But what if the client spawns threads?

An example would be a desktop GUI. In a background thread meant 
for a CPU-intensive task, the client wants to update a progress 
indicator and send partially calculated data to the main thread.

void mainThread() {
     string data;
     auto mq = new MessageQueue();
     // assumed: the indicator is created by the GUI code
     auto pi = new ProgressIndicator();
     spawn(&backgroundThread, mq, pi);
     pi.onChange = (double val) {
         if (val == 0.5) {
             data = mq.receiveOnly!string();
         } else {
             data ~= mq.receiveOnly!string();
         }
     };
}

void backgroundThread(MessageQueue mq, ProgressIndicator pi) {
     // part 1 of calculations...
     mq.send(partiallyCalculatedData);
     // setter implementation: this._queue.send(UpdateValue(value))
     pi.value = 0.5;
     // part 2...
     mq.send(theRestOfTheData);
     pi.value = 1.0;
}

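With only one MessageQueue per thread, the main thread's mailbox 
would contain (string, UpdateValue, string, UpdateValue), while 
mainThread would expect (UpdateValue, string, UpdateValue, 
string): the onChange handler only asks for a string after the 
library has received its UpdateValue.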
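
The ordering problem can be reproduced with the std.concurrency 
API as it exists today. This is only a rough sketch: UpdateValue, 
worker, guiThread and firstReceiveIsMismatch are made-up names, 
and a real GUI library would not necessarily use receiveOnly 
internally.

```d
import std.concurrency;
import std.stdio;

// Hypothetical stand-in for the GUI library's progress message.
struct UpdateValue { double value; }

// Sends client data and library updates to one shared mailbox,
// in the same order as backgroundThread above.
void worker(Tid owner)
{
    owner.send("partial data");   // client message
    owner.send(UpdateValue(0.5)); // library message
    owner.send("remaining data"); // client message
    owner.send(UpdateValue(1.0)); // library message
}

// Plays the role of the main thread: the library side asks for
// its own message first and reports whether that failed.
void guiThread(Tid reportTo)
{
    spawn(&worker, thisTid);
    bool mismatch = false;
    try
    {
        receiveOnly!UpdateValue();
    }
    catch (MessageMismatch e)
    {
        // The client's string was at the head of the mailbox.
        mismatch = true;
    }
    reportTo.send(mismatch);
}

// Runs the scenario in a fresh thread so its mailbox starts empty.
bool firstReceiveIsMismatch()
{
    spawn(&guiThread, thisTid);
    return receiveOnly!bool();
}

void main()
{
    writeln("mismatch on first receive: ", firstReceiveIsMismatch());
}
```

Because receiveOnly throws MessageMismatch when the next message 
has a different type, library code and client code sharing one 
mailbox have to agree on the exact message order.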

With a separate MessageQueue passed to spawn, the client's 
messages never land in the library's queue, so the client code 
is separated from the library.

The default queue is an idea suggested for backward 
compatibility, and new programmers wouldn't be encouraged to use 
it.

>>> I guess Tid would become an alias for the Qid created when a 
>>> thread is spawned. What I really don't want though, is for 
>>> receive() to operate on a message queue created in a 
>>> different thread. Messaging would become substantially slower 
>>> if receive() had to be synchronized.
>> 
>> That's a drawback I haven't considered. To solve this, it 
>> would be made part of the contract that receiving must all be 
>> done in one thread.
>> 
>> I can't think of a use where receiving in multiple threads 
>> would apply, but if it would, a SynchronizedMessageQueue 
>> subclass could easily be drawn up that broadens the contract 
>> and synchronizes for receive().
>> 
>> BTW, how do you unittest just the std.concurrency module?
>
> Not easily, since a failure often means that a thread hangs.

Linking fails (I'm on OSX):

$ rdmd --main -unittest std/concurrency.d
Undefined symbols for architecture x86_64:
   "_D3std3utf10strideImplFNaNeamZk", referenced from:
       _D3std3utf15__T6strideTAxaZ6strideFNaNfxAamZk in concurrency.d.o
       _D3std3utf14__T6strideTAaZ6strideFNaNfxAamZk in concurrency.d.o
       _D3std3utf15__T6strideTAyaZ6strideFNaNfxAyamZk in concurrency.d.o
ld: symbol(s) not found for architecture x86_64
collect2: ld returned 1 exit status
--- errorlevel 1

Thanks, NMS


