dmfay / massive-js

A data mapper for Node.js and PostgreSQL.

Feature: iterator/generator based output as alternative to streams #613

Closed rijnhard closed 6 years ago

rijnhard commented 6 years ago

Streams are a nice interface for handling larger amounts of query data, but they're fairly verbose to implement, especially if you do the error handling and backpressure properly.

Generators are a much more familiar interface, and much simpler for this use case.

e.g. using async iterators (which aren't standard yet):

async function processQueued() {
    const iterator = db.tests.find({priority: 'low'}, {iterator: true});

    for await (const item of iterator) {
        await doRandomProcess(item);
    }
}

This is much simpler than handling streams. The only real drawback is that async generators aren't standard yet, but if resolving a promise with an iterator is sufficient then this isn't an issue.
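For comparison, here's a rough sketch of the stream-based equivalent, assuming Massive's {stream: true} option resolves to a Node Readable (doRandomProcess is the same helper as above); the manual pause/resume is the backpressure handling that the generator version would make implicit:

async function processQueuedWithStream() {
    const stream = await db.tests.find({priority: 'low'}, {stream: true});

    return new Promise((resolve, reject) => {
        stream.on('data', item => {
            stream.pause(); // stop the flow while this row is processed
            doRandomProcess(item)
                .then(() => stream.resume())
                .catch(err => {
                    stream.destroy();
                    reject(err);
                });
        });
        stream.on('error', reject);
        stream.on('end', resolve);
    });
}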

vitaly-t commented 6 years ago

The syntax shown in the example requires Node.js v9.2+

rijnhard commented 6 years ago

Not saying it should be done immediately, but it is a nice usability enhancement for the future


dmfay commented 6 years ago

It would be cool! I'm going to punt on it though -- most of the work to be done here would fit in better at the driver level so there's a single consistent implementation whether you're using Massive or not. So if you're interested in working toward it I'd suggest pulling pg-promise (hi Vitaly!) instead.

rijnhard commented 6 years ago

Thank you for the feedback @dmfay

@vitaly-t should I open a feature request on pg-promise?

Edit: about the only thing I haven't figured out is whether a standard (non-async) iterator would be sufficient, but I doubt it.
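A quick sketch of why a plain iterator falls short: next() has to return synchronously, so the best a sync generator can do over async data is yield promises, pushing the awaiting and end-detection onto the caller (fetchRow here is a hypothetical async row source):

// a plain generator can only hand out promises, not resolved rows
function* rowPromises(fetchRow) {
    while (true) {
        yield fetchRow(); // Promise<row|null>; the caller must await it
    }
}

// the caller ends up doing the async bookkeeping by hand:
// for (const p of rowPromises(fetchRow)) {
//     const row = await p;
//     if (row === null) break;
//     await doRandomProcess(row);
// }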

vitaly-t commented 6 years ago

@rijnhard I don't know what feature request you could open against pg-promise, as it already supports both ES6 generators and ES7 async/await in a generic way.

rijnhard commented 6 years ago

@vitaly-t I may be missing the boat, so excuse me if this is a dumb statement, but I had a look at how pg-promise handles generators and that's mainly in the transaction/task area. I'm talking about having an alternative to the stream API, because it serves the same use case: it lets you handle large datasets with backpressure, just like streams. The only real difference is that it's a much nicer API to work with, where streams get clunky and error-prone quickly.

vitaly-t commented 6 years ago

in that it allows you to handle large datasets with backpressure just like streams

For that, pg-promise supports sequences: see methods sequence and page, which let you implement any custom backpressure to process an abstract data source.
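A minimal sketch of what that might look like inside a pg-promise transaction, paging through the tests table from the earlier example; the batch size and the LIMIT/OFFSET paging are illustrative choices, not part of the API:

db.tx(t => {
    const batchSize = 100;

    // the source function is called with an increasing index until it
    // returns (or resolves with) undefined, which ends the sequence
    function source(index) {
        return t.any(
            'SELECT * FROM tests WHERE priority = $1 ORDER BY id LIMIT $2 OFFSET $3',
            ['low', batchSize, index * batchSize]
        ).then(async rows => {
            if (!rows.length) {
                return; // resolving with undefined ends the sequence
            }
            for (const row of rows) {
                await doRandomProcess(row); // hypothetical per-row processing
            }
            return rows.length; // any defined value continues the sequence
        });
    }

    return t.sequence(source);
});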

rijnhard commented 6 years ago

@vitaly-t, @dmfay this may be asking a bit much, but I'm struggling to figure out how to use sequences in, say, this example: massive-js#search. Could you give me an example?

rijnhard commented 6 years ago

TBH at this rate I think it's probably just going to be easier if I map the stream to a generator on my own, kind of like how es6-stream does it (but in a nicer way), and maybe find a way to upstream that with a similar structure to node-pg-query-stream.

dmfay commented 6 years ago

Afraid I haven't gotten into pgp's sequences much at all, so I can't really help you there. If it makes it in upstream, though, it shouldn't be too much work to add proper support for it through Massive!

rijnhard commented 6 years ago

So for completeness and anyone else looking to make this work:

I decided to stuff it and forked node-pg-query-stream into node-pg-query-iterator and got about 70% of the way with it (with some Babel issues), and then came across this gem and tweaked it into this:

/**
 * Converts a stream to an async iterator.
 * Note: this targets the WHATWG ReadableStream API (reader locks),
 * not Node's stream.Readable.
 * @template T
 * @param {ReadableStream<T>} stream
 * @return {AsyncIterableIterator<T>}
 * @see https://jakearchibald.com/2017/async-iterators-and-generators/
 * @todo figure out how it handles stream error events?
 */
export async function* streamToIterator(stream) {
    // Get a lock on the stream
    const reader = stream.getReader();

    try {
        // initial read so that we can error handle better
        let { done, value } = await reader.read();

        while (!done) {
            yield value; // chunk

            // Read from the stream
            ({ done, value } = await reader.read());
        }
    } finally {
        // release the lock whether we finished, threw, or the consumer
        // broke out of the loop early; any error still propagates and
        // stops the iterator
        reader.releaseLock();
    }
}

But then I found stream_readable_symbol_asynciterator: Node's Readable streams implementing Symbol.asyncIterator natively.

So I don't think any of this is needed. Just sit back and wait for it to be adopted, and if you can't wait, just use the transformer above (I am using the transformer for now).
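For illustration, once that's adopted the wrapper disappears entirely; a sketch assuming, as above, that find() resolves to a Node Readable when passed {stream: true}:

// with Readable[Symbol.asyncIterator], the query stream can be consumed directly
async function processQueued() {
    const stream = await db.tests.find({priority: 'low'}, {stream: true});

    for await (const item of stream) {
        await doRandomProcess(item);
    }
}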