nodejs / node

`Readable.flatMap` concurrency isn't working as expected #52796

Open MoLow opened 2 weeks ago

MoLow commented 2 weeks ago

Version

v22.0.0

Platform

Darwin Moshes-MBP.localdomain 23.3.0 Darwin Kernel Version 23.3.0: Wed Dec 20 21:30:44 PST 2023; root:xnu-10002.81.5~7/RELEASE_ARM64_T6000 arm64

Subsystem

stream

What steps will reproduce the bug?

Run this code:

```js
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
// Each inner async generator waits one second before yielding its value.
const mapped = base.flatMap(async function * (x) {
  await setTimeout(1000);
  yield x * 2;
}, { concurrency: 200 });

(async () => {
  for await (const chunk of mapped) {
    // Log each chunk with the whole seconds elapsed since start.
    console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
  }
})();
```

How often does it reproduce? Is there a required condition?

Always.

What is the expected behavior? Why is that the expected behavior?

Since concurrency is set to 200, I expect all the chunks to be emitted together, about one second after the start, just as `Readable.map` does in the comparison under "Additional information":

```
{ chunk: 2 } 1
{ chunk: 4 } 1
{ chunk: 6 } 1
{ chunk: 8 } 1
{ chunk: 10 } 1
```

What do you see instead?

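Each chunk arrives about one second after the previous one, so the five chunks take roughly five seconds in total:

```
{ chunk: 2 } 1
{ chunk: 4 } 2
{ chunk: 6 } 3
{ chunk: 8 } 4
{ chunk: 10 } 5
```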

Additional information

When running similar code using `Readable.map`, the concurrency works as expected:

```js
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
// Each mapper call waits one second before returning its value.
const mapped = base.map(async function(x) {
  await setTimeout(1000);
  return x * 2;
}, { concurrency: 200 });

(async () => {
  for await (const chunk of mapped) {
    // Log each chunk with the whole seconds elapsed since start.
    console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
  }
})();
```

This results in:

```
{ chunk: 2 } 1
{ chunk: 4 } 1
{ chunk: 6 } 1
{ chunk: 8 } 1
{ chunk: 10 } 1
```
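Since `readable.map()` does honor `concurrency`, one possible workaround is to let `map()` drive the inner generators by draining each one into an array, then flatten those arrays with `flatMap()`. This is a sketch under stated assumptions, not an official API: `flatMapConcurrent` is a hypothetical helper name, and it buffers each inner generator's entire output until that generator finishes, trading memory and latency for concurrency.

```javascript
'use strict';
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

// The inner async generator from the reproduction above.
async function* double(x) {
  await setTimeout(1000);
  yield x * 2;
}

// Hypothetical helper (not part of Node's API): map() keeps up to
// `concurrency` inner generators running by draining each one into an array;
// flatMap() then flattens the arrays in input order. Each inner generator's
// output is buffered until that generator completes.
function flatMapConcurrent(stream, fn, options) {
  return stream
    .map(async (x, { signal }) => {
      const out = [];
      for await (const value of fn(x, { signal })) out.push(value);
      return out;
    }, options)
    .flatMap((values) => values);
}

const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });

(async () => {
  for await (const chunk of flatMapConcurrent(base, double, { concurrency: 200 })) {
    // Log each chunk with the whole seconds elapsed since start.
    console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
  }
})();
```

Because the one-second waits now happen inside `map()`'s concurrent mapper calls, the chunks should arrive together after roughly one second, as in the `Readable.map` output above, while still keeping input order.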
MoLow commented 2 weeks ago

I think the issue is this implementation:

https://github.com/nodejs/node/blob/743aa42df1ae15fceef5a7d657f6c298a59ac150/lib/internal/streams/operators.js#L345-L352

It relies on `map` internally to perform the queuing, but the items aren't generated yet at the time queuing is performed (i.e., what is queued are the async generators, not the values they yield).
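A minimal sketch of that mechanism, using plain async iterables instead of streams. `mapConcurrent` and `flatMapViaMap` are hypothetical stand-ins, not Node's actual `operators.js` code; they only assume that flatMap is built as "map with a concurrency limit, then `yield*` each result", as described in the comment. Calling an async generator function returns a generator object without running its body, so the map's concurrency only covers creating the generators. Their bodies, and the delays inside them, then run one after another during `yield*`.

```javascript
'use strict';
const { setTimeout } = require('node:timers/promises');

// Hypothetical stand-in for map() with a concurrency limit: keeps up to
// `concurrency` calls of fn in flight and yields their awaited results in
// input order.
async function* mapConcurrent(source, fn, concurrency) {
  const pending = [];
  for await (const item of source) {
    pending.push(Promise.resolve(fn(item)));
    if (pending.length >= concurrency) yield await pending.shift();
  }
  while (pending.length > 0) yield await pending.shift();
}

// Hypothetical flatMap layered on map: map produces one inner async
// generator per item, and each one is then flattened with yield*.
async function* flatMapViaMap(source, fn, concurrency) {
  for await (const inner of mapConcurrent(source, fn, concurrency)) {
    // The inner generator's body only starts running here, one at a time.
    yield* inner;
  }
}

// 100 ms instead of the issue's 1000 ms, to keep the demo short.
async function doubleValue(x) {
  await setTimeout(100);
  return x * 2;
}

async function* doubleGen(x) {
  await setTimeout(100);
  yield x * 2;
}

// Drains an async iterable and reports its values and the elapsed time.
async function timed(iterable) {
  const start = Date.now();
  const values = [];
  for await (const v of iterable) values.push(v);
  return { values, elapsed: Date.now() - start };
}

async function main() {
  const mapped = await timed(mapConcurrent([1, 2, 3, 4, 5], doubleValue, 200));
  const flat = await timed(flatMapViaMap([1, 2, 3, 4, 5], doubleGen, 200));
  return { mapped, flat };
}

main().then(({ mapped, flat }) => {
  // map overlaps the five delays; flatMapViaMap runs them back to back.
  console.log('map:    ', mapped.values, `${mapped.elapsed} ms`);
  console.log('flatMap:', flat.values, `${flat.elapsed} ms`);
});
```

The map variant should finish in about one delay, while the flatMap variant takes about five, mirroring the one-second gaps reported in the issue.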

MoLow commented 2 weeks ago

CC @nodejs/streams

mcollina commented 2 weeks ago

Very odd one indeed!