nodejs / node

Node.js JavaScript runtime ✨🐢🚀✨
https://nodejs.org

async iterators/generators in stream.pipeline() #27140

Closed boneskull closed 4 years ago

boneskull commented 5 years ago

Apologies for all the code here.

We can consume a Readable stream using an async iterator:

// source: http://2ality.com/2018/04/async-iter-nodejs.html
const fs = require('fs');

// Log every chunk produced by any async iterable (e.g. a Readable stream).
async function printAsyncIterable(iterable) {
  for await (const chunk of iterable) {
    console.log('>>> '+chunk);
  }
}

printAsyncIterable(fs.createReadStream('my-file.txt', 'utf8'));

And we can use async generators similarly to how one would use a Transform stream:

/**
 * Parameter: async iterable of chunks (strings)
 * Result: async iterable of lines (incl. newlines)
 */
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      // line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

/**
 * Parameter: async iterable of lines
 * Result: async iterable of numbered lines
 */
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) {
    yield counter + ': ' + line;
    counter++;
  }
}

Then, we can "pipe" these together like so:

async function main() {
  await printAsyncIterable(
    numberLines(
      chunksToLines(
        fs.createReadStream('my-file.txt', 'utf8')
      )
    )
  );
}
main();

That's neat, but also kind of hideous. What if we could leverage stream.pipeline() to do something like this?

async function main() {
  stream.pipeline(
    fs.createReadStream('my-file.txt', 'utf8'),
    chunksToLines,
    numberLines,
    printAsyncIterable
  );
}
main();

I'm unfamiliar with the guts of stream.pipeline()--and completely new to async iterators and generators--so don't know how feasible something like this is.

FWIW, the "hideous nested function calls" can be naively replaced by use of the godlike Array.prototype.reduce():

const pipeline = async (...args) => args.reduce((acc, arg) => arg(acc));

async function main() {
  await pipeline(
    fs.createReadStream('my-file.txt', 'utf8'),
    chunksToLines,
    numberLines,
    printAsyncIterable
  );
}
main();

Reference: https://twitter.com/b0neskull/status/1115325542566227968

boneskull commented 5 years ago

cc @mcollina

addaleax commented 5 years ago

So … basically, the desired feature here is that stream.pipeline() should transparently convert async generators to object-mode Transform streams?

boneskull commented 5 years ago

@addaleax I guess that'd be pretty great. Readable#pipe too, maybe?

boneskull commented 5 years ago

something something migration path from streams to async iterators and generators

devsnek commented 5 years ago

if we can coerce async iterables to streams, that would enable some nice compat with whatwg streams. +1 from me

mcollina commented 5 years ago

How can we coerce to a transform? What signature would you expect from a transform async iterator?

addaleax commented 5 years ago

@mcollina I think any function that takes one argument, which is an async-iterable for Buffers or strings, and returns an async-iterable of that kind should work?

More generally, I think we could create a utility class that turns async-iterables of Buffers/strings into a Readable stream, and use that as a building block for the .pipe() utility.
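
For illustration, a minimal sketch of that building block. It assumes a Node.js version that ships `stream.Readable.from()`, which landed after this comment was written; `greetings`, `toReadable` and `collect` are hypothetical names used only for this example.

```javascript
const { Readable } = require('stream');

// Hypothetical source: an async generator yielding string chunks.
async function* greetings() {
  yield 'hello ';
  yield 'world\n';
}

// Wrap any (async) iterable of strings/Buffers in a Readable stream.
// Readable.from() creates an object-mode stream by default.
function toReadable(iterable) {
  return Readable.from(iterable);
}

// Helper for the example: read a Readable back via its async iterator.
async function collect(readable) {
  let out = '';
  for await (const chunk of readable) {
    out += chunk;
  }
  return out;
}

// Usage (not executed here): toReadable(greetings()).pipe(process.stdout);
```

Because the result is an ordinary Readable, it can be handed to `.pipe()` or `stream.pipeline()` like any other stream.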

mcollina commented 5 years ago

https://github.com/mcollina/stream-iterators-utils/blob/master/README.md#utilstoreadablegenerator-opts this is a prototype implementation of that Readable pattern.

felixfbecker commented 5 years ago

It's awesome that we now have stream.Readable.from() and readable[Symbol.asyncIterator]. The only piece missing from the puzzle now is stream.Transform.from(), with a signature like

class Transform<I, O> {
  static from<I, O>(generatorFn: (input: AsyncIterable<I>) => AsyncIterable<O>): Transform<I, O>
}

ronag commented 4 years ago

We added support for async generators in pipeline in v13.x. I believe that fulfills this topic. Please reopen if there is anything further to address.
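
As a rough sketch of what that looks like, here is the original example rewritten against `pipeline()`. It assumes the promise-based `pipeline` from `stream/promises` (available in newer Node.js releases than the v13.x mentioned above; the callback form from `stream` should behave similarly). The `run` wrapper and the in-memory source built with `Readable.from()` are stand-ins for this example, replacing `fs.createReadStream('my-file.txt', 'utf8')`.

```javascript
const { Readable } = require('stream');
const { pipeline } = require('stream/promises'); // assumption: a Node.js version that has stream/promises

// Async iterable of string chunks -> async iterable of lines (incl. newlines).
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      yield previous.slice(0, eolIndex + 1);
      previous = previous.slice(eolIndex + 1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

// Async iterable of lines -> async iterable of numbered lines.
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) {
    yield counter + ': ' + line;
    counter++;
  }
}

// Hypothetical wrapper: pipe in-memory chunks through both generators
// and gather the results in an array instead of printing them.
async function run(chunks) {
  const out = [];
  await pipeline(
    Readable.from(chunks),
    chunksToLines,
    numberLines,
    // The last stage may be a plain async function that consumes its source.
    async function collectLines(lines) {
      for await (const line of lines) {
        out.push(line);
      }
    }
  );
  return out;
}
```

Calling `run(['1st line\n2nd ', 'line\n3rd'])` should resolve to the three numbered lines, with the second line reassembled across the chunk boundary. Unlike the nested-call and `reduce()` versions, `pipeline()` also takes care of destroying the source stream if any stage throws.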