Streaming library

Denis Koroskin 2korden at gmail.com
Wed Oct 13 13:41:32 PDT 2010


On Wed, 13 Oct 2010 20:55:04 +0400, Andrei Alexandrescu  
<SeeWebsiteForEmail at erdani.org> wrote:
>
> Why doesn't Sean's concurrency API scale for your needs? Can that be  
> fixed? Would you consider submitting some informed bug reports?
>

Okay, I now have some spare time, so I can share what I found missing in
std.concurrency that made me implement my own message passing API:

1) It is next to impossible to create and use multiple message boxes, which
is a big pain when writing library code.
std.concurrency operates on Tid, and you can't create new Tids (ctor is  
private). Tid is a very simple wrapper around a MessageBox class, and  
while you can create custom MessageBoxes, none of the public  
std.concurrency APIs work with them directly. As such you are stuck with  
one Tid per thread, and that's a no go for me. E.g. I have hundreds of  
concurrent socket connections, and I'd like to have different event  
handlers for different event sources (i.e. different SocketStreams).

2) Even if you were able to create N Tids, this is what event handling
would look like:

for (int i = 0; i < messageBoxes.length; ++i) {
	// 0 = no timeout; blocking is not allowed
	messageBoxes[i].tid.receiveTimeout(0, messageBoxes[i].callback);
}

This doesn't scale well. With hundreds of message boxes, this loop will
consume all the CPU time, because it has to spin over every box without
blocking. Besides, the callback must be declared as "void
delegate(Variant)", which loses all the type information. It just throws all
the events into the same bag. That's not okay for me.

3) I want to bind callbacks to event types in one place, and poll for  
events in another one. E.g. instead of:

void foo(FooMessage message) { ... }
...

tid.receiveTimeout(0, &foo, &bar, &baz);

I'd want to be able to write:

tid.register(&foo);
tid.register(&bar);
tid.register(&baz);
...
tid.poll(0 /* no wait */);
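
A minimal sketch of how such a register/poll split might look, assuming a
single-threaded mailbox that stores messages as Variant and looks handlers up
by TypeInfo. The Mailbox class and its send method are hypothetical names
used only for illustration; this is neither std.concurrency's API nor
necessarily how io/mailbox.d does it:

```d
import std.variant : Variant;

// Hypothetical single-threaded mailbox, for illustration only.
class Mailbox
{
    // One type-erased wrapper per registered message type.
    private void delegate(Variant)[TypeInfo] handlers;
    private Variant[] queue;

    // Bind a handler to the message type T it accepts.
    void register(T)(void delegate(T) handler)
    {
        handlers[typeid(T)] = (Variant v) { handler(v.get!T); };
    }

    // Enqueue a message; nothing is dispatched until poll() runs.
    void send(T)(T message)
    {
        queue ~= Variant(message);
    }

    // Dispatch everything queued, including messages that handlers
    // enqueue while running. Returns the number of messages handled;
    // messages with no registered handler are dropped.
    size_t poll()
    {
        size_t handled;
        while (queue.length > 0)
        {
            Variant msg = queue[0];
            queue = queue[1 .. $];
            if (auto h = msg.type in handlers)
            {
                (*h)(msg);
                ++handled;
            }
        }
        return handled;
    }
}
```

Handlers keep their static parameter type; only the internal wrapper sees a
Variant. Because poll() keeps draining until the queue is empty, a handler
that sends a follow-up message gets it dispatched in the same poll() call.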

4) Event chaining is impossible to achieve. This is mostly because of #3.
Here is a more concrete example: I want to receive an incoming socket
message notification, parse that message (extract HTTP headers/contents
etc.) and then possibly dispatch a new event. All this needs to be
transparent to the user. E.g.:

// HttpConnection is needed to send multiple HTTP requests over the same
// socket connection
HttpConnection connection = new HttpConnection("google.com");
HttpRequest request = new HttpRequest("/"); // main page

connection.execute(request, tid); // start

HttpConnection is using SocketStream under the hood.
When you call connection.execute(request), it connects to

tid.receiveOnly( (HttpResponse response) { writeln(response.contents); } );

With std.concurrency this is impossible to implement. The problem is that
the HttpResponse event is never sent, because the SocketStream message is
never received, because it is never polled for. The following could improve
the situation:

class HttpConnection {
	void execute(HttpRequest request) { ... tid.register(&onNewMessage); ... }
}

tid.register(&onHttpResponse);
tid.poll(); // now it polls for both messages!

When a new message arrives, control is passed to HttpConnection and then
to HttpRequest, which parses the socket message and generates an
HttpResponse event, which is then received by the user.

5) std.concurrency doesn't know about thread pools, and doesn't allow event
processing in threads other than the current one. This prevents code
parallelization.

6) Tid can't redirect events to other Tids.

Here is an example:

void onNewEvent(Event e) { writeln(e.toString()); }

Mailbox m1;
m1.register(&onNewEvent);

Mailbox m2 = Mailbox(&m1);  // m1 is now a parent to m2
m2.raiseEvent(new Event()); // redirects to m1

m1.poll(INFINITE); // triggers event handling

This is useful when you have hundreds of mailboxes. Just poll one and all
the events will be triggered.
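
The redirection could be sketched roughly as follows, assuming a child
mailbox keeps no queue of its own and hands every raised event to its parent,
whose handlers then run on poll(). This Mailbox is a hypothetical
single-threaded class written for illustration (it uses a class reference
rather than the &m1 pointer above, and poll() does not block, so there is no
INFINITE timeout); it is not the code from io/mailbox.d:

```d
import std.variant : Variant;

// Hypothetical mailbox with an optional parent, for illustration only.
class Mailbox
{
    private Mailbox parent;
    private void delegate(Variant)[TypeInfo] handlers;
    private Variant[] queue;

    this(Mailbox parent = null)
    {
        this.parent = parent;
    }

    // Bind a handler to the event type T it accepts.
    void register(T)(void delegate(T) handler)
    {
        handlers[typeid(T)] = (Variant v) { handler(v.get!T); };
    }

    // A child stores nothing itself: the event travels up to the root.
    void raiseEvent(T)(T event)
    {
        if (parent !is null)
            parent.raiseEvent(event);
        else
            queue ~= Variant(event);
    }

    // Dispatch queued events to this mailbox's handlers and return
    // how many were handled. Non-blocking in this sketch.
    size_t poll()
    {
        size_t handled;
        while (queue.length > 0)
        {
            Variant event = queue[0];
            queue = queue[1 .. $];
            if (auto h = event.type in handlers)
            {
                (*h)(event);
                ++handled;
            }
        }
        return handled;
    }
}
```

With this shape, any number of children created as new Mailbox(m1) feed the
same queue, so a single m1.poll() call services all of them.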

That's pretty much everything I needed (and that my Mailbox provides) but
that std.concurrency lacks.

My mailbox implementation is very slim; the full source code is available
here:
http://bitbucket.org/korDen/io/src/tip/io/mailbox.d

