gcanti / fp-ts-rxjs

fp-ts bindings for RxJS
https://gcanti.github.io/fp-ts-rxjs/
MIT License
187 stars 29 forks source link

Chain implementation causes memory leaks #27

Open raveclassic opened 4 years ago

raveclassic commented 4 years ago

Hi,

Currently chain is implemented using mergeMap which is known to not unsubscribe from the passed observable on source emit. It's recommended to use switchMap in such cases. Is mergeMap used intentionally here (for some laws to hold) or it can be replaced with switchMap?

raveclassic commented 4 years ago

@gcanti What do you think?

gcanti commented 4 years ago

Using switchMap does produce a lawful Monad / Alt instance?

raveclassic commented 4 years ago

That's what I asked :) Ok, I'll try to add some tests using fp-ts-laws. But anyway it's working correctly in my projects

raveclassic commented 4 years ago

So I've added tests for chain, all green

anilanar commented 3 years ago

There's a Scala implementation of Rx observables here: https://github.com/OlivierBlanvillain/monadic-html It uses switchMap for bind/chain/flatMap. The author also provides a proof for monadic laws but I don't know if the proof is correct.

anilanar commented 3 years ago

Also the author implements Semigroup (by proving only associativity unlike Alt which needs distributivity) using merge.

OliverJAsh commented 3 years ago

I came here because I wanted to use ObservableEither.chain but I need cancellation (e.g. the Observable should be chained via switchMap).

On the ObservableEither instance specifically, it would be good to have the option over which merge strategy to use (mergeMap, switchMap, concatMap, exhaustMap). Perhaps we could provide operators for each of these?

waynevanson commented 3 years ago

@OliverJAsh I don't think we need to implement them specifically, unless they're not pipeable. We can just use them in the pipe operator as they are.

anthonyjoeseph commented 3 years ago

They should be pipeable, I think the tricky thing is that they aren't transformed with Either, so the types don't quite match up:

const b = pipe(
  from([1, 2, 3, 4]),
  _.rightObservable, 
  switchMap((e: E.Either<unknown, number>) => ...),   // only accepts an either as input
)
import * as E from 'fp-ts/Either'

const b = pipe(
  from([1, 2, 3, 4]),
  _.rightObservable, 
  switchMap(E.chain((a): E.Either<unknown, number> => ...)),   // won't accept an ObservableEither as Output
)

I believe for the proper behavior you have to re-implement EitherT.chain:

const b = pipe(
  from([1, 2, 3, 4]),
  _.rightObservable, 
  switchMap((e) => (E.isLeft(e) ? of(E.left(e.left)) : ... )),
)

I agree with @OliverJAsh that operators would be helpful, or maybe different applicative instances (similar to taskSeq) (which has been discussed before)

OliverJAsh commented 2 years ago

I've been avoiding chain and instead using the RxJS operators (mergeMap/switchMap/concatMap/exhaustMap) so that I'm forced to make an explicit decision about which type of chaining I want to use. However, I would like to start using convenience functions such as Observable.bind/ObservableOption.fold and these rely on Observable.chain under the hood. For this reason I'm starting to think that we need multiple versions of these functions which rely on chain, e.g. Observable.bindSwitch/ObservableOption.foldSwitch which would use switchMap under the hood. We would need a better naming convention though. 🤔

mlegenhausen commented 2 years ago

@OliverJAsh during my vacations (where else 😉) I had the idea of splitting up the Monad instances in four different packages Merge, Concat, Switch and Exhaust.

These packages will each export their own chain function and all depended functions. This could look like that:

import * as R from 'fp-ts-rxjs/Observable'
import * as Rm from 'fp-ts-rxjs/Observable/Merge'
import * as Rc from 'fp-ts-rxjs/Observable/Concat'
import * as Rs from 'fp-ts-rxjs/Observable/Switch'
import * as Re from 'fp-ts-rxjs/Observable/Exhaust'

pipe(
  Rm.bind('a', () => R.of(1)), // uses mergeMap
  Rs.bind('b', () => R.of(2)), // uses switchMap
  Re.bind('c', () => R.of(3)) // uses exhaustMap
  Rc.bind('d', () => R.of(4)) // uses concatMap
)

What do you think?

OliverJAsh commented 2 years ago

That's a good idea!

mlegenhausen commented 2 years ago

I could also image that the Merge instance will contain a getMonad(concurrency: number) function that generates Concat instance by using getMonad(1) and the default Merge instances can be defined by getMonad(Infinity).