Parallel ByLine, ByChunk?

dsimcha dsimcha at yahoo.com
Tue Aug 9 13:22:40 PDT 2011


OK, I've come up with a good way to do this without writing any new low-level
concurrency code.  The idea is to create two delegates:  nextDel reads the next
element into a user-provided buffer, and emptyDel tests whether there's
anything left to read.  RoundRobinBuffer then turns this pair into a range by
cycling through a fixed set of buffers, and std.parallelism's asyncBuf handles
filling the first half of the buffers while the second half is being read, and
vice versa.  This pattern could be encapsulated as an overload of
TaskPool.asyncBuf with a signature something like:

auto asyncBuf(T)(void delegate(ref T[]) nextDel, bool delegate() emptyDel,
                 size_t nBuffers);

Prototype/demo code:

import std.stdio;

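// Input range that hands out slices of a fixed ring of reusable buffers.
struct RoundRobinBuffer(T) {
private:
    T[][] bufs;                      // the ring of reusable buffers
    size_t index;                    // slot holding the current front
    void delegate(ref T[]) nextDel;  // reads the next element into a buffer
    bool delegate() emptyDel;        // true once the source is exhausted
    bool _empty;
    bool primed;                     // has bufs[index] been filled yet?

    // Fill the current slot.  primed is only set if nextDel doesn't throw.
    void prime() {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }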

public:

    this(
        void delegate(ref T[]) nextDel,
        bool delegate() emptyDel,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;
    }

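    // Reads lazily:  the source is only touched the first time front is
    // requested for the current slot.
    T[] front() @property {
        if(!primed) prime();
        return bufs[index];
    }

    void popFront() {
        // Consume the current element even if front was never called, so
        // that popFront on its own still advances the underlying source.
        if(!empty && !primed) prime();

        if(empty || emptyDel()) {
            _empty = true;
            return;
        }

        // Move on to the next slot in the ring; it is filled on demand.
        index = (index + 1) % bufs.length;
        primed = false;
    }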

    bool empty() @property const pure nothrow @safe {
        return _empty;
    }
}

void main() {
    import std.parallelism;

    auto f = File("test2.d");

    // Reads one line into buf, reusing its storage, and strips the
    // trailing newline if there is one.
    void readNext(ref char[] buf) {
        import std.algorithm : endsWith;

        f.readln(buf);

        if(buf.endsWith('\n')) {
            buf.length -= 1;
        }
    }

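    // 20 buffers in the ring:  asyncBuf(b, 10) works on two batches of 10
    // elements at a time, one being filled and one being read.
    auto b = RoundRobinBuffer!char(
        &readNext,
        &f.eof,
        20
    );

    foreach(line; taskPool.asyncBuf(b, 10)) {
        writeln(line);
    }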
}
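
A note on the 20 and 10 in the demo:  as documented, asyncBuf(range, bufSize)
fills one batch of bufSize elements in a worker thread while the previous batch
is being consumed, so up to 2 * bufSize slices are live at once.  Since
RoundRobinBuffer hands out slices of its own buffers, the ring presumably needs
at least 2 * bufSize slots for the two batches never to share a buffer.  The
sketch below checks that arithmetic on its own; slotFor is a hypothetical
helper, not part of std.parallelism, and it models the indexing only, not the
threading.

```d
import std.stdio : writeln;

/// Hypothetical helper: the ring slot that the n-th element lands in,
/// mirroring index = (index + 1) % bufs.length in RoundRobinBuffer.
size_t slotFor(size_t n, size_t nBuffers) pure nothrow @safe {
    return n % nBuffers;
}

void main() {
    enum size_t bufSize = 10;            // the asyncBuf argument in the demo
    enum size_t nBuffers = 2 * bufSize;  // 20 ring slots, as in the demo

    // While elements 0 .. 9 are being consumed, elements 10 .. 19 are
    // being filled.  No slot may belong to both batches.
    foreach (consumed; 0 .. bufSize) {
        foreach (filling; bufSize .. 2 * bufSize) {
            assert(slotFor(consumed, nBuffers) != slotFor(filling, nBuffers));
        }
    }

    // Element 20 wraps around to slot 0, which held element 0.  That batch
    // has been fully consumed by the time the third batch is filled.
    assert(slotFor(2 * bufSize, nBuffers) == 0);

    writeln("the two batches never share a ring slot");
}
```

With fewer than 2 * bufSize slots the inner assertion would fail, which in the
real range would correspond to a line being overwritten while the consumer
might still be looking at it.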

