Streaming library
Denis Koroskin
2korden at gmail.com
Thu Oct 14 21:59:16 PDT 2010
In the byLine example I searched for the delimiter ('\n' by default) with
memchr. Once it was found, I copied the data to a user-supplied buffer.
One improvement we could make is to support external predicates. Next, I
noticed that in many cases it isn't even necessary to copy the data to a
user-supplied buffer (e.g. if you want to write that line to an output
stream). Let's change the prototype to reflect this:
struct BufferedStream
{
    ...
    size_t consume(size_t delegate(ubyte[]) sink);
    ...
}
In this function, sink is a delegate that accepts the next chunk of data
and returns the number of bytes it wants the stream to skip. Returning 0
means we are done. Here is how we can read one line from a stream and
write it to stdout:
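import core.stdc.stdio : printf;
import core.stdc.string : memchr;

size_t printUntilDelim(ubyte[] haystack)
{
    // memchr takes (pointer, byte, length), in that order
    auto ptr = cast(ubyte*) memchr(haystack.ptr, '\n', haystack.length);
    size_t numBytes = (ptr is null ? haystack.length : ptr - haystack.ptr);
    // "%.*s" expects an int precision
    printf("%.*s", cast(int) numBytes, cast(char*) haystack.ptr);
    return numBytes;
}

auto numBytes = stream.consume(&printUntilDelim);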
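The external-predicate idea mentioned above fits the same sink shape. Here is a minimal sketch, assuming a hypothetical helper named consumeUntil (not part of the proposed API) that stops right before the first byte matching a caller-supplied predicate:

```d
// Hypothetical sink template: `pred` is any callable taking a byte and
// returning bool. Returns how many bytes precede the first match, or the
// whole chunk length when nothing matches.
size_t consumeUntil(alias pred)(const(ubyte)[] haystack)
{
    foreach (i, b; haystack)
    {
        if (pred(b))
            return i; // stop before the matching byte
    }
    return haystack.length;
}
```

Instantiated with something like `b => b == ' ' || b == '\t'`, it could be wrapped in a delegate and handed to consume in the same way as printUntilDelim.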
If we only need to count the number of lines in a file, we don't need to
copy anything at all:
size_t findNewLine(ubyte[] haystack)
{
    auto ptr = cast(ubyte*) memchr(haystack.ptr, '\n', haystack.length);
    // the consumed length includes the '\n'
    return (ptr is null) ? haystack.length : ptr - haystack.ptr + 1;
}

int numLines = 0;
int numChars = 0;
while (true) {
    size_t chars = byLine.consume(&findNewLine);
    if (chars == 0) {
        break;
    }
    numChars += chars;
    numLines++;
}
With this change, the run time decreased from 68 to 47 ms, and the code
became a lot clearer, too:
size_t consume(size_t delegate(ubyte[]) sink)
{
    if (bufferPtr is null) {
        return 0; // stream exhausted
    }

    size_t totalBytesConsumed = 0;
    size_t bytesBuffered = bufferEnd - bufferPtr;
    while (true) {
        size_t bytesConsumed = sink(bufferPtr[0..bytesBuffered]);
        totalBytesConsumed += bytesConsumed;

        if (bytesConsumed == bytesBuffered) {
            // the sink took the whole chunk: fetch more data and go on
            refill();
            if (bufferPtr !is bufferEnd) {
                bytesBuffered = bufferEnd - bufferPtr;
                continue;
            }

            // nothing left to read
            bufferPtr = null;
            return totalBytesConsumed;
        }

        // the sink stopped early: keep the rest for the next call
        bufferPtr += bytesConsumed;
        return totalBytesConsumed;
    }
}
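To try the loop without a real file, here is a self-contained sketch. MemoryStream and countLines are hypothetical names, and refill() simply hands out an in-memory array in fixed-size chunks; that is an assumption for illustration, not how the actual stream fills its buffer.

```d
import core.stdc.string : memchr;

// Hypothetical in-memory stand-in for BufferedStream.
struct MemoryStream
{
    const(ubyte)[] source;    // data not yet handed to the buffer
    size_t chunkSize;         // simulated buffer capacity
    const(ubyte)* bufferPtr;  // null once the stream is exhausted
    const(ubyte)* bufferEnd;

    this(const(ubyte)[] data, size_t chunkSize)
    {
        source = data;
        this.chunkSize = chunkSize;
        refill();
        if (bufferPtr is bufferEnd)
            bufferPtr = null; // empty input
    }

    // Exposes the next chunk (at most chunkSize bytes) of the source.
    void refill()
    {
        size_t n = source.length < chunkSize ? source.length : chunkSize;
        bufferPtr = source.ptr;
        bufferEnd = source.ptr + n;
        source = source[n .. $];
    }

    // Same control flow as the consume() shown above.
    size_t consume(scope size_t delegate(const(ubyte)[]) sink)
    {
        if (bufferPtr is null)
            return 0;
        size_t total = 0;
        while (true)
        {
            size_t buffered = bufferEnd - bufferPtr;
            size_t used = sink(bufferPtr[0 .. buffered]);
            total += used;
            if (used != buffered)
            {
                bufferPtr += used; // sink stopped early
                return total;
            }
            refill();
            if (bufferPtr is bufferEnd)
            {
                bufferPtr = null;  // nothing left to read
                return total;
            }
        }
    }
}

// Consumes up to and including the first '\n', or the whole chunk.
size_t findNewLine(const(ubyte)[] haystack)
{
    auto p = cast(const(ubyte)*) memchr(haystack.ptr, '\n', haystack.length);
    return p is null ? haystack.length : p - haystack.ptr + 1;
}

// Counts consume() calls that returned data; numChars receives the byte total.
size_t countLines(const(ubyte)[] data, size_t chunkSize, out size_t numChars)
{
    auto stream = MemoryStream(data, chunkSize);
    size_t numLines = 0;
    while (true)
    {
        size_t chars = stream.consume(
            delegate size_t(const(ubyte)[] chunk) { return findNewLine(chunk); });
        if (chars == 0)
            break;
        numChars += chars;
        ++numLines;
    }
    return numLines;
}
```

One thing the sketch makes visible: a sink that consumes exactly the whole chunk looks the same to consume() as one that wants more data, so a '\n' that happens to be the last buffered byte leads to another sink call on the next chunk.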
A copying version might still be required, so here is a helper:
// grows the buffer if required
ubyte[] consumeAndCopy(size_t delegate(ubyte[]) sink, ubyte[] buffer);
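One possible reading of that helper is a wrapper sink that copies whatever the inner sink consumes. The sketch below is only an assumption about the intended behaviour; CopyingSink, put and data are hypothetical names, and the doubling growth policy is my own choice.

```d
import core.stdc.string : memchr;

// Hypothetical wrapper: forwards each chunk to `inner` and appends the
// bytes `inner` consumed to `buffer`, reallocating when it is too small.
struct CopyingSink
{
    size_t delegate(const(ubyte)[]) inner;
    ubyte[] buffer; // caller-supplied storage
    size_t used;    // number of bytes copied so far

    size_t put(const(ubyte)[] chunk)
    {
        size_t n = inner(chunk);
        if (used + n > buffer.length)
            buffer.length = (used + n) * 2; // grow; existing bytes are kept
        buffer[used .. used + n] = chunk[0 .. n];
        used += n;
        return n;
    }

    // The bytes copied so far.
    ubyte[] data()
    {
        return buffer[0 .. used];
    }
}

// Consumes up to and including the first '\n', or the whole chunk.
size_t findNewLine(const(ubyte)[] haystack)
{
    auto p = cast(const(ubyte)*) memchr(haystack.ptr, '\n', haystack.length);
    return p is null ? haystack.length : p - haystack.ptr + 1;
}
```

With a stream whose consume accepts such a delegate, the call would look like `stream.consume(&copier.put)`, and `copier.data()` would then hold the copied line.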