Streaming library
Denis Koroskin
2korden at gmail.com
Wed Oct 13 13:41:32 PDT 2010
On Wed, 13 Oct 2010 20:55:04 +0400, Andrei Alexandrescu
<SeeWebsiteForEmail at erdani.org> wrote:
>
> Why doesn't Sean's concurrency API scale for your needs? Can that be
> fixed? Would you consider submitting some informed bug reports?
>
Okay, now I got a few extra time so I can share what I missed in
std.concurrency that made me implement my own message passing API:
1) Next to impossible to create and use multiple message boxes, which is a
big pain for writing library code.
std.concurrency operates on Tid, and you can't create new Tids (ctor is
private). Tid is a very simple wrapper around a MessageBox class, and
while you can create custom MessageBoxes, none of the public
std.concurrency APIs work with them directly. As such you are stuck with
one Tid per thread, and that's a no go for me. E.g. I have hundreds of
concurrent socket connections, and I'd like to have different event
handlers for different event sources (i.e. different SocketStreams).
2) Even if you are able to create N Tids, that's what event handling
occurs:
for (int i = 0; i < messageBoxes.length; ++i) {
messageBoxes[i].tid.receiveTimeout(0 /* no timeout, blocking is not
allowed*/, messageBoxes[i].callback );
}
This doesn't scale well. With hunders of message boxes, this loop will
consume all the CPU time. Besides, callback must be defined as "void
delegate(Variant)" and loses all the type information. It just throws all
the events into the same bag. That's not okay for me.
3) I want to bind callbacks to event types in one place, and poll for
events in another one. E.g. instead of:
void foo(FooMessage message) { ... }
...
tid.receiveTimeout(0, &foo, &bar, &baz};
I'd want to be able to
tid.register(&foo);
tid.register(&bar);
tid.register(&baz);
...
tid.poll(0 /* no wait */);
4) Event chaining is impossible to achieve. This is mostly because of #3.
Here is a more concrete example: I want to receive a incoming socket
message notification, parse that message (extract http headers/contents
etc) and then possibly dispatch new event. All this needs to be
transparent to the user. E.g.:
HttpConnection connection = new HttpConnection("google.com"); //
HttpConnection is needed to send multiple http requests over the same
socket connection
HttpRequest request = new HttpRequest("/"); // main page
connection.execute(request, tid); // start
HttpConnection is using SocketStream under the hood.
When you call connection.execute(request), it connects to
tid.receiveOnly( (HttpResponse response) { writeln(response.contents); } );
With std.concurrency it is impossible to implement. Problem is that
HttpResponse event is never sent, because SocketStream message is never
received, because it is never polled for. The following could improve the
situation:
class HttpConnection {
void execute(HttpRequest request) { ... tid.register(&onNewMessage); ... }
}
tid.register(&onHttpResponse);
tid.poll(); // not it polls for both messages!
When a new message arrives, the control is passed to HttpConnection, which
is then passed to HttpRequest, which parses socket message and generates
HttpResponse event, which is then received by user.
5) std.concurrency doesn't know about ThreadPools, and doesn't allow event
processing in threads other than current one. This prevents code
parallelization.
6) Tid can't redirect events to other Tids.
Here is an example:
void onNewEvent(Event e) { writeln(e.toString()); }
Mailbox m1;
m1.register(&onNewEvent);
Mailbox m2 = Mailbox(&m1); // m1 is now a parent to m2
m2.raiseEvent(new Event()); // redirects to m1
m1.poll(INFINITE); // triggers event handling
Useful when you have hunders of mailboxes. Just poll one and all the
events will be triggered.
That's pretty much that I needed (and my Mailbox provides) but
std.concurrency lacks.
My mailbox implementation is very-very slim, full source code available
here:
http://bitbucket.org/korDen/io/src/tip/io/mailbox.d
More information about the Digitalmars-d
mailing list