ReactiveX / IxJS

The Interactive Extensions for JavaScript
https://reactivex.io/IxJS/
MIT License
1.32k stars 73 forks source link

flatMap throwing when concurrency is larger than iterable size #366

Closed aviv-rozenboim-lmnd closed 3 months ago

aviv-rozenboim-lmnd commented 3 months ago

IxJS version: 5.0.0

Code to reproduce:

import { from } from "ix/asynciterable";
import { flatMap } from "ix/asynciterable/operators";

// Works
from([1, 2])
  .pipe(flatMap(async (x) => [x], 2))
  .forEach(() => {});

// Throws
from([1, 2])
  .pipe(flatMap(async (x) => [x], 3))
  .forEach(() => {});

Expected behavior: flatMap to work concurrently

Actual behavior:

/Users/aviv.rozenboim/dev/ts-experiments/node_modules/ix/util/util/returniterator.ts:14
  if (typeof it.return === 'function') {
                ^
TypeError: Cannot read properties of undefined (reading 'return')
    at returnAsyncIterator (/Users/aviv.rozenboim/dev/ts-experiment/node_modules/ix/util/util/returniterator.ts:14:17)
    at Array.map (<anonymous>)
    at FlattenConcurrentAsyncIterable.[Symbol.asyncIterator] (/Users/aviv.rozenboim/dev/ts-experiment/node_modules/ix/asynciterable/operators/asynciterable/operators/_flatten.ts:135:66)
    at async FlattenConcurrentAsyncIterable.forEach (/Users/aviv.rozenboim/dev/ts-experiment/node_modules/ix/asynciterable/asynciterable/asynciterablex.ts:33:22)

Additional information: As far as I can tell, flatMap throws when there's less input values than it has concurrency? I don't fully understand this, but I've included a simple reproduction, and I would advise trying out different input sizes/concurrency values to get to the bottom of it.

trxcllnt commented 3 months ago

Thanks for the report. This sounds like something I forgot to account for in the implementation. Will open a PR with a fix soon.