dvlsg / async-csp

CSP style channels using ES7 async/await
MIT License

A different approach to filtering? #17

Open rauschma opened 7 years ago

rauschma commented 7 years ago

I love CSP and think it may be a better approach than asynchronous iteration. For filtering channels, I'd prefer an approach that is different from constructor + callback. What do you think? If this makes sense, it could be added as, e.g., Channel.filter().

Error handling and the sentinel value END_OF_CHANNEL feel like workarounds. Are there ways of handling closing and errors that fit the style of the API better?

Example:

```javascript
import fs from 'fs';

import Channel from 'async-csp';

// Marks the end of a channel's output sequence
const END_OF_CHANNEL = Symbol();

// Chain the filter functions: each one reads from the previous channel
// and writes to a fresh output channel.
function filter(inputChannel, ...filterFuncs) {
    for (const filterFunc of filterFuncs) {
        const outputChannel = new Channel();
        filterFunc(inputChannel, outputChannel);
        inputChannel = outputChannel;
    }
    return inputChannel;
}

function readFile(fileName) {
    const channel = new Channel();
    // `highWaterMark` sets the chunk size; with an encoding set, chunks are strings
    const readStream = fs.createReadStream(fileName,
        { encoding: 'utf8', highWaterMark: 1024 });
    readStream.on('data', str => {
        channel.put(str);
    });
    readStream.on('error', err => {
        // Forward the error; 'end' is not emitted after an error
        channel.put(err);
    });
    readStream.on('end', () => {
        // Signal end of output sequence
        channel.put(END_OF_CHANNEL);
    });
    return channel;
}

async function splitLines(input, output) {
    let previous = '';
    while (true) {
        const chunk = await input.take();
        if (chunk === END_OF_CHANNEL) break;
        if (chunk instanceof Error) {
            output.put(chunk);
            return;
        }
        previous += chunk;
        let eolIndex;
        while ((eolIndex = previous.indexOf('\n')) >= 0) {
            const line = previous.slice(0, eolIndex);
            output.put(line);
            previous = previous.slice(eolIndex + 1);
        }
    }
    // Emit a trailing line that has no final newline
    if (previous.length > 0) {
        output.put(previous);
    }
    output.put(END_OF_CHANNEL);
}

async function prefixLines(input, output) {
    while (true) {
        const line = await input.take();
        if (line === END_OF_CHANNEL) {
            output.put(END_OF_CHANNEL);
            return;
        }
        if (line instanceof Error) {
            output.put(line);
            return;
        }
        output.put('> ' + line);
    }
}

async function main() {
    const fileName = process.argv[2];

    const ch = filter(readFile(fileName), splitLines, prefixLines);

    while (true) {
        const line = await ch.take();
        if (line === END_OF_CHANNEL) break;
        // Without this check, an error would be printed and the loop would then wait forever
        if (line instanceof Error) throw line;
        console.log(line);
    }
}
main().catch(err => {
    console.error(err);
    process.exitCode = 1;
});
```

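One possible answer to the closing/error question: give the channel an explicit close() and let take() reject when an error has been put, so neither the END_OF_CHANNEL sentinel nor the instanceof Error checks are needed. This is a minimal sketch under assumptions: ToyChannel, DONE, fail(), runStage() and fromChunks() are hypothetical names, not async-csp's API, and the file read is replaced by an in-memory source so the example runs on its own.

```javascript
// Sketch only: ToyChannel, DONE, fail(), runStage() and fromChunks() are
// hypothetical stand-ins, NOT the async-csp API. fs is replaced by an in-memory source.
const DONE = Symbol('DONE');

class ToyChannel {
  constructor() {
    this.items = [];   // buffered { value } or { error } entries
    this.takers = [];  // take() calls waiting for an item
    this.closed = false;
  }
  put(value) { this._deliver({ value }); }
  fail(error) { this._deliver({ error }); } // a later take() rejects with `error`
  close() {
    this.closed = true;
    // Nothing more will arrive, so wake every waiting taker with DONE.
    for (const t of this.takers.splice(0)) t.resolve(DONE);
  }
  _deliver(item) {
    if (this.closed) throw new Error('put on a closed channel');
    const taker = this.takers.shift();
    if (!taker) this.items.push(item);
    else if ('error' in item) taker.reject(item.error);
    else taker.resolve(item.value);
  }
  take() {
    const item = this.items.shift();
    if (item) return 'error' in item ? Promise.reject(item.error) : Promise.resolve(item.value);
    if (this.closed) return Promise.resolve(DONE);
    return new Promise((resolve, reject) => this.takers.push({ resolve, reject }));
  }
}

// Run a stage; an error (thrown here or rejected by input.take()) is
// forwarded downstream, and the output is closed either way.
async function runStage(output, body) {
  try {
    await body();
  } catch (err) {
    output.fail(err);
  }
  output.close();
}

function splitLines(input, output) {
  return runStage(output, async () => {
    let previous = '';
    for (let chunk; (chunk = await input.take()) !== DONE; ) {
      previous += chunk;
      let eolIndex;
      while ((eolIndex = previous.indexOf('\n')) >= 0) {
        output.put(previous.slice(0, eolIndex));
        previous = previous.slice(eolIndex + 1);
      }
    }
    if (previous.length > 0) output.put(previous);
  });
}

function prefixLines(input, output) {
  return runStage(output, async () => {
    for (let line; (line = await input.take()) !== DONE; ) output.put('> ' + line);
  });
}

// Same composition helper as in the issue, with ToyChannel swapped in.
function filter(inputChannel, ...filterFuncs) {
  for (const filterFunc of filterFuncs) {
    const outputChannel = new ToyChannel();
    filterFunc(inputChannel, outputChannel);
    inputChannel = outputChannel;
  }
  return inputChannel;
}

// Hypothetical replacement for readFile(): emits chunks, optionally fails, then closes.
function fromChunks(chunks, error) {
  const ch = new ToyChannel();
  for (const c of chunks) ch.put(c);
  if (error) ch.fail(error);
  ch.close();
  return ch;
}

async function main() {
  const ch = filter(fromChunks(['ab\nc', 'd\ne']), splitLines, prefixLines);
  try {
    for (let line; (line = await ch.take()) !== DONE; ) console.log(line);
  } catch (err) {
    console.error('pipeline failed:', err.message);
  }
}
main();
```

With this shape, errors travel down the pipeline as rejected promises, and each stage needs a single try/catch (here inside runStage()) instead of inspecting every value it takes.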
trusktr commented 6 years ago

@rauschma This is an interesting example. How would you write it using the constructor + callback pattern? I'm curious to see how it compares to this idea, and whether it would be less clean.

rauschma commented 6 years ago

@trusktr IINM (if I'm not mistaken), each of the filterFuncs would become the callback of a channel, and the channels would be piped together. Obviously, the functions would have to be restructured appropriately, which is not trivial in the case of splitLines().
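To make the restructuring concrete, here is a sketch of the same two filters written as per-value callbacks. The callback shape (value, push), the optional flush(push) hook and runPipe() are assumptions for illustration, not async-csp's actual constructor signature. It shows why splitLines is the awkward case: it needs state that outlives a single call (previous) and a hook that runs when the input ends, while prefixLines maps one value to one value and converts trivially.

```javascript
// Sketch only: the (value, push) callback shape, the optional flush(push)
// hook and runPipe() are hypothetical, NOT async-csp's constructor API.

// splitLines as a callback needs state that survives between calls
// (`previous`) and a hook for the end of input, so it is built by a factory.
function makeSplitLines() {
  let previous = '';
  return {
    transform(chunk, push) {
      previous += chunk;
      let eolIndex;
      while ((eolIndex = previous.indexOf('\n')) >= 0) {
        push(previous.slice(0, eolIndex));
        previous = previous.slice(eolIndex + 1);
      }
    },
    flush(push) {
      // Without an end-of-input hook the trailing partial line would be lost.
      if (previous.length > 0) push(previous);
      previous = '';
    },
  };
}

// prefixLines maps one value to one value, so it converts trivially.
const prefixLines = {
  transform(line, push) { push('> ' + line); },
};

// Stand-in for piping channels: runs each stage over an in-memory array.
function runPipe(inputs, stages) {
  let values = inputs;
  for (const stage of stages) {
    const out = [];
    const push = value => out.push(value);
    for (const value of values) stage.transform(value, push);
    if (stage.flush) stage.flush(push);
    values = out;
  }
  return values;
}

console.log(runPipe(['ab\nc', 'd\ne'], [makeSplitLines(), prefixLines]));
// [ '> ab', '> cd', '> e' ]
```

runPipe() processes one stage at a time over an array, so it illustrates the callback structure rather than streaming behavior.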

dvlsg commented 6 years ago

I have actually considered going the whole nine yards and supporting operations like filter, map, reduce, etc., or even transducers directly. Does that fit with what you were thinking? The constructor callback tries to be one-size-fits-all, which probably means it does too much.
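For reference, a transducer is a function that takes a reducing function and returns a new one, so map and filter steps are composed once and can then drive any sink: an array, or something with a put(). The sketch below is plain JavaScript with hypothetical names (map, filter, compose, append, sink); it is not a proposed async-csp API.

```javascript
// Sketch only: hypothetical helper names, NOT an async-csp API.
// A transducer takes a reducer (acc, x) => acc and returns a new reducer.
const map = f => reducer => (acc, x) => reducer(acc, f(x));
const filter = pred => reducer => (acc, x) => (pred(x) ? reducer(acc, x) : acc);
const compose = (...fns) => x => fns.reduceRight((v, fn) => fn(v), x);

// Plain array-building reducer.
const append = (acc, x) => { acc.push(x); return acc; };

// One transformation spec: add 1, then keep even numbers.
const xform = compose(map(x => x + 1), filter(x => x % 2 === 0));

console.log([1, 2, 3, 4, 5].reduce(xform(append), [])); // [ 2, 4, 6 ]

// The same spec driving a channel-like sink through its put().
const sink = { values: [], put(x) { this.values.push(x); } };
const step = xform((ch, x) => { ch.put(x); return ch; });
[1, 2, 3, 4, 5].forEach(x => step(sink, x));
console.log(sink.values); // [ 2, 4, 6 ]
```

compose() applies right to left, yet values flow through map before filter, because each transducer wraps the reducer it is given.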

dvlsg commented 6 years ago

One other thing to consider: humor me and pretend .filter and .map exist in the following setup, and assume they are "immutable" (in the sense that they return a new Channel each time, setting up the pipes as expected):

```javascript
const ch = new Channel([1, 2, 3, 4, 5])
  .map(x => {
    if (x === 3) { throw new Error(); }
    return x + 1;
  })
  .filter(x => x % 2 === 0);

await ch.take(); //=> 2
await ch.take(); //=> Error lifted into a throw / rejection here, from the `#map()` call?
await ch.take(); //=> 6
```

Seems like it may be tricky to get the error all the way through to the "userland" #take(). But I suppose that would be the least-surprising implementation?

So if that's the case, I would need a way to pipe the error from the mapped channel to the filtered channel, at least until I find a user-specified #take() call that isn't part of the pipe.

But that introduces async issues to consider. Even if no #take() is attached to the mapped channel right now, one may be attached in the future. If we pipe the error all the way through and that later #take() misses it, is that a problem, or intentional?
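The propagation described above can be sketched with a toy channel whose map() and filter() each return a new channel and forward errors in-band, in their original order. ToyChannel, ToyChannel.from(), fail() and DONE are hypothetical, not async-csp's API, and this sketch chooses to keep the pipe running after an error rather than closing it. Traced through the example, the rejection lands on the second take(): x = 3 throws inside the map callback, and the mapped values 3 and 5 are dropped by the filter.

```javascript
// Sketch only: ToyChannel, ToyChannel.from(), fail() and DONE are hypothetical,
// NOT async-csp's API. Errors travel in-band, in order, like values.
const DONE = Symbol('DONE');

class ToyChannel {
  static from(values) {
    const ch = new ToyChannel();
    for (const v of values) ch.put(v);
    ch.close();
    return ch;
  }
  constructor() {
    this.items = [];   // buffered { value } or { error } entries
    this.takers = [];  // pending take() calls
    this.closed = false;
  }
  put(value) { this._deliver({ value }); }
  fail(error) { this._deliver({ error }); }
  close() {
    this.closed = true;
    for (const t of this.takers.splice(0)) t.resolve(DONE);
  }
  _deliver(item) {
    const taker = this.takers.shift();
    if (!taker) this.items.push(item);
    else if ('error' in item) taker.reject(item.error);
    else taker.resolve(item.value);
  }
  take() {
    const item = this.items.shift();
    if (item) return 'error' in item ? Promise.reject(item.error) : Promise.resolve(item.value);
    if (this.closed) return Promise.resolve(DONE);
    return new Promise((resolve, reject) => this.takers.push({ resolve, reject }));
  }
  // "Immutable" operators: each returns a NEW channel fed by a background pump.
  _pipe(step) {
    const out = new ToyChannel();
    (async () => {
      for (;;) {
        let value;
        try {
          value = await this.take();
        } catch (err) {
          out.fail(err); // upstream error: pass it along and keep going
          continue;
        }
        if (value === DONE) break;
        try {
          step(value, out);
        } catch (err) {
          out.fail(err); // error thrown by the user's callback
        }
      }
      out.close();
    })();
    return out;
  }
  map(f) { return this._pipe((x, out) => out.put(f(x))); }
  filter(pred) { return this._pipe((x, out) => { if (pred(x)) out.put(x); }); }
}

async function demo() {
  const ch = ToyChannel.from([1, 2, 3, 4, 5])
    .map(x => {
      if (x === 3) { throw new Error('boom'); }
      return x + 1;
    })
    .filter(x => x % 2 === 0);

  console.log(await ch.take());            // 2
  try {
    await ch.take();                       // the error from #map() surfaces here
  } catch (err) {
    console.log('rejected:', err.message); // rejected: boom
  }
  console.log(await ch.take());            // 6
}
demo();
```

Forwarding errors in-band means nothing is lost if no #take() is attached yet: the error waits in the buffer like any value until some take() reaches it.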

dvlsg commented 6 years ago

I'm considering making an error in one channel propagate to every channel in the pipeline, closing them all.

There's some discussion here about whether native streams should do this. It's a bit unclear whether they landed on a solution.
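For contrast with in-band forwarding, a fail-fast policy where the first error closes every channel in the pipeline might look like the following sketch. ToyChannel, abort() and pipe() are hypothetical names, not async-csp's API. Downstream channels reject their waiting takers immediately, while an upstream producer only notices on its next put(), so a producer blocked elsewhere is not interrupted.

```javascript
// Sketch only: ToyChannel, abort() and pipe() are hypothetical, NOT
// async-csp's API. Policy: the first error aborts every channel it reaches.
const DONE = Symbol('DONE');

class ToyChannel {
  constructor() {
    this.items = [];    // buffered values
    this.takers = [];   // pending take() calls
    this.closed = false;
    this.error = null;  // set once the channel is aborted
  }
  put(value) {
    if (this.error) throw this.error; // aborted: the producer learns on its next put
    if (this.closed) throw new Error('put on a closed channel');
    const taker = this.takers.shift();
    if (taker) taker.resolve(value);
    else this.items.push(value);
  }
  close() {
    this.closed = true;
    for (const t of this.takers.splice(0)) t.resolve(DONE);
  }
  abort(err) {
    if (this.error) return;   // first error wins
    this.error = err;
    this.closed = true;
    this.items.length = 0;    // buffered values are discarded
    for (const t of this.takers.splice(0)) t.reject(err);
  }
  take() {
    if (this.error) return Promise.reject(this.error);
    if (this.items.length > 0) return Promise.resolve(this.items.shift());
    if (this.closed) return Promise.resolve(DONE);
    return new Promise((resolve, reject) => this.takers.push({ resolve, reject }));
  }
}

// Connect input -> output through step(). On any failure, abort BOTH sides:
// downstream takers see the error, and the upstream pump sees it on its next put.
function pipe(input, step) {
  const output = new ToyChannel();
  (async () => {
    try {
      for (let v; (v = await input.take()) !== DONE; ) step(v, output);
      output.close();
    } catch (err) {
      input.abort(err);
      output.abort(err);
    }
  })();
  return output;
}

const source = new ToyChannel();
[1, 2, 3].forEach(x => source.put(x));
source.close();
const tens = pipe(source, (x, out) => out.put(x * 10));
const checked = pipe(tens, (x, out) => {
  if (x === 20) throw new Error('bad value');
  out.put(x);
});

(async () => {
  console.log(await checked.take()); // 10
  try {
    await checked.take();
  } catch (err) {
    console.log('rejected:', err.message); // rejected: bad value
  }
  console.log(tens.error.message);   // bad value (upstream channel was aborted too)
})();
```

Unlike in-band forwarding, abort() discards buffered values, so a consumer that was not already waiting may never see them.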

dvlsg commented 6 years ago

To give you a look at what I'm considering, I have started a new branch here.

Currently, the only new method implemented is #map() (ignoring #toArray(), anyway). I included some functor-law tests as well. Other methods like #filter() and #reduce() would likely follow soon if I like the way it plays out. Going to sit and mull this one over.
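Functor-law tests for #map() check two properties: mapping the identity function changes nothing, and mapping f then g gives the same result as mapping their composition once. The sketch below checks both on concrete inputs using a hypothetical ToyChannel with from(), map() and toArray(); it is not the code from that branch, and passing such a check on sample inputs is evidence, not proof.

```javascript
// Sketch only: ToyChannel, from(), map() and toArray() are hypothetical,
// NOT the code on the branch. Errors thrown by `f` are not handled here.
const DONE = Symbol('DONE');

class ToyChannel {
  static from(values) {
    const ch = new ToyChannel();
    values.forEach(v => ch.put(v));
    ch.close();
    return ch;
  }
  constructor() { this.items = []; this.takers = []; this.closed = false; }
  put(value) {
    const taker = this.takers.shift();
    if (taker) taker(value); else this.items.push(value);
  }
  close() {
    this.closed = true;
    this.takers.splice(0).forEach(t => t(DONE));
  }
  take() {
    if (this.items.length > 0) return Promise.resolve(this.items.shift());
    if (this.closed) return Promise.resolve(DONE);
    return new Promise(resolve => this.takers.push(resolve));
  }
  // Returns a new channel; the source is drained into it in the background.
  map(f) {
    const out = new ToyChannel();
    (async () => {
      for (let v; (v = await this.take()) !== DONE; ) out.put(f(v));
      out.close();
    })();
    return out;
  }
  async toArray() {
    const result = [];
    for (let v; (v = await this.take()) !== DONE; ) result.push(v);
    return result;
  }
}

const sameArray = (a, b) => a.length === b.length && a.every((x, i) => Object.is(x, b[i]));

// Functor laws, checked on concrete inputs:
//   identity:    ch.map(x => x)    behaves like ch
//   composition: ch.map(f).map(g)  behaves like ch.map(x => g(f(x)))
async function checkFunctorLaws(xs, f, g) {
  const identity = sameArray(await ToyChannel.from(xs).map(x => x).toArray(), xs);
  const composed = sameArray(
    await ToyChannel.from(xs).map(f).map(g).toArray(),
    await ToyChannel.from(xs).map(x => g(f(x))).toArray(),
  );
  return { identity, composed };
}

checkFunctorLaws([1, 2, 3], x => x + 1, x => x * 2).then(console.log);
// { identity: true, composed: true }
```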