Streaming library

Denis Koroskin 2korden at gmail.com
Wed Oct 13 09:16:38 PDT 2010


On Wed, 13 Oct 2010 18:32:15 +0400, Andrei Alexandrescu  
<SeeWebsiteForEmail at erdani.org> wrote:

> On 10/11/2010 07:49 PM, Daniel Gibson wrote:
>> Andrei Alexandrescu schrieb:
>>> Agreed. Maybe this is a good time to start making a requirements list
>>> for streams. What are the essential features/feature groups?
>>>
>>> Andrei
>>
>> Maybe something like the following (I hope it's not too extensive):
>>
>> * Input- Output- and InputAndOutput- Streams
>> - having InputStream and OutputStream as an interface like in the old
>> design may be a good idea
>> - implementing the standard operations that are mostly independent from
>> the data source/sink
>> like read/write for basic types, strings, ... in mixin templates is
>> probably elegant to create
>> streams that are both Input and Output (one mixin that implements most
>> of InputStream and
>> one that implements most of OutputStream)
>
> So far so good. I will point out, however, that the classic read/write  
> routines are not all that good. For example if you want to implement a  
> line-buffered stream on top of a block-buffered stream you'll be forced  
> to write inefficient code.
>

I've never heard of a filesystem that allows reading files in lines - they
always read in blocks, and that's what streams should do. That's because
most streams are binary streams, and there is no such thing as a "line" in
them (e.g. how often do you need to read a line from a SocketStream?).

I don't think streams should buffer anything either (whatever the
underlying OS I/O API caches should suffice). Buffered stream adapters can
do that in a stream-independent way (why duplicate code when you can do it
just as efficiently with external methods?).

Besides, as you noted, the buffering is redundant for byChunk/byLine  
adapter ranges. It means that byChunk/byLine should operate on unbuffered  
streams.
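
To make the adapter idea concrete, here is a minimal sketch of a byLine-style
range that does all of its own buffering on top of a stream that only offers
read(ubyte[]). The MemoryInput class and the ByLine name are made up for the
illustration, and I'm assuming read() returns 0 at end of stream:

```d
import std.algorithm.searching : countUntil;

// Cut-down version of the InputStream interface from this post.
interface InputStream
{
    // Reads up to buffer.length bytes; assumed to return 0 at end of stream.
    size_t read(ubyte[] buffer);
}

// Hypothetical in-memory stream, used only to exercise the adapter.
final class MemoryInput : InputStream
{
    private const(ubyte)[] data;
    this(const(ubyte)[] data) { this.data = data; }

    size_t read(ubyte[] buffer)
    {
        immutable n = buffer.length < data.length ? buffer.length : data.length;
        buffer[0 .. n] = data[0 .. n];
        data = data[n .. $];
        return n;
    }
}

// Input range of lines; all buffering lives here, not in the stream.
struct ByLine
{
    private InputStream source;
    private ubyte[] pending;   // bytes read but not yet handed out
    private ubyte[] chunk;     // scratch block passed to read()
    private char[] line;
    private bool done;

    this(InputStream source, size_t blockSize = 4096)
    {
        this.source = source;
        chunk = new ubyte[blockSize];
        popFront();
    }

    @property bool empty() const { return done; }
    @property const(char)[] front() const { return line; }

    void popFront()
    {
        for (;;)
        {
            immutable idx = pending.countUntil(cast(ubyte) '\n');
            if (idx >= 0)
            {
                line = cast(char[]) pending[0 .. idx].dup;
                pending = pending[idx + 1 .. $];
                return;
            }
            immutable n = source.read(chunk);
            if (n == 0)
            {
                // End of stream: flush a final unterminated line, if any.
                if (pending.length == 0) { done = true; return; }
                line = cast(char[]) pending.dup;
                pending = null;
                return;
            }
            pending ~= chunk[0 .. n];
        }
    }
}
```

The stream itself never sees a "line"; it only ever gets block reads, and the
same adapter would work over a file, a socket or anything else.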

I'll explain my I/O streams implementation below in case you didn't read  
my message (I've changed some stuff a little since then). My Stream  
interface is very simple:

// A generic stream
interface Stream
{
     @property InputStream input();
     @property OutputStream output();
     @property SeekableStream seekable();
     @property bool endOfStream();
     void close();
}

You may ask, why separate Input and Output streams? Well, that's because
you either read from them, write to them, or both.
Some streams are read-only (think Stdin), some write-only (Stdout), some  
support both, like FileStream. Right?

Not exactly. Does FileStream support writing when you open file for  
reading? Does it support reading when you open for writing?
So, you may or may not read from a generic stream, and you also may or may
not write to a generic stream. With a design like that you can't make a
mistake: if a stream isn't readable, you have no reference to invoke the
read() method on.
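
Roughly, client code would probe for a capability before using it. The sketch
below assumes that input/output return null when the capability is missing
(the post doesn't spell that out); MemorySink, tryWrite and isReadable are
hypothetical names used only for the example:

```d
interface InputStream  { size_t read(ubyte[] buffer); }
interface OutputStream { size_t write(const(ubyte)[] buffer); }

// Reduced Stream interface: only the two capability properties.
interface Stream
{
    @property InputStream input();    // assumed: null when not readable
    @property OutputStream output();  // assumed: null when not writable
}

// Hypothetical write-only sink that collects bytes in memory.
final class MemorySink : Stream, OutputStream
{
    ubyte[] written;

    @property InputStream input() { return null; }
    @property OutputStream output() { return this; }

    size_t write(const(ubyte)[] buffer)
    {
        written ~= buffer.dup;
        return buffer.length;
    }
}

// Writes the whole payload; returns false if the stream has no output side
// or the sink stops accepting data.
bool tryWrite(Stream s, const(ubyte)[] payload)
{
    if (auto sink = s.output)
    {
        while (payload.length)
        {
            immutable n = sink.write(payload);
            if (n == 0) return false;
            payload = payload[n .. $];
        }
        return true;
    }
    return false;
}

bool isReadable(Stream s) { return s.input !is null; }
```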

Similarly, a stream is either seekable, or not. SeekableStreams allow  
stream cursor manipulation:

interface SeekableStream : Stream
{
     long getPosition(Anchor whence = Anchor.begin);
     void setPosition(long position, Anchor whence = Anchor.begin);
}
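
As an illustration of the cursor semantics, here is a small in-memory sketch.
Only Anchor.begin appears above; the current/end members, the MemoryCursor
class, and the choice to report positions relative to the given anchor are
all assumptions made for the example:

```d
// Only `begin` is taken from the interface above; `current` and `end` are assumed.
enum Anchor { begin, current, end }

// Hypothetical in-memory cursor with the same two methods as SeekableStream.
final class MemoryCursor
{
    private ubyte[] data;
    private long pos;

    this(ubyte[] data) { this.data = data; }

    // Absolute offset that the given anchor refers to.
    private long base(Anchor whence) const
    {
        final switch (whence)
        {
            case Anchor.begin:   return 0;
            case Anchor.current: return pos;
            case Anchor.end:     return cast(long) data.length;
        }
    }

    // Position expressed relative to the anchor (assumed semantics).
    long getPosition(Anchor whence = Anchor.begin) const
    {
        return pos - base(whence);
    }

    void setPosition(long position, Anchor whence = Anchor.begin)
    {
        immutable target = base(whence) + position;
        if (target < 0 || target > cast(long) data.length)
            throw new Exception("position out of range");
        pos = target;
    }
}
```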

InputStream doesn't really have many methods:

interface InputStream
{
	// reads up to buffer.length bytes from a stream
	// returns number of bytes read
	// throws on error
	size_t read(ubyte[] buffer);

	// reads from current position
	AsyncReadRequest readAsync(ubyte[] buffer, Mailbox* mailbox = null);
}

Neither does OutputStream:

interface OutputStream
{
	// returns number of bytes written
	// throws on error
	size_t write(const(ubyte)[] buffer);

	// writes from current position
	AsyncWriteRequest writeAsync(const(ubyte)[] buffer, Mailbox* mailbox = null);
}

They basically support only reading and writing in blocks, nothing else.
However, they support asynchronous reads/writes, too (think of a mailbox as
a std.concurrency Tid).

Unlike Daniel's proposal, my design reads up to buffer.length bytes for two
reasons:
- it avoids potential buffering and multiple system calls
- it is the only way to go with SocketStreams. I mean, you often don't  
know how many bytes an incoming socket message contains. You either have  
to read it byte-by-byte, or your application might stall for potentially  
infinite time (if message was shorter than your buffer, and no more  
messages are being sent)
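
When the caller does know how many bytes it needs (say, a fixed-size header),
an exact read is easy to layer on top of the "up to buffer.length" contract.
A minimal sketch, assuming read() returns 0 at end of stream; TrickleInput
and readFully are hypothetical names for the example:

```d
import std.algorithm.comparison : min;

interface InputStream { size_t read(ubyte[] buffer); }

// Hypothetical stream that hands out at most `step` bytes per call,
// a bit like a socket delivering small packets.
final class TrickleInput : InputStream
{
    private const(ubyte)[] data;
    private size_t step;
    size_t calls;   // number of read() calls, for inspection

    this(const(ubyte)[] data, size_t step)
    {
        this.data = data;
        this.step = step;
    }

    size_t read(ubyte[] buffer)
    {
        ++calls;
        immutable n = min(buffer.length, data.length, step);
        buffer[0 .. n] = data[0 .. n];
        data = data[n .. $];
        return n;
    }
}

// Keeps reading until the buffer is full or the stream ends;
// returns the filled prefix of the buffer.
ubyte[] readFully(InputStream s, ubyte[] buffer)
{
    size_t filled;
    while (filled < buffer.length)
    {
        immutable n = s.read(buffer[filled .. $]);
        if (n == 0) break;   // assumed end-of-stream signal
        filled += n;
    }
    return buffer[0 .. filled];
}
```

Going the other way (getting "whatever is available" out of an API that
always fills the buffer) isn't possible without blocking, which is the point
of the second reason above.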

Why do my streams provide async methods? Because it's the modern approach
to I/O - blocking I/O (aka one thread per client) doesn't scale. E.g. Java
adds a second revision of its async I/O API in JDK7 (called NIO2; the
original NIO first appeared in February 2002), and C# has had asynchronous
operations as part of its Stream interface since .NET 1.1 (April 2003).

With async I/O you can serve many clients with one thread. Here is an
example (pseudo-code, using std.concurrency):

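// ask every connection to post its next message to this thread's mailbox
foreach (connection; networkConnections) {
     connection.receiveMessage(thisTid);
}

// handle a message from whichever connection delivers one first
receive( (NetworkMessage message) { /* do stuff */ } );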

This is still not the most performant solution, but it's a lot
better than one thread per client.
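
For the mailbox side of this, here is a small sketch that actually compiles
with std.concurrency. There is no real async socket API behind it: the
fakeConnection function is a stand-in that runs in its own thread purely to
simulate a completion arriving, and NetworkMessage and serveAll are
hypothetical names. The part to look at is the single receive loop that
handles completions from every connection:

```d
import std.algorithm.sorting : sort;
import std.concurrency : ownerTid, receive, send, spawn;

// Hypothetical message type standing in for a completed network read.
struct NetworkMessage
{
    int connectionId;
    string payload;
}

// Stand-in for an asynchronous receive: pretends data arrived on the
// connection and posts it to the owner's mailbox.
void fakeConnection(int id)
{
    send(ownerTid, NetworkMessage(id, "hello"));
}

// One loop collects the completions from all connections.
// Returns the connection ids seen, sorted so the result is deterministic.
int[] serveAll(int connections)
{
    foreach (id; 0 .. connections)
        spawn(&fakeConnection, id);

    int[] seen;
    foreach (_; 0 .. connections)
        receive((NetworkMessage m) { seen ~= m.connectionId; });

    sort(seen);
    return seen;
}
```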

Async I/O is not only needed for network stuff. Here is a code snippet from
DMD (comments added):

#define ASYNCREAD 1
#if ASYNCREAD
     AsyncRead *aw = AsyncRead::create(modules.dim);
     for (i = 0; i < modules.dim; i++)
     {
         m = (Module *)modules.data[i];
         aw->addFile(m->srcfile);
     }
     aw->start(); // executes async request, doesn't block
#else
     // Single threaded
     for (i = 0; i < modules.dim; i++)
     {
         m = (Module *)modules.data[i];
         m->read(0); // blocks
     }
#endif

     // Do some other stuff

     for (i = 0; i < modules.dim; i++)
     {
         ...
#if ASYNCREAD
         aw->read(i); // waits until async operation finishes
#endif

Walter said that this small change gave quite a speedup in compilation
time.

Also, my async methods return a reference to an AsyncRequest interface that
allows waiting for completion (that's what Walter does in DMD), canceling,
querying the status (complete, in progress, failed), reporting an error,
etc., and that's very useful, too.

I strongly believe we shouldn't ignore this type of API.

P.S. For threads this deep it's better to fork a new one, especially when
changing the subject.

