nodejs / node

Node.js JavaScript runtime ✨🐢🚀✨
https://nodejs.org

Allow multiple readers to iterate over a stream at the same time #52086

Open ehmicky opened 5 months ago

ehmicky commented 5 months ago

What is the problem this feature will solve?

When multiple callers use Readable[Symbol.asyncIterator], each caller receives a partial result.

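```js
// ES module (e.g. a .mjs file), since it uses top-level await
import {Readable} from 'node:stream'

const stream = Readable.from(['a', 'b', 'c'])

// Each reader calls stream[Symbol.asyncIterator]() on the same stream
const iterate = async (readerName) => {
  for await (const chunk of stream) {
    console.log(readerName, chunk)
  }
}

await Promise.all([
  iterate('one'),
  iterate('two'),
])
```

```
one a
two b
one c
```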

Some callers might expect that result, but others might expect every reader to receive every chunk instead:

```
one a
two a
one b
two b
one c
two c
```

What is the feature you are proposing to solve the problem?

Add an option to readable.iterator() that consumes the stream with readable.on('data') instead of readable.read(). Since every 'data' listener receives every chunk, this would enable the second behavior above.

What alternatives have you considered?

Implementing this in user land.

See an example of it at https://github.com/sindresorhus/get-stream/pull/121

jakecastelli commented 2 months ago

Hi @ehmicky, I had a look at your implementation and realised you made the effort to support both Node 18 and 20. If this were implemented in Node.js core, there shouldn't be any backward-compatibility concern, right (since it would ship as a breaking change)? Correct me if I am wrong.

ehmicky commented 2 months ago

Hi @jakecastelli,

The implementation in get-stream works with Node 18, 20 and 22. However, compared with the current implementation in Node.js, it would have some subtle differences and breaking changes.

Overall, IMHO, it would be safer to keep the current Node.js implementation and provide this new behavior as an opt-in, for backward compatibility: for example, through a boolean option to stream.iterator().


As a side note, the implementation in get-stream does not pass the highWaterMark option to events.on() (see https://github.com/sindresorhus/get-stream/pull/125), because get-stream consumes the iterable synchronously and right away.

However, since events.on() buffers incoming chunks, it might be a good idea to let the user specify the highWaterMark option, in case they consume the iterable too slowly. events.on() then automatically pauses and resumes the stream, which keeps its buffer from growing without bound. This is different from stream.readableHighWaterMark: it is measured in number of chunks (not number of bytes), and it paces events.on() (not stream.on('data')). This might confuse some users, and I am not sure whether a different name would help clear up that confusion.

mycoleb commented 2 months ago

My team is claiming this issue.

MoLow commented 2 months ago

@nodejs/streams should Readable have a tee() helper/method like webstream readable?

benjamingr commented 2 months ago

@mycoleb your what is what?

benjamingr commented 2 months ago

> @nodejs/streams should Readable have a tee() helper/method like webstream readable?

Maybe? It's just tricky to get the semantics right, namely backpressure/watermarking and cleanup/resources. A tee'd stream should get destroyed when all its forks get destroyed, it should communicate backpressure, etc., and we need to decide what happens when one "fork" is flowing and one is not.
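One possible set of answers to the questions above, as a hedged sketch: a hypothetical `tee()` (not a Node.js API, and not based on cloneable-readable) that pipes the source into PassThrough forks. It relies on pipe() pausing the source while any destination is backed up, so the slowest fork sets the pace and a fork that is never read eventually stalls the others. It destroys the source only after every fork has closed, and forwards source errors to every fork.

```javascript
import { PassThrough, Readable } from 'node:stream';

// Hypothetical tee() helper: NOT a Node.js API and not based on
// cloneable-readable. Returns `count` PassThrough forks of `source`.
const tee = (source, count = 2) => {
  const forks = Array.from(
    { length: count },
    () => new PassThrough({ objectMode: source.readableObjectMode }),
  );
  let openForks = count;

  for (const fork of forks) {
    // pipe() pauses the source while any fork's buffer is full and resumes
    // it once all of them drain: the slowest fork sets the pace, and a fork
    // that is never read eventually stalls the others.
    source.pipe(fork);

    // pipe() unpipes a fork when it closes; destroy the source only once
    // every fork has closed (normally or via destroy()).
    fork.on('close', () => {
      openForks -= 1;
      if (openForks === 0) source.destroy();
    });
  }

  // pipe() does not forward errors, so propagate them to every fork.
  source.on('error', (error) => {
    for (const fork of forks) fork.destroy(error);
  });

  return forks;
};

const collect = async (stream) => {
  const chunks = [];
  for await (const chunk of stream) chunks.push(chunk);
  return chunks;
};

const [left, right] = tee(Readable.from(['a', 'b', 'c']));
console.log(await Promise.all([collect(left), collect(right)]));
// [ [ 'a', 'b', 'c' ], [ 'a', 'b', 'c' ] ]
```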

MoLow commented 2 months ago

> Maybe? It's just tricky to get the semantics right, namely backpressure/watermarking and cleanup/resources. A tee'd stream should get destroyed when all its forks get destroyed, it should communicate backpressure, etc., and we need to decide what happens when one "fork" is flowing and one is not.

I didn't say it was simple, but this is a very common need.

benjamingr commented 2 months ago

Yeah, my point is that we shouldn't land an implementation that doesn't deal with these things.

mcollina commented 2 months ago

Unfortunately, using on('data') would be complex and prone to errors due to backpressure requirements. Using .read() and on('readable') simplified quite a lot of that handling.
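To illustrate the read()/'readable' approach, here is a simplified pull-based iterator. `pull` and `waitForReadable` are hypothetical names, and this is a sketch rather than Node's actual Readable[Symbol.asyncIterator] implementation; it skips some error and cleanup edge cases. Because a chunk is read only when a consumer asks for the next one, the stream's own buffer and highWaterMark provide backpressure without extra bookkeeping. Because read() also removes the chunk from the shared buffer, two concurrent readers split the chunks, as in the original report.

```javascript
import { Readable } from 'node:stream';

// Resolve once the stream is readable or has ended, reject if it errors,
// and remove all three listeners again so nothing leaks between waits.
const waitForReadable = (stream) =>
  new Promise((resolve, reject) => {
    const cleanup = () => {
      stream.off('readable', onReady);
      stream.off('end', onReady);
      stream.off('error', onError);
    };
    const onReady = () => {
      cleanup();
      resolve();
    };
    const onError = (error) => {
      cleanup();
      reject(error);
    };
    stream.on('readable', onReady);
    stream.on('end', onReady);
    stream.on('error', onError);
  });

// Hypothetical pull-based iterator: one read() per requested chunk.
async function* pull(stream) {
  while (true) {
    const chunk = stream.read(); // removes the chunk from the shared buffer
    if (chunk !== null) {
      yield chunk;
      continue;
    }
    if (stream.errored) throw stream.errored;
    if (stream.readableEnded || stream.destroyed) return;
    await waitForReadable(stream); // nothing buffered yet: wait, don't spin
  }
}

const stream = Readable.from(['a', 'b', 'c', 'd']);

const collect = async (readerName) => {
  const chunks = [];
  for await (const chunk of pull(stream)) chunks.push(chunk);
  console.log(readerName, chunks);
  return chunks;
};

// Every chunk is delivered exactly once, to whichever reader asked first.
await Promise.all([collect('one'), collect('two')]);
```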

A proper implementation of this is exceptionally hard. https://www.npmjs.com/package/cloneable-readable works, but it's complex, and I couldn't compress it into a nice API like .tee() without adding more state variables.

I'm not opposed to adding cloneable-readable to Node.js core, or to moving it to the Node.js org, as I have always considered it part of my activities in Node.js.