Streaming library

Denis Koroskin 2korden at gmail.com
Thu Oct 14 21:59:16 PDT 2010


In the byLine example I used memchr to search for the delimiter ('\n' by
default). Once it was found, I copied the data into a user-supplied buffer.
One improvement we could make is to support external predicates. I also
noticed that in many cases it isn't even necessary to copy the data into a
user-supplied buffer (e.g. if you want to write that line to an output
stream). Let's change the prototype to reflect this:

struct BufferedStream
{
	...
	size_t consume(size_t delegate(ubyte[]) sink);
	...
}

In this function, sink is a delegate that accepts the next chunk of data and
returns the number of bytes it wants the stream to skip. Returning 0 means we
are done. Here is how we can read one line from a stream and write it to
stdout:

size_t printUntilDelim(ubyte[] haystack)
{
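	// memchr takes the byte to search for second and the length third.
	auto ptr = cast(ubyte*) memchr(haystack.ptr, '\n', haystack.length);
	size_t numBytes = (ptr is null ? haystack.length : ptr - haystack.ptr);
	
	// "%.*s" expects an int precision, hence the explicit cast.
	printf("%.*s", cast(int) numBytes, haystack.ptr);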
	
	return numBytes;
}

auto numBytes = stream.consume(&printUntilDelim);
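
The external-predicate idea mentioned earlier could be layered on top of the
same sink interface. What follows is only a minimal sketch: untilMatch is a
hypothetical helper name that does not appear in the original proposal, and it
scans byte by byte rather than using memchr.

```d
// Hypothetical helper, not part of the original post: turns a per-byte
// predicate into a sink that can be passed to consume().
size_t delegate(ubyte[]) untilMatch(bool delegate(ubyte) pred)
{
	return delegate size_t(ubyte[] chunk) {
		foreach (i, b; chunk) {
			if (pred(b)) {
				return i; // stop just before the first matching byte
			}
		}
		return chunk.length; // no match: consume the whole chunk
	};
}

// Usage sketch (stream is assumed to be the BufferedStream from above):
// auto n = stream.consume(untilMatch((ubyte b) => b == ' ' || b == '\n'));
```

A per-byte predicate is more flexible than a fixed delimiter, but it will
likely be slower than memchr for the single-byte case.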

If we only need to count number of lines in a file, we don't need to copy  
anything at all:

size_t findNewLine(ubyte[] haystack)
{
	auto ptr = cast(ubyte*) memchr(haystack.ptr, '\n', haystack.length);
	// Consume up to and including '\n'.
	return (ptr is null) ? haystack.length : ptr - haystack.ptr + 1;
}

int numLines = 0;
int numChars = 0;
while (true) {
	size_t chars = byLine.consume(&findNewLine);
	if (chars == 0) {
		break;
	}

	numChars += chars;
	numLines++;
}

With this change, run time decreased from 68 to 47ms, and the code became
much clearer, too:

size_t consume(size_t delegate(ubyte[]) sink)
{
	if (bufferPtr is null) {
		return 0;
	}

	size_t totalBytesConsumed = 0;
	size_t bytesBuffered = bufferEnd - bufferPtr;

	while (true) {
		size_t bytesConsumed = sink(bufferPtr[0..bytesBuffered]);
		totalBytesConsumed += bytesConsumed;

		if (bytesConsumed == bytesBuffered) {
			refill();
			if (bufferPtr !is bufferEnd) {
				bytesBuffered = bufferEnd - bufferPtr;
				continue;
			}

			bufferPtr = null;
			return totalBytesConsumed;
		}

		bufferPtr += bytesConsumed;
		return totalBytesConsumed;
	}
}
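
To try the loop above without a real file, it can be imitated over an
in-memory array. This is a sketch under assumptions: MemoryStream, Counts and
countLines are hypothetical names, slices replace the bufferPtr/bufferEnd pair,
and refill() simply hands out the next fixed-size chunk. The control flow is
meant to mirror consume() as posted.

```d
// Hypothetical in-memory stand-in for BufferedStream (illustration only).
struct MemoryStream
{
	const(ubyte)[] source;   // input not yet loaded into the buffer
	size_t chunkSize;        // bytes loaded per refill()
	const(ubyte)[] buffered; // loaded but not yet consumed bytes
	bool done;               // set once the input is exhausted

	// Loads the next chunk; leaves `buffered` empty at end of input.
	void refill()
	{
		size_t n = source.length < chunkSize ? source.length : chunkSize;
		buffered = source[0 .. n];
		source = source[n .. $];
	}

	// Same control flow as consume() above, with slices instead of pointers.
	size_t consume(scope size_t delegate(const(ubyte)[]) sink)
	{
		if (done) return 0;
		if (buffered.length == 0) refill(); // first call: fill the buffer

		size_t total = 0;
		while (true) {
			size_t n = sink(buffered);
			total += n;

			if (n == buffered.length) {
				// Whole buffer consumed: fetch more and keep going.
				refill();
				if (buffered.length != 0) continue;
				done = true;
				return total;
			}

			buffered = buffered[n .. $];
			return total;
		}
	}
}

// Sink: consume up to and including the first '\n', or the whole chunk.
size_t findNewLine(const(ubyte)[] haystack)
{
	foreach (i, b; haystack) {
		if (b == '\n') return i + 1;
	}
	return haystack.length;
}

struct Counts { size_t lines; size_t chars; }

// Counts lines the same way as the while (true) loop shown earlier.
Counts countLines(const(ubyte)[] data, size_t chunkSize)
{
	auto stream = MemoryStream(data, chunkSize);
	Counts c;
	while (true) {
		size_t n = stream.consume(
			(const(ubyte)[] chunk) => findNewLine(chunk));
		if (n == 0) break;
		c.chars += n;
		c.lines++;
	}
	return c;
}
```

Calling countLines on "ab\ncd\n" with a chunk size of 4 gives 2 lines and 6
chars. The chunk size is chosen so that no line ends exactly at a buffer
boundary: the loop treats a fully consumed buffer as a request for more data,
so a '\n' that happens to be the last buffered byte would merge two lines.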

A copying version might still be required, so here is a helper:

// grows if required
ubyte[] consumeAndCopy(size_t delegate(ubyte[]) sink, ubyte[] buffer);
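
The helper is only declared here, so its body is open to interpretation. One
possible approach, sketched under assumptions, is to wrap the caller's sink in
a small object that appends whatever the sink consumes to a growable buffer.
CopyingSink and its members are hypothetical names, not part of the proposal.

```d
// Hypothetical wrapper: forwards each chunk to `inner` and copies the bytes
// that `inner` chose to consume into `buffer`, growing it when needed.
struct CopyingSink
{
	size_t delegate(ubyte[]) inner; // decides how much to consume
	ubyte[] buffer;                 // caller-supplied storage
	size_t used;                    // bytes copied so far

	size_t sink(ubyte[] chunk)
	{
		size_t n = inner(chunk);
		if (used + n > buffer.length) {
			buffer.length = used + n; // grows if required
		}
		buffer[used .. used + n] = chunk[0 .. n];
		used += n;
		return n;
	}

	// The copied bytes, as a slice of the (possibly reallocated) buffer.
	ubyte[] result() { return buffer[0 .. used]; }
}

// Usage sketch (stream is assumed to be the BufferedStream from above):
// auto copier = CopyingSink(userSink, userBuffer);
// stream.consume(&copier.sink);
// ubyte[] line = copier.result();
```

Because growing may reallocate, the returned slice is not guaranteed to alias
the buffer the caller passed in, which is presumably why the declared helper
returns a ubyte[] instead of a length.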

