nodejs / node

Node.js JavaScript runtime ✨🐢🚀✨
https://nodejs.org

Add a promise-based API and a synchronous API to the stream module #47257

Closed: geryogam closed this issue 1 year ago

geryogam commented 1 year ago

What is the problem this feature will solve?

Recently, I used the idea from this great Stack Overflow post to configure a Node HTTP service application flexibly from the command line, environment variables, and a file, with CommandLineConfig, EnvironmentConfig, and FileConfig classes.

My FileConfig class has a constructor(path) method taking a path to a configuration file and a get(key) method taking the key of a value to look up in that file. The problem with this design is that initialization with a path is inconvenient to unit test: the tests must either interact with the filesystem (which is slow and clutters it with temporary files) or mock the functions of the fs module called by the class’s implementation (which couples the tests to the implementation).

import fs from 'node:fs';

class FileConfig {

  constructor(path) {
    this.config = JSON.parse(fs.readFileSync(path));
  }

  get(key) {
    return this.config[key];
  }
}

Application:

import {App} from './app.js';
import {FileConfig} from './config.js';

const config = new FileConfig('config.json');
const app = new App(config.get('host'), config.get('port'));
app.start();

Unit tests:

import assert from 'node:assert/strict';
import test from 'node:test';

import {FileConfig} from '../src/config.js';

test.test(
  'FileConfig reads values from the configuration file', () => {
    const config = new FileConfig('config.json');
    assert.equal(config.get('host'), 'localhost');
    assert.equal(config.get('port'), 8000);
  }
);

So I decided to use streams instead of paths because they are more general: they can be file streams, but also network streams and in-memory streams, letting the client decide. In-memory streams in particular are what I’m after for unit tests. So I updated the class with a constructor(readableStream) method that takes a readable stream over a configuration file.

class FileConfig {

  constructor(readableStream) {
    this.readableStream = readableStream;
  }

  async get(key) {
    if (!this.config) {
      this.config = JSON.parse(await read(this.readableStream));
    }
    return this.config[key];
  }
}

function read(readableStream) {
  return new Promise(
    (resolve, reject) => {
      let data = '';
      readableStream.on(
        'readable', () => {
          let chunk;
          while ((chunk = readableStream.read()) !== null) {
            data += chunk;
          }
        }
      );
      readableStream.on(
        'end', () => {
          resolve(data);
        }
      );
      readableStream.on(
        'error', (error) => {
          reject(error);
        }
      );
    }
  );
}

Application:

import fs from 'node:fs';

import {App} from './app.js';
import {FileConfig} from './config.js';

const config = new FileConfig(fs.createReadStream('config.json'));
const app = new App(await config.get('host'), await config.get('port'));
app.start();

Unit tests:

import assert from 'node:assert/strict';
import stream from 'node:stream';
import test from 'node:test';

import {FileConfig} from '../src/config.js';

test.test(
  'FileConfig reads values from an in-memory stream', async () => {
    const readableStream = stream.Readable.from('{"host": "localhost", "port": 8000}');
    const config = new FileConfig(readableStream);
    assert.equal(await config.get('host'), 'localhost');
    assert.equal(await config.get('port'), 8000);
  }
);

The stream approach works great, but the stream module only has a callback-based API, so I have two problems with it: there is no promise-based API (hence the hand-written read() wrapper above) and no synchronous API.

What is the feature you are proposing to solve the problem?

Add a promise-based API and a synchronous API to the stream module, which currently has only a callback-based API. For reference, the fs module already has the three APIs.

What alternatives have you considered?

Using the existing callback-based API of the stream module like described in the stream implementation above.

MadhuSaini22 commented 1 year ago

One alternative could be to continue using the stream module's callback-based API and wrap it in a promise-based API with a utility such as util.promisify. This would allow easier consumption of the stream API by developers who prefer working with promises instead of callbacks. However, this approach would still not address the lack of a synchronous API in the stream module.

climba03003 commented 1 year ago

Doesn't it already have a way to do this with async iterators?


class FileConfig {

  constructor(readableStream) {
    this.readableStream = readableStream;
  }

  async get(key) {
    if (!this.config) {
      let data = ''
      for await (const chunk of this.readableStream) {
        data += chunk
      }
      this.config = JSON.parse(data);
    }
    return this.config[key];
  }
}
aduh95 commented 1 year ago

You can also use Buffer.concat(await stream.toArray()), which I think is already the promise API you were asking for (see docs). Or are you asking for something else?

aduh95 commented 1 year ago

Regarding the sync API, you can call readable.read([size]), which synchronously returns the data available; I think that covers the other part of your feature request. Should we close the issue?

tniessen commented 1 year ago

There are various ways to use streams in a way that is compatible with Promises. Support for async iterables is great, and you can also construct your own async iterators using events.on, for example. pipeline() and finished() also work nicely with Promises.

As @aduh95 mentioned, toArray() returns a Promise, and there are also forEach(), map(), etc. to solve common tasks.

If you want an API that is built on top of Promises, you can also use the Web Streams API.

All that being said, you are reading an entire stream and then passing the buffered contents to JSON.parse(). Doing so eliminates virtually all benefits of streams. Streams reduce latency and memory requirements only if the data is processed chunk by chunk, but passing it to JSON.parse() means that it is processed all at once. Why use a stream in the first place instead of a string or a Promise<string>? Either can be obtained easily in a sync or async fashion, and either can also be mocked trivially for testing.

geryogam commented 1 year ago

one alternative could be to continue using the stream module's callback-based API and wrap it in a promise-based API using a utility library such as util.promisify.

@MadhuSaini22 Thanks for the suggestion. According to the documentation, the util.promisify function applies to functions taking an error-first callback (i.e. a (error, value) => { … } callback) as the last argument. But the stream.Readable.on method does not follow that style, so I’m not sure it’s suitable here.

tniessen commented 1 year ago

@geryogam I believe that a variety of existing APIs have been suggested that appear to solve your particular issue (aside from the fact that using streams likely has no benefits in your example). Do you have a specific API in mind that is missing, or can this issue be closed?

geryogam commented 1 year ago

Doesn't it already have a way to do this with async iterators?

class FileConfig {

  constructor(readableStream) {
    this.readableStream = readableStream;
  }

  async get(key) {
    if (!this.config) {
      let data = ''
      for await (const chunk of this.readableStream) {
        data += chunk
      }
      this.config = JSON.parse(data);
    }
    return this.config[key];
  }
}

@climba03003 Thanks a lot, your async iterator solution works very well and greatly simplifies my code.

geryogam commented 1 year ago

You can also use Buffer.concat(await stream.toArray()), which I think is already the promise API you were asking for (see docs). Or are you asking for something else?

@aduh95 Thanks a lot, your promise-based stream.Readable.toArray() solution works great (and Buffer.concat() is not needed) and is even simpler than the async iterator solution:

  async get(key) {
    if (!this.config) {
      this.config = JSON.parse(await this.readableStream.toArray());
    }
    return this.config[key];
  }
geryogam commented 1 year ago

Regarding the sync API, you can call readable.read([size]), which synchronously returns the data available; I think that covers the other part of your feature request.

@aduh95 The problem is that stream.Readable.read() returns null when there is no data available in its internal buffer, that is at the start and at the end of reading the whole stream. So how would you implement that?

tniessen commented 1 year ago

The problem is that stream.Readable.read() returns null when there is no data available in its internal buffer, that is at the start and at the end of reading the whole stream. So how would you implement that?

@geryogam In general, streams cannot be read in a fully synchronous way. If the code reading the stream never yields to other code that is waiting to respond to I/O events, then it effectively blocks the event loop, which eventually halts all I/O progress in a push-based stream implementation such as Node.js streams.

aduh95 commented 1 year ago

and Buffer.concat() is not needed

Likely it is: if you try to JSON.parse an array, you're unlikely to get what you're after (I'm guessing you tested with a file too small to hit the bug, but trust me, with larger files you'll get unexpected commas).

The problem is that stream.Readable.read() returns null when there is no data available in its internal buffer, that is at the start and at the end of reading the whole stream

Yeah, if you try to read a stream synchronously, it's expected that sometimes there'll be no data available. There's nothing you can do about that: the stream is an interface for async operations. If you block the event loop you'll never get the data, and if you let it tick, it won't be sync anymore 🤷‍♂️ It's as sync as you'll ever get with streams.

geryogam commented 1 year ago

If you want an API that is built on top of Promises, you can also use the Web Streams API.

@tniessen Thanks very much! I wasn’t aware of this API, but the streamConsumers.text() function works great and seems more universal than the stream.Readable.toArray() solution since it works on streamWeb.ReadableStream, stream.Readable, and AsyncIterator:

import streamConsumers from 'node:stream/consumers';

[…]

  async get(key) {
    if (!this.config) {
      this.config = JSON.parse(await streamConsumers.text(this.readableStream));
    }
    return this.config[key];
  }

The documentation of this API starts with:

The WHATWG Streams Standard (or "web streams") defines an API for handling streaming data. It is similar to the Node.js Streams API but emerged later and has become the "standard" API for streaming data across many JavaScript environments.

Does it mean that the Node Streams API is obsolete and developers should use the Web Streams API in new projects?

tniessen commented 1 year ago

Does it mean that Node Streams API is obsolete and developers should use the Web Streams API in new projects?

Most of the Node.js ecosystem relies on Node.js streams, so I don't see them going away anytime soon. The main reason for implementing WHATWG Streams in Node.js at all is interoperability with other JavaScript environments and web standards. Also, unfortunately, the Node.js implementation of Web Streams is much slower than Node.js streams so far.

All in all, I would not call Node.js streams obsolete, nor would I recommend Web Streams if compatibility with other runtimes is not a concern. In many cases, it's easy to support both as you discovered yourself, e.g., using async iterators or text().

geryogam commented 1 year ago

All that being said, you are reading an entire stream and then passing the buffered contents to JSON.parse(). Doing so eliminates virtually all benefits of streams. Streams reduce latency and memory requirements only if the data is processed chunk by chunk, but passing it to JSON.parse() means that it is processed all at once. Why use a stream in the first place instead of a string or a Promise<string>? Either can be obtained easily in a sync or async fashion, and either can also be mocked trivially for testing.

@tniessen That’s an excellent remark; I actually hadn’t thought about this simple solution. And it seems to be the only synchronous one, since current stream-based approaches are all asynchronous, as you mentioned. So I suppose that buries my feature request for adding a synchronous stream API, since one might as well pass the data directly? Would it even be possible to implement such an API?

tniessen commented 1 year ago

Would it even be possible to implement such an API?

In general, no. Some types of streams might be able to use a pull mechanism to achieve this, but it won't work in general (and is an anti-pattern in any case).

geryogam commented 1 year ago

In general, no. Some types of streams might be able to use a pull mechanism to achieve this, but it won't work in general (and is an anti-pattern in any case).

By a synchronous stream API I was thinking of adding a stream.readSync() method to the stream module. It’s surprising that it’s not possible in general, since for instance Python has a synchronous stream.read() method that returns all the bytes from the stream until EOF. That allows you to write synchronous functions on streams like this:

>>> import json
>>> def f(stream):
...     return json.loads(stream.read())
... 
>>> import io
>>> f(open('config.json', encoding='utf-8'))
{'host': 'localhost', 'port': 8000}
>>> f(io.StringIO('{"host": "localhost", "port": 8000}'))
{'host': 'localhost', 'port': 8000}
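For what it’s worth, for regular files specifically Node already has a rough counterpart in fs.readFileSync(); a minimal sketch (the temporary-file path is illustrative):

```javascript
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';

// Synchronous whole-file read, much like Python's stream.read() on a file.
// The discussion below is about why this cannot generalize to arbitrary
// streams (network, transform, ...), not about files.
function f(configPath) {
  return JSON.parse(fs.readFileSync(configPath, 'utf-8'));
}

// Demonstration with a temporary file:
const tmp = path.join(os.tmpdir(), 'config.json');
fs.writeFileSync(tmp, '{"host": "localhost", "port": 8000}');
console.log(f(tmp)); // { host: 'localhost', port: 8000 }
```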
tniessen commented 1 year ago

It’s surprising that it’s not possible in general

Node.js streams are very flexible. Consider this transform stream:

const { Transform } = require('node:stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    setTimeout(() => callback(null, chunk), 1000);
  },
});

If you pipe any data into this stream, it will be asynchronously delayed by a second before being passed on to whatever code is reading the stream. Don't actually use this code, it's just for demonstration purposes! In reality, imagine that such delays arise from network latency or so.

How should a hypothetical readSync() function read data from this stream? If the function is synchronous, i.e., blocks the event loop, the event scheduled by setTimeout() will never occur, thus, the transform stream will never emit any data.
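For contrast, a minimal sketch of consuming such a delayed transform asynchronously; awaiting each chunk yields to the event loop, so the timer callbacks can fire:

```javascript
import { Readable, Transform } from 'node:stream';

// The same kind of delayed transform as above.
const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    setTimeout(() => callback(null, chunk), 1000);
  },
});

Readable.from('hello').pipe(myTransform);

// for await suspends between chunks, letting the setTimeout() events run.
let data = '';
for await (const chunk of myTransform) {
  data += chunk;
}
console.log(data); // prints 'hello' about one second later
```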

for instance Python has a synchronous stream.read() method that returns all the bytes from the stream until EOF.

The Node.js I/O model was designed to scale well from the beginning, both to large amounts of data and to many concurrent streams processed in parallel. Using Python's stream.read() to read an entire stream scales in neither direction. And that's probably why newer versions of Python have async streams.

geryogam commented 1 year ago

Thanks for the explanation! Solutions were provided to the two problems listed in my original post, the stream module already has a promise-based asynchronous API, and a synchronous API is not generally possible with Node’s single-threaded model, so I think we can now close this feature request. Thanks to everyone involved in this conversation, I learnt a lot.