gcanti / fp-ts-rxjs

fp-ts bindings for RxJS
https://gcanti.github.io/fp-ts-rxjs/
MIT License
187 stars 29 forks source link

Add `liftEither` for `ObservableEither` #39

Open anthonyjoeseph opened 3 years ago

anthonyjoeseph commented 3 years ago

I often end up needing a way to use OperatorFunction's on the right case of ObservableEither's

E.g. filter or takeUntil for ObservableEither. Most operators, really

const liftEither = <E, A, B>(
  f: (a: r.Observable<A>) => r.Observable<B>
): ((a: OBE.ObservableEither<E, A>) => OBE.ObservableEither<E, B>) =>
  flow(OB.separate, ({left, right}) =>
    r.merge(pipe(left, ro.map(E.left)), pipe(right, f, ro.map(E.right)))
  )

I can make a P.R. if desired, but I'm not sure if this is the best way to do this.

anthonyjoeseph commented 3 years ago

Here's a simple example usage

import {pipe} from 'fp-ts/pipeable'
import * as r from 'rxjs'
import * as E from 'fp-ts/Either'
import * as OB from 'fp-ts-rxjs/lib/Observable'
import * as OBE from 'fp-ts-rxjs/lib/ObservableEither'

const isOdd = (a: number): boolean => a % 2 !== 0

const fa: OBE.ObservableEither<never, number> = pipe(
  r.from([1, 2, 3]),
  OBE.rightObservable
)
const fb: OBE.ObservableEither<never, number> = pipe(
  fa,
  liftEither(OB.filter(isOdd))
)
// fb will emit E.right(1), E.right(3)

(edit: a bit more terse)

anthonyjoeseph commented 3 years ago

Just a note - this solution is not ideal, because the input observable is subscribed to twice due to the r.merge call. The redundant emissions are filtered out, but it can make debugging difficult

anthonyjoeseph commented 3 years ago

Here's a solution without redundant subscriptions:

export const liftEither = <Err, A, B>(
  f: (a: r.Observable<A>) => r.Observable<B>
) => (obs: OBE.ObservableEither<Err, A>): OBE.ObservableEither<Err, B> => {
  const subjA = new r.Subject<A>()
  const subjB = new r.Subject<Err>()
  obs.subscribe(
    E.fold<Err, A, void>(
      (err) => subjB.next(err),
      (val) => subjA.next(val)
    )
  )
  return r.merge(pipe(subjA, f, OB.map(E.right)), pipe(subjB, OB.map(E.left)))
}

(edit: a little more terse)