gcanti / fp-ts-rxjs

fp-ts bindings for RxJS
https://gcanti.github.io/fp-ts-rxjs/
MIT License
187 stars 29 forks source link

Add `evalMapAccumulate` operator #70

Open Swoorup opened 2 years ago

Swoorup commented 2 years ago

🚀 Feature request

Add something similar to fs2's evalMapAccumulate:fs2.Stream[F2,(S,O2)])

Suggested Solution

Who does this impact? Who is this for?

Allows creating Finite State Machines easily.

Possible implementation

export const evalMapAccumulate: <S>(s: S) => <O, O2>(f: (s: [S, O]) => io.IO<[S, O2]>) => (ma: Rx.Observable<O>) => Rx.Observable<[S, O2]> =
  <S>(s: S) => <O, O2>(f: (s: [S, O]) => io.IO<[S, O2]>) => (ma: Rx.Observable<O>) => {
    return pipe(
      ma,
      Rx.scan<O, [S, O.Option<O2>]>(([s1,], v) => {
        const [newState, output] = f([s1, v])()
        return [newState, O.some(output)]
      }, [s, O.none]),
      Rx.concatMap(([s, opt]) =>
        pipe(
          opt,
          O.match(() => Rx.EMPTY, (o) => Rx.of([s, o] as [S, O2]))
        )
      )
    )
  }