Parallel ByLine, ByChunk?
dsimcha
dsimcha at yahoo.com
Tue Aug 9 13:22:40 PDT 2011
OK, I've actually managed to come up with a good way to do this without writing
any new low-level concurrency code. The idea is to create two delegates: nextDel
reads the next element into a user-provided buffer, and emptyDel tests whether
there's anything left to read. RoundRobinBuffer then turns this pair into a
range that cycles through a fixed set of buffers, and std.parallelism's asyncBuf
fills one half of the buffers in the background while the other half is being
read, and vice versa. This pattern can be encapsulated as an overload of
TaskPool.asyncBuf with a signature something like:
auto asyncBuf(T)(void delegate(ref T[]) nextDel, bool delegate() emptyDel,
                 size_t nBuffers);
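To make the nextDel/emptyDel protocol concrete, here is a minimal single-threaded sketch that drives the two delegates by hand and cycles through a fixed ring of buffers, without asyncBuf. The in-memory source and the names drain, readNext and atEnd are hypothetical stand-ins for a file; only the delegate types come from the signature above.

```d
import std.stdio;

// Consumer side of the protocol: pull elements through nextDel until
// emptyDel reports exhaustion, cycling through nBuffers fixed buffers.
// Returns the buffers so the caller can see that they were reused.
char[][] drain(void delegate(ref char[]) nextDel,
               bool delegate() emptyDel,
               size_t nBuffers)
{
    auto bufs = new char[][](nBuffers);
    size_t index = 0;
    while (!emptyDel())
    {
        nextDel(bufs[index]);             // fill buffer `index` in place
        writeln("buffer ", index, ": ", bufs[index]);
        index = (index + 1) % nBuffers;   // wrap: older contents get overwritten
    }
    return bufs;
}

void main()
{
    // Hypothetical in-memory source standing in for a file.
    auto source = ["alpha", "beta", "gamma", "delta", "epsilon"];
    size_t pos = 0;

    // nextDel: copy the next element into the caller's buffer,
    // resizing it as needed.
    void readNext(ref char[] buf)
    {
        buf.length = source[pos].length;
        buf[] = source[pos][];
        ++pos;
    }

    // emptyDel: is anything left to read?
    bool atEnd() { return pos >= source.length; }

    drain(&readNext, &atEnd, 4);
}
```

With four buffers and five elements, the fifth element ("epsilon") is read into buffer 0 and overwrites "alpha", so a consumer has to finish with (or .dup) an element before the ring wraps back around to its buffer.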
Prototype/demo code:
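import std.stdio;

// Turns a nextDel/emptyDel pair into an input range whose elements are
// slices into a fixed ring of nBuffers reusable buffers.
struct RoundRobinBuffer(T) {
private:
    T[][] bufs;                      // the ring of reusable buffers
    size_t index;                    // buffer holding the current element
    void delegate(ref T[]) nextDel;  // reads the next element into a buffer
    bool delegate() emptyDel;        // true once there is nothing left to read
    bool _empty;
    bool primed;                     // has bufs[index] been filled yet?

    // Read the current element lazily, on first access.
    void prime() {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }

public:
    this(
        void delegate(ref T[]) nextDel,
        bool delegate() emptyDel,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;
    }

    // The returned slice stays valid only until the ring wraps around and
    // this buffer is reused; callers that keep elements must .dup them.
    T[] front() @property {
        if(!primed) prime();
        return bufs[index];
    }

    void popFront() {
        // Consume the current element even if front was never called.
        if(!primed) prime();

        // emptyDel is checked only after the current element has been read,
        // so a source such as File.eof, which reports end-of-input only after
        // a read has failed, may produce one trailing empty element.
        if(empty || emptyDel()) {
            _empty = true;
            return;
        }
        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const pure nothrow @safe {
        return _empty;
    }
}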
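void main() {
    import std.parallelism;
    import std.algorithm : endsWith;

    auto f = File("test2.d");

    // nextDel: read one line into buf (reusing its memory), minus the '\n'.
    void readNext(ref char[] buf) {
        f.readln(buf);
        if(buf.endsWith('\n')) {
            buf.length -= 1;
        }
    }

    // 20 buffers for an asyncBuf buffer size of 10: while the consumer reads
    // 10 lines, the background task fills the other 10 buffers. nBuffers must
    // be at least twice asyncBuf's buffer size, or lines would be overwritten
    // while they are still being read.
    auto b = RoundRobinBuffer!char(
        &readNext,
        &f.eof,
        20
    );
    foreach(line; taskPool.asyncBuf(b, 10)) {
        writeln(line);
    }
}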
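A sketch of how the proposed overload might be packaged, assuming it is written as a free function on top of the existing taskPool.asyncBuf rather than as a real TaskPool member. The names RoundRobin and asyncBufFromDelegates are hypothetical (not Phobos API), RoundRobin is a condensed variant of RoundRobinBuffer that consults emptyDel before each read, and the demo echoes standard input instead of test2.d.

```d
import std.parallelism : taskPool;
import std.stdio;

// Condensed variant of RoundRobinBuffer: an input range whose elements are
// slices into a fixed ring of reusable buffers. Unlike the prototype, it
// checks emptyDel before reading each element.
struct RoundRobin(T)
{
    private T[][] bufs;
    private size_t index;
    private void delegate(ref T[]) nextDel;
    private bool delegate() emptyDel;
    private bool primed;   // has bufs[index] been filled yet?

    this(void delegate(ref T[]) nextDel, bool delegate() emptyDel,
         size_t nBuffers)
    {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;
    }

    private void prime()
    {
        nextDel(bufs[index]);
        primed = true;
    }

    @property bool empty() { return !primed && emptyDel(); }

    @property T[] front()
    {
        if (!primed) prime();
        return bufs[index];
    }

    void popFront()
    {
        if (!primed) prime();              // consume even if front was skipped
        index = (index + 1) % bufs.length; // move to the next ring slot
        primed = false;
    }
}

// Hypothetical free-function form of the proposed overload.
// asyncBuf hands nBuffers / 2 elements to the consumer while a background
// task fills the other nBuffers / 2 ring slots, so no slot is overwritten
// while it is still being read.
auto asyncBufFromDelegates(T)(void delegate(ref T[]) nextDel,
                              bool delegate() emptyDel, size_t nBuffers)
{
    assert(nBuffers >= 2 && nBuffers % 2 == 0,
           "nBuffers must be an even number >= 2");
    return taskPool.asyncBuf(RoundRobin!T(nextDel, emptyDel, nBuffers),
                             nBuffers / 2);
}

void main()
{
    // Echo standard input, reading ahead on a worker thread.
    void readNext(ref char[] buf) { stdin.readln(buf); }
    bool atEnd() { return stdin.eof; }

    foreach (line; asyncBufFromDelegates!char(&readNext, &atEnd, 20))
        write(line);   // readln keeps each line's '\n'
}
```

Splitting nBuffers evenly mirrors the demo's choice of 20 buffers with an asyncBuf buffer size of 10. Because elements are slices into reused buffers, a caller that keeps a line beyond the current iteration needs to copy it, for example with .idup.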
More information about the Digitalmars-d mailing list