ReactiveX / IxJS

The Interactive Extensions for JavaScript
https://reactivex.io/IxJS/
MIT License
1.33k stars 74 forks source link

flatMap causes memory to balloon when concurrent executions are consumed #375

Open thepont opened 1 month ago

thepont commented 1 month ago

IxJS version:

7.0.0

Code to reproduce:

This example demonstrates the that producer is not waiting for the consumer to finish processing before another item is made available. This creates issues where the memory balloons if the consumer is slower then the producer. This seems to be isolated to when a flatMap is used. Despite all concurrent executions being consumed, more values are still produced.

import {from} from "ix/asynciterable";
import {flatMap} from "ix/asynciterable/operators";

let current = 0;

let range = {
  [Symbol.asyncIterator]() { 
    return {
      async next() {
          await new Promise(resolve => setTimeout(resolve, 1));
          return { done: false, value: current++ };
      }
    };
  }
};

export async function example() {
  let results = (from(range)).pipe(
      flatMap(async (ii) => {
        await new Promise(resolve => setTimeout(resolve, 1000));
        return ii
      }, 1)
  );

  for await (let message of results) {
      // print heap usage
      console.log(`Got Message ${message} messages, current: ${current}, diff: ${current - message}`);
  }
}

example();

Expected behavior:

If all concurrency is used then no more values should be requested until concurrency is freed for processing, The iterator should only produce values is the consumer is able to consume them.

Expected Output

Got Message 0 messages, current: 0, diff: 0
Got Message 1 messages, current: 1, diff: 0
Got Message 2 messages, current: 2, diff: 0
Got Message 3 messages, current: 3, diff: 0

Actual behavior:

Flatmap doesn't prevent values from being produced.

Got Message 0 messages, current: 844, diff: 844
Got Message 1 messages, current: 1703, diff: 1702
Got Message 2 messages, current: 2556, diff: 2554
Got Message 3 messages, current: 3413, diff: 3410

Additional information:

trxcllnt commented 1 month ago

~From your description, it seems the behavior you are looking for is in concatMap()?~

edit: nevermind, I see the implementation in your PR. This is an interesting case, I'll comment more there.