0no-co / wonka

🎩 A tiny but capable push & pull stream library for TypeScript and Flow
MIT License

`concatMap` omits values that `mergeMap` catches #92

Closed Pet3ris closed 4 years ago

Pet3ris commented 4 years ago

Hi There,

I've added two extensions to Wonka that potentially highlight an incompatibility between concatMap and mergeMap.

iterate: A new source that represents a potentially infinite generator built from a starting value and an iteration step. Analogous to Haskell's iterate (https://hackage.haskell.org/package/base-4.14.0.0/docs/Prelude.html#v:iterate) but with optional termination.

let iterate =
    (start: 'a, next: 'a => option('a)): Wonka.Types.sourceT('a) => {
  Wonka.Types.curry(sink => {
    let state: Wonka.trampolineT(option('a)) = {
      ended: false,
      looping: false,
      pulled: false,
      current: Some(start),
    };

    sink(.
      Wonka.Types.Start(
        (. signal) =>
          switch (signal, state.looping) {
          | (Pull, false) =>
            state.pulled = true;
            state.looping = true;

            while (state.pulled && !state.ended) {
              switch (state.current) {
              | Some(x) =>
                state.current = next(x);
                state.pulled = false;
                sink(. Push(x));
              | None =>
                state.ended = true;
                sink(. End);
              };
            };
          | (Pull, true) => state.pulled = true
          | (Close, _) => state.ended = true
          },
      ),
    );
  });
};

downTo: An operator that emits the numbers start, start - 1, ... down to and including a provided bound. E.g., downTo(3, 1) produces [|3, 2, 1|] as a stream.

let downTo = (start: int, bound: int): Wonka.Types.sourceT(int) =>
  iterate(start, x => x <= bound ? None : Some(x - 1));

toAsyncArray: A sink that collects all stream values into an array and returns it wrapped in a promise. Unlike toArray, it also works with asynchronous streams.

let toAsyncArray = (source: Wonka.Types.sourceT('a)): Js.Promise.t(array('a)) =>
  Js.Promise.make((~resolve, ~reject as _) => {
    let state: Wonka.toArrayStateT('a) = {
      values: Rebel.MutableQueue.make(),
      talkback: (. _) => (),
      value: None,
      ended: false,
    };

    source((. signal) =>
      switch (signal) {
      | Start(x) =>
        state.talkback = x;
        x(. Pull);
      | Push(value) =>
        Rebel.MutableQueue.add(state.values, value);
        state.talkback(. Pull);
      | End =>
        state.ended = true;
        state.talkback(. Close);
        let finalArray = Rebel.MutableQueue.toArray(state.values);
        resolve(. finalArray);
      }
    );

    ();
  });

Some tests to demonstrate how these work:

test("Wonka iterate down.", () =>
  Library.Wonkatools.downTo(3, 0)
  |> Wonka.toArray
  |> expect == [|3, 2, 1, 0|]
);

test("Wonka sync arrays dont collect", () =>
  [|1, 2, 3|]
  |> Wonka.fromArray
  |> Wonka.mergeMap((. x) => Js.Promise.resolve(x) |> Wonka.fromPromise)
  |> Wonka.toArray
  |> expect == [||]
);

testPromise("Wonka async arrays do collect", () =>
  [|1, 2, 3|]
  |> Wonka.fromArray
  |> Wonka.mergeMap((. x) => Js.Promise.resolve(x) |> Wonka.fromPromise)
  |> Library.Wonkatools.toAsyncArray
  |> Js.Promise.then_(array => array |> expect == [|1, 2, 3|] |> Js.Promise.resolve)
);

The issue

Finally, here is the issue itself. The test below times out because concatMap fails to collect any values:

Timeout - Async callback was not invoked within the 5000ms timeout specified by jest.setTimeout.

If I replace concatMap with mergeMap, this test succeeds.

testPromise("Wonka concat problem", () =>
  Library.Wonkatools.downTo(3, 0)
  |> Wonka.concatMap((. x) => Js.Promise.resolve(x) |> Wonka.fromPromise)
  |> Library.Wonkatools.toAsyncArray
  |> Js.Promise.then_(array => array |> expect == [|3, 2, 1, 0|] |> Js.Promise.resolve)
);

Any thoughts on why concatMap is skipping these values?

Tests use bs-jest.

kitten commented 4 years ago

It was pretty tricky to make switchMap behave well. You can see lots of compliance tests around operators, and it took a while to make them all pass for this particular operator. It's possible that an edge case is being triggered here, but I'm not quite sure which part it may be.

It could be the internal queue, since the source it uses emits only a single value after a tick, and that value arrives as a Push immediately followed by an End event.

kitten commented 4 years ago

Actually, have you checked whether the problem is that state.looping = false is missing from your source? You may want to add it: https://github.com/kitten/wonka/blob/c1108636590327f5648095d1b44fb17f9c93f0a7/src/Wonka_sources.re#L70

I used to have a built in trampoline factory. Maybe it's time to add it back 😅

Pet3ris commented 4 years ago

@kitten You're a legend - everything works!

kitten commented 4 years ago

I see! So I suppose that while concatMap was waiting for the promise source to resolve, the source stayed stuck in its looping state, so when concatMap pulled again the source couldn't push anymore. mergeMap, on the other hand, pulls eagerly, which allowed the promises to just resolve in order. That makes sense 😅

Glad this is resolved! I should probably add iterate to the built-in sources (Great Name btw!)
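
The explanation above can be reproduced outside Wonka with a small model. The following is a minimal TypeScript sketch, not Wonka's actual implementation: the types `PullSignal`, `StreamEvent` and `StreamSource`, the `resetLooping` flag, and the collectors `collectEager` and `collectDeferred` are all hypothetical names made up for illustration. `collectEager` pulls again from inside each push (roughly what an eager consumer does), while `collectDeferred` pulls only after the previous push has returned (roughly what happens when a consumer waits for a promise before pulling).

```typescript
type PullSignal = "pull" | "close";
type Talkback = (signal: PullSignal) => void;
type StreamEvent<T> =
  | { kind: "start"; talkback: Talkback }
  | { kind: "push"; value: T }
  | { kind: "end" };
type StreamSource<T> = (sink: (event: StreamEvent<T>) => void) => void;
type Collected<T> = { values: T[]; ended: boolean };

// Trampolined source modelled on the `iterate` from this thread.
// `resetLooping` is a hypothetical flag that switches the one-line fix on or off.
function iterate<T>(
  start: T,
  next: (x: T) => T | undefined,
  resetLooping: boolean
): StreamSource<T> {
  return (sink) => {
    const state = {
      ended: false,
      looping: false,
      pulled: false,
      current: start as T | undefined,
    };
    sink({
      kind: "start",
      talkback: (signal) => {
        if (signal === "close") {
          state.ended = true;
          return;
        }
        state.pulled = true;
        if (state.looping) return; // a loop is (believed to be) running and will see `pulled`
        state.looping = true;
        while (state.pulled && !state.ended) {
          const x = state.current;
          state.pulled = false;
          if (x !== undefined) {
            state.current = next(x);
            sink({ kind: "push", value: x });
          } else {
            state.ended = true;
            sink({ kind: "end" });
          }
        }
        if (resetLooping) state.looping = false; // the line that was missing
      },
    });
  };
}

const downTo = (start: number, bound: number, resetLooping: boolean) =>
  iterate<number>(start, (x) => (x <= bound ? undefined : x - 1), resetLooping);

// Pulls again synchronously from inside every push.
function collectEager<T>(source: StreamSource<T>): Collected<T> {
  const values: T[] = [];
  let talkback: Talkback = () => {};
  let ended = false;
  source((event) => {
    if (event.kind === "start") {
      talkback = event.talkback;
      talkback("pull");
    } else if (event.kind === "push") {
      values.push(event.value);
      talkback("pull");
    } else {
      ended = true;
    }
  });
  return { values, ended };
}

// Pulls only after the previous push has returned, at most `maxPulls` times.
function collectDeferred<T>(source: StreamSource<T>, maxPulls: number): Collected<T> {
  const values: T[] = [];
  let talkback: Talkback = () => {};
  let ended = false;
  source((event) => {
    if (event.kind === "start") talkback = event.talkback;
    else if (event.kind === "push") values.push(event.value);
    else ended = true;
  });
  for (let i = 0; i < maxPulls && !ended; i++) talkback("pull");
  return { values, ended };
}

console.log(collectEager(downTo(3, 0, false)));        // values [3, 2, 1, 0], ended: true
console.log(collectDeferred(downTo(3, 0, false), 10)); // values [3], ended: false (stuck)
console.log(collectDeferred(downTo(3, 0, true), 10));  // values [3, 2, 1, 0], ended: true
```

In this model the unfixed source only misbehaves for the deferred consumer: the first pull leaves `looping` set to true, so every later pull is recorded in `pulled` but never acted on, and the End signal never arrives. That matches the timeout in the original test, where the promise only resolves on End.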

Pet3ris commented 4 years ago

@kitten definitely feel free to, name taken from Haskell ;)! In terms of the API, I wonder if there's a general iterable constructor that would be useful, something like makeIterable: (unit => option('a)) => streamT('a).
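
One possible shape for such a constructor, sketched in TypeScript rather than Reason and written against a tiny stand-in for the push/pull protocol instead of Wonka's real types. `makeIterable` is only the name proposed above, not an existing Wonka export; `PullSignal`, `StreamEvent`, `StreamSource` and `drain` are hypothetical helpers, and `undefined` stands in for `None`.

```typescript
type PullSignal = "pull" | "close";
type StreamEvent<T> =
  | { kind: "start"; talkback: (signal: PullSignal) => void }
  | { kind: "push"; value: T }
  | { kind: "end" };
type StreamSource<T> = (sink: (event: StreamEvent<T>) => void) => void;

// Hypothetical constructor: `step` returns the next value, or undefined once exhausted.
function makeIterable<T>(step: () => T | undefined): StreamSource<T> {
  return (sink) => {
    let ended = false;
    let looping = false;
    let pulled = false;
    sink({
      kind: "start",
      talkback: (signal) => {
        if (signal === "close") {
          ended = true;
          return;
        }
        pulled = true;
        if (looping) return; // the running loop will notice `pulled`
        looping = true;
        while (pulled && !ended) {
          pulled = false;
          const value = step();
          if (value === undefined) {
            ended = true;
            sink({ kind: "end" });
          } else {
            sink({ kind: "push", value });
          }
        }
        looping = false; // reset so that later, non-re-entrant pulls start a new loop
      },
    });
  };
}

// `downTo` expressed with makeIterable; the counter is created per subscription.
function downTo(start: number, bound: number): StreamSource<number> {
  return (sink) => {
    let current: number | undefined = start;
    makeIterable<number>(() => {
      const value = current;
      if (value !== undefined) current = value > bound ? value - 1 : undefined;
      return value;
    })(sink);
  };
}

// Hypothetical helper: pulls one value at a time, at most `maxPulls` times.
function drain<T>(source: StreamSource<T>, maxPulls: number): { values: T[]; ended: boolean } {
  const result = { values: [] as T[], ended: false };
  let talkback: (signal: PullSignal) => void = () => {};
  source((event) => {
    if (event.kind === "start") talkback = event.talkback;
    else if (event.kind === "push") result.values.push(event.value);
    else result.ended = true;
  });
  for (let i = 0; i < maxPulls && !result.ended; i++) talkback("pull");
  return result;
}

console.log(drain(downTo(3, 0), 10)); // values [3, 2, 1, 0], ended: true
```

Taking a `unit => option('a)` style step function pushes all iteration state into the caller's closure, which is why `downTo` creates its counter inside the subscription here; sharing one counter across subscriptions would make the source single-use.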