Proposal for a MessageQueue (was Re: public MessageBox)

Sean Kelly sean at invisibleduck.org
Thu Mar 22 17:13:50 PDT 2012


On Mar 22, 2012, at 4:01 PM, Nathan M. Swan wrote:

> On Thursday, 22 March 2012 at 21:27:40 UTC, Sean Kelly wrote:
>> On Mar 22, 2012, at 12:06 PM, "Nathan M. Swan" <nathanmswan at gmail.com> wrote:
>> 
>>> On Thursday, 22 March 2012 at 15:53:56 UTC, Sean Kelly wrote:
>>>> I can see adapting the API so that each thread has a default message queue (keep in mind that we'll be adding interprocess messaging at some point via the same routines). I'm not yet clear how the existence of alternate message queues could be communicated to other portions of the code though. register() is one way I suppose. Really what's happening here is that Tid is being replaced by a queue ID, not extended with a mutable variable.
>>> I think they would be passed as parameters to spawn or received from the default message queue.
>> 
>> But will either of those solve the problem you outlined where user code is calling receiveOnly and bumping into a message meant for a third-party API?  If the API is spawning threads they typically won't be running user code, or at least would certainly impose restrictions on message queue use by called user code. And in the case of sending the Qid to the default queue, you end up with a race condition where user code might call receiveOnly.
>> 
> 
> But what if the client spawns threads?
> 
> An example would be a desktop GUI. In a background thread running a CPU-intensive task, the client wants to update a progress indicator and send partially calculated data to the main thread.
> 
> void mainThread() {
>    string data;
>    auto mq = new MessageQueue();
>    spawn(&backgroundThread, mq, pi);
>    pi.onChange = (double val) {
>        if (val == 0.5) {
>            data = mq.receiveOnly!string();
>        } else {
>            data ~= mq.receiveOnly!string();
>        }
>    };
> }
> 
> void backgroundThread(MessageQueue me, ProgressIndicator pi) {
>    // part 1 of calculations...
>    me.send(partiallyCalculatedData);
>    pi.value = 0.5; // implementation: this._queue.send(UpdateValue(value))
>    // part 2...
>    me.send(theRestOfTheData);
>    pi.value = 1.0;
> }
> 
> With one MessageQueue per thread, the mailbox would contain a (string, UpdateValue, string, UpdateValue). The mainThread would expect a (UpdateValue, string, UpdateValue, string).

While sending a bare string might be fine for example code, any real application is going to use structured messages whose type is specific to the message's purpose and which carry fields such as the sender's Tid.  It seems like you're aiming more for CSP, where you'd create a separate communication channel per use.  You could even fake it by wrapping send/receive in your own CSP-like API, though a from-scratch CSP-style implementation would quite likely be faster because there'd be no need to package messages.
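
As a rough sketch of the structured-message point, here is the GUI example redone with one default mailbox per thread and one struct per kind of message. PartialData, the fields on UpdateValue, and collect() are names made up for illustration, and the sketch assumes a Phobos version that accepts a Tid field inside a message struct. Because receive() dispatches on the message type, the order in which data and progress updates were queued stops mattering.

```d
import std.concurrency;
import std.stdio;

// Hypothetical message types for illustration; not part of std.concurrency.
struct PartialData { Tid sender; string payload; }
struct UpdateValue { Tid sender; double value; }

void backgroundThread(Tid owner)
{
    // part 1 of calculations...
    owner.send(PartialData(thisTid, "first half, "));
    owner.send(UpdateValue(thisTid, 0.5));
    // part 2...
    owner.send(PartialData(thisTid, "second half"));
    owner.send(UpdateValue(thisTid, 1.0));
}

// Spawns the worker and gathers its data from this thread's default mailbox.
string collect()
{
    spawn(&backgroundThread, thisTid);
    string data;
    bool done = false;
    while (!done)
    {
        // Each handler is selected by the static type of the message,
        // so a PartialData can never be mistaken for an UpdateValue.
        receive(
            (PartialData m) { data ~= m.payload; },
            (UpdateValue m) { if (m.value >= 1.0) done = true; }
        );
    }
    return data;
}

void main()
{
    writeln(collect());
}
```

This does not give a third-party API its own private queue, so it only addresses the ordering mismatch in the example, not the broader isolation question.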


>>>> I guess Tid would become an alias for the Qid created when a thread is spawned. What I really don't want though, is for receive() to operate on a message queue created in a different thread. Messaging would become substantially slower if receive() had to be synchronized.
>>> That's a drawback I haven't considered. To solve this, it would be made part of the contract that receiving must all be done in one thread.
>>> I can't think of a use where receiving in multiple threads would apply, but if it would, a SynchronizedMessageQueue subclass could easily be drawn up that broadens the contract and synchronizes for receive().
>>> BTW, how do you unittest just the std.concurrency module?
>> 
>> Not easily, since a failure often means that a thread hangs.
> 
> Linking fails (I'm on OSX):
> 
> $ rdmd --main -unittest std/concurrency.d
> Undefined symbols for architecture x86_64:
>  "_D3std3utf10strideImplFNaNeamZk", referenced from:
>      _D3std3utf15__T6strideTAxaZ6strideFNaNfxAamZk in concurrency.d.o
>      _D3std3utf14__T6strideTAaZ6strideFNaNfxAamZk in concurrency.d.o
>      _D3std3utf15__T6strideTAyaZ6strideFNaNfxAyamZk in concurrency.d.o
> ld: symbol(s) not found for architecture x86_64
> collect2: ld returned 1 exit status
> --- errorlevel 1

Used to work, and std.concurrency doesn't even use std.utf.  Not sure what's going on there.
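
On the earlier point that a failing test tends to leave a thread hanging: one way to keep such a test from blocking forever is to wait with receiveTimeout() instead of receive(). The following is only a sketch, with echo() and gotReply() invented for the example and the 500 ms limit chosen arbitrarily.

```d
import core.time : msecs;
import std.concurrency;
import std.stdio;

// Hypothetical worker: replies to its owner with a fixed value.
void echo(Tid owner)
{
    owner.send(42);
}

// Returns true only if an int equal to `expected` arrives in time.
bool gotReply(int expected)
{
    spawn(&echo, thisTid);
    bool matched = false;
    // receiveTimeout returns false if no matching message arrived
    // before the timeout, so a lost message fails instead of hanging.
    immutable received = receiveTimeout(500.msecs,
        (int v) { matched = (v == expected); });
    return received && matched;
}

void main()
{
    writeln(gotReply(42) ? "reply received" : "timed out or wrong value");
}
```

A timeout turns a hang into an ordinary test failure, at the cost of making the test sensitive to how long the spawned thread takes to start.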

