ReactiveX / IxJS

The Interactive Extensions for JavaScript
https://reactivex.io/IxJS/
MIT License
1.33k stars 74 forks source link

Operator flat concurrency issue with async iterables #351

Closed the-spyke closed 1 year ago

the-spyke commented 2 years ago

IxJS version:

5.0.0

Code to reproduce:

const result = from(asyncIterable).pipe(
  tap({
    next() {
      console.log('outer');
    }
  }),
  flat(1, 1),
);

for await (const item of result) {
  console.log('inner');
}

Where asyncIterable is a sequence of another async iterables.

Expected behavior:

Outer should be pulled only if there's no inner to continue:

outer
inner
inner
outer
inner
inner

Actual behavior:

Outers are pulled several times in a row:

outer
outer
inner
inner
inner
inner

Additional information:

I didn't find the exact issue as _flatten is quite complicated, but src/asynciterable/operators/flat.ts:35 recursively calls itself without the second argument and loose concurrent value. However adding this argument doesn't fix the issue.

trxcllnt commented 1 year ago

You're right that flat() needs to propagate the concurrent parameter down in its recursive call. But I believe the underlying behavior you're seeing is working as intended.

Here's a more complete example to illustrate:

const { from, interval } = require('ix/asynciterable/index');
const { map, flat, take } = require('ix/asynciterable/operators/index');

(async () => {
  const asynciterable = from([
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
  ]);
  const result = asynciterable.pipe(
    map((_, i) => {
      console.log('outer:', i);
      return _.pipe(map((y) => `${i}-${y}`));
    }),
    flat(1, 1),
  );

  for await (const x of result) {
    console.log('inner:', x);
  }
})();

// outer: 0
// outer: 1
// outer: 2
// inner: 0-0
// inner: 0-1
// inner: 0-2
// inner: 1-0
// inner: 1-1
// inner: 1-2
// inner: 2-0
// inner: 2-1
// inner: 2-2

As we can see, the inner values are enumerated sequentially.

The outer async iterables are pulled immediately because we must use Promise.race() (here) to enumerate both the outer and concurrent inner sequence(s) simultaneously.

Without this, the FlattenConcurrentAsyncIterable wouldn't be notified that the outer sequence has yielded a new sequence that potentially needs to be flattened concurrently with the existing inner sequences. The outer sequence does yield new inner sequences immediately, but flat doesn't exhaust the inner sequence until the previous inner sequence has completed.

This may not be desirable in the flat(1, 1) case, so we've kept the previous behavior alive in concatMap(), which you can use to implement flattenSequential like this:

const { isAsyncIterable } = require('./targets/ix/util/isiterable');
const { from, interval } = require('./targets/ix/asynciterable/index');
const { map, flat, take, concatMap } = require('./targets/ix/asynciterable/operators/index');

(async () => {
  const asynciterable = from([
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
    interval(100).pipe(take(3)),
  ]);
  const result = asynciterable.pipe(
    map((_, i) => {
      console.log('outer:', i);
      return _.pipe(map((y) => `${i}-${y}`));
    }),
    flattenSequential(1),
  );

  for await (const x of result) {
    console.log('inner:', x);
  }
})();

function flattenSequential(depth = -1) {
  depth = (depth < 0 ? Infinity : depth);
  return function flattenOperatorFunction(source) {
    return concatMap((item) => {
      if (isAsyncIterable(item)) {
        return depth > 0 ? flat(depth - 1)(item) : item;
      }
      return [item];
    })(source);
  };
}

// outer: 0
// inner: 0-0
// inner: 0-1
// inner: 0-2
// outer: 1
// inner: 1-0
// inner: 1-1
// inner: 1-2
// outer: 2
// inner: 2-0
// inner: 2-1
// inner: 2-2
the-spyke commented 1 year ago

@trxcllnt Thank you for the explanation. I'm trying to use lazy pipelines for data streaming to keep memory footprint and CPU profile low. My current use case:

from(
  dbCursor({ pageSize })
).pipe(
  map(async (page) => join(page, await apiCall(page))),
  flat(1, 1),
  map(transform),
  ...
);

By the time of the first transform the cursor may be exhausted, the full dataset is in memory, and it made a dozen of API calls.

Will the Infinite concurrency flood Node's event loop? I tried an example with non uniform timing and it produces results out of order: https://codesandbox.io/s/lingering-shape-kf1nui?file=/src/index.js

trxcllnt commented 1 year ago

The concurrent parameter refers to the number of inner sequences that are flattened at the same time. So if you have an outer source AsyncIterable that immediately yields 10 inner sequences, then concurrent=Infinity will flatten all 10 inner sequences and yield their values in the order they arrive. This aligns with the behavior of Rx's flatMap().

I can't tell exactly what you're trying to achieve from your example, but it seems if you know you only need to flatten a single depth, you may want to use concatMap instead of map and flat?

from(
  dbCursor({ pageSize })
).pipe(
  concatMap(async (page) => join(page, await apiCall(page))),
  map(transform),
  ...
);
the-spyke commented 1 year ago

I was saying that readme mentions iterators, so it is not RxJS (observables). The next operation in the chain drives the pipeline. If I have the outer data source that could produce 1000 items, each of which could be flat() into 1000 more. By the time of the second map(transform) how many items will be in the memory? 1 million - 1?

I will try concatMap tomorrow, thank you!

trxcllnt commented 1 year ago

There will only be 1 element in memory at any given moment (assuming extremely aggressive GC behavior, obviously), and transform will have been called 1M times. If you introduce an unbounded buffer before transform (or use an operator that buffers internally, like join()), then yes there will be 1M elements in the buffer.

the-spyke commented 1 year ago

From your comment above:

// outer: 0
// outer: 1
// outer: 2
// inner: 0-0

When you see outer in my case a DB query is executed. So, by the time of the inner 0-0 3 queries were executed. Each produces 1K items. The query-0 is in memory, 2 others may possibly be too (depends on network, etc). Then flat() calls itself recursively if the depth wasn't set, which means right after query-0 fulfilled items of that query will be outer for the internal flat which will do the same thing again. All this is staying in memory until it could be freed after themap.

If my explanation is confusing, then please count how many loaded you see before the first inner in the console here: https://codesandbox.io/s/ixjs-flat-concurrency-kf1nui?file=/src/index.js

trxcllnt commented 1 year ago

Could you wrap each outer sequence in AsyncIterable defer, so the DB query is only executed when the AsyncIterable is enumerated?