parallelMerge() uses Promise.race(), and this call leaks memory: each time the same Promise is passed to race(), another .then() handler is attached to it, and those handlers stay attached for as long as the promise is pending.
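To make the mechanism concrete, here is a minimal sketch that counts how often Promise.race() subscribes to one promise that never settles. The names `pending` and `thenCalls` are made up for illustration, and overriding `then` on the instance is only an instrumentation trick, not something parallelMerge() does.

```javascript
// A promise that never settles, standing in for an idle stream's next().
const pending = new Promise(() => {});

// Instrument this one promise so we can count subscriptions to it.
let thenCalls = 0;
const originalThen = pending.then;
pending.then = function (...args) {
  thenCalls++;
  return originalThen.apply(this, args);
};

for (let i = 0; i < 1000; i++) {
  // Each race is won by the already-resolved promise,
  // but race() still subscribes to `pending` every time.
  Promise.race([pending, Promise.resolve(i)]);
}

console.log(thenCalls); // 1000
```

None of those 1000 handlers can run or be dropped while `pending` stays unsettled, which is what the heap ends up holding on to.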
The easiest way to reproduce is to run the following:
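import fs from "fs";
import { Transform } from "stream";
import { setTimeout as delay } from "timers/promises"; // any promise-based sleep will do
// parallelMerge comes from this library.

// A 1-byte highWaterMark makes fileStream yield one chunk per byte.
const fileStream = fs.createReadStream("...", { highWaterMark: 1 });
// idleStream never emits anything, so its pending promise is raced again and again.
const idleStream = new Transform();
let n = 0;
for await (const _ of parallelMerge(fileStream, idleStream)) {
  n++;
  if (n % 100000 === 0) console.log(n);
}
await delay(100000);
idleStream.destroy();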
Internally, parallelMerge() calls Promise.race() once for every single byte of fileStream, racing the promises from fileStream's and idleStream's AsyncIterators. Since idleStream does not emit anything, its promise is reused over and over, and then() handlers pile up on it. So if we run the profiler and compare the memory footprint before the loop with the one right before the idleStream.destroy() call, we'll see thousands or even millions of dangling promises.
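The shape of the problem can be sketched with a simplified race-based merge. This is not the library's actual code: `raceMerge`, `busy`, `idle`, `demo` and the `stats` counter are all hypothetical names used only to show how the idle source's promise ends up in every race.

```javascript
// Simplified race-based merge (hypothetical, for illustration only).
async function* raceMerge(stats, ...iterables) {
  const iterators = iterables.map((it) => it[Symbol.asyncIterator]());
  // One in-flight next() promise per iterator, tagged with its index.
  const pending = new Map();
  const pull = (i) =>
    pending.set(i, iterators[i].next().then((result) => ({ i, result })));
  iterators.forEach((_, i) => pull(i));

  while (pending.size > 0) {
    stats.races++;
    // Every pass re-races ALL in-flight promises, including one that
    // has been pending since the very first pass.
    const { i, result } = await Promise.race(pending.values());
    if (result.done) {
      pending.delete(i);
    } else {
      pull(i); // only the source that produced a value gets a fresh promise
      yield result.value;
    }
  }
}

// A source that yields n values right away.
async function* busy(n) {
  for (let i = 0; i < n; i++) yield i;
}

// A source whose next() never settles, like idleStream.
const idle = {
  [Symbol.asyncIterator]: () => ({ next: () => new Promise(() => {}) }),
};

async function demo(n) {
  const stats = { races: 0 };
  let received = 0;
  for await (const _ of raceMerge(stats, busy(n), idle)) {
    received++;
    if (received === n) break; // the idle source would keep the merge open forever
  }
  return { received, races: stats.races };
}

demo(5).then((out) => console.log(out)); // logs { received: 5, races: 5 }
```

In this sketch the idle source's single promise takes part in every one of those races, so each value from the busy source costs one more handler on it.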
I don't really have a better approach for parallelMerge() to propose... What's more or less clear is that Promise.race() is not the right tool for merging streams/iterables.
For details on the underlying Promise.race() behavior, see https://github.com/nodejs/node/issues/17469 (on the Node side it's a wontfix issue).