gcanti / fp-ts-rxjs

fp-ts bindings for RxJS
https://gcanti.github.io/fp-ts-rxjs/
MIT License

Perform a side effect for every emission on the source Observable #54

Open siuvdlec opened 3 years ago

siuvdlec commented 3 years ago

🚀 Feature request

Add two new type class members (something like chainFirstTaskK and chainFirstIOK) to the Observable module that perform a side effect for every emission without creating a new observable.

Current Behavior

import * as T from "fp-ts/Task";
import { flow, pipe } from "fp-ts/function";
import * as R from "fp-ts-rxjs/Observable";

declare const fetch: () => T.Task<void>

const r = pipe(
  source$,
  R.chainFirst(flow(fetch, R.fromTask))
)

Desired Behavior

import * as T from "fp-ts/Task";
import { flow, pipe } from "fp-ts/function";
import * as R from "fp-ts-rxjs/Observable";

declare const fetch: () => T.Task<void>

const r = pipe(
  source$,
  R.chainFirstTaskK(fetch)
)

Suggested Solution

Maybe the tap operator can be used.
I'm not sure whether the names (chainFirstTaskK, chainFirstIOK) are consistent with the existing ones.

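import { Observable } from "rxjs";
import { tap as rxTap } from "rxjs/operators";
import * as I from "fp-ts/IO";
import { Task } from "fp-ts/Task";

const chainFirstTaskK = <A, B>(f: (a: A) => Task<B>) => (ma: Observable<A>): Observable<A> =>
  ma.pipe(rxTap(a => f(a)()))

const chainFirstIOK = <A, B>(f: (a: A) => I.IO<B>) => (ma: Observable<A>): Observable<A> =>
  ma.pipe(rxTap(a => f(a)()))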

Who does this impact? Who is this for?

This is useful for me, for example when I have to send an analytics event when something happens, or update the URL after a filter submission.

siuvdlec commented 3 years ago

Yes, it should be the same as what I wrote in Current Behavior. The goal of my proposal is to avoid creating a new observable.

mlegenhausen commented 3 years ago

Yes, it should be the same as what I wrote in Current Behavior.

Sorry, my fault.

The goal of my proposal is to avoid creating a new observable.

I still don't understand what the problem is with creating a new observable. It's just a wrapper around the promise returned by the task, isn't it?

siuvdlec commented 3 years ago

It would be a utility that improves DX, and it avoids creating something that is unnecessary.

mlegenhausen commented 3 years ago

Maybe I understand now what you want. You want a fire-and-forget function that does not track whether, e.g., the event you want to track was successfully transmitted.

I don't think we can add something like this to the library, because it would allow operations where we are unable to track errors that can occur in these fire-and-forget side effects.

Even in RxJS this would be bad design. I would recommend a separate observable that handles the tracking of your events.

import { Subject } from "rxjs";
import { tap as rxTap } from "rxjs/operators";
import { pipe } from "fp-ts/function";

const events$ = new Subject<string>()

events$.subscribe(event => {
  // Transmit here and handle errors here
})

const r = pipe(
  source$,
  // Safe sync operation
  rxTap(() => events$.next('clicked the button'))
)

siuvdlec commented 3 years ago

I partially agree, because Task and IO represent computations that never fail.
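
The "never fails" convention can be made concrete with a small sketch. In fp-ts, an effect that can fail is normally given a different type (IOEither or TaskEither), so the failure shows up as a value rather than as an exception. The code below is a dependency-free illustration: the IO, Either and IOEither aliases and the tryCatch helper are local stand-ins modeled on the fp-ts ones, and parseFilter is a hypothetical effect invented for the example.

```typescript
// Local stand-ins that mirror the shape of the fp-ts types; not imported from fp-ts.
type IO<A> = () => A
type Either<E, A> =
  | { readonly _tag: 'Left'; readonly left: E }
  | { readonly _tag: 'Right'; readonly right: A }
type IOEither<E, A> = IO<Either<E, A>>

// Turns a throwing thunk into an IO whose failure is a value, not an exception.
const tryCatch = <E, A>(f: () => A, onThrow: (reason: unknown) => E): IOEither<E, A> =>
  () => {
    try {
      return { _tag: 'Right', right: f() }
    } catch (reason) {
      return { _tag: 'Left', left: onThrow(reason) }
    }
  }

// Hypothetical effect that can fail: parsing a serialized filter.
const parseFilter = (raw: string): IOEither<string, unknown> =>
  tryCatch(() => JSON.parse(raw) as unknown, () => `invalid filter: ${raw}`)

const ok = parseFilter('{"page":2}')() // a Right holding the parsed object
const bad = parseFilter('{page')()     // a Left holding the error message
```

Under this convention a plain IO<void> or Task<void> passed to a chainFirst-style helper carries no error channel, which is why the question in this thread is about tracking completion rather than about handling failures.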

mlegenhausen commented 3 years ago

@siuvdlec you have a point there

OK, let's take a look at your functions. B is irrelevant in both cases, and the two implementations are identical, so they can be reduced to one. chainFirstTaskK cannot be implemented with tap, or at least cannot be given that name, because tap does not track the state of the Promise. Here I would recommend wrapping the Task in an IO<void> to make clear that you are not interested in the result. Then we could define the following.

// tap-based variant
const chainFirstIOK = <A>(f: (a: A) => IO<void>): ((ma: Observable<A>) => Observable<A>) =>
  rxTap(a => f(a)())

// or with chainFirst (recommended)
const chainFirstIOK = <A>(f: (a: A) => IO<void>): ((ma: Observable<A>) => Observable<A>) =>
  chainFirst(a => fromIO(f(a)))

// waits for the Task to resolve before re-emitting the value
const chainFirstTaskK = <A>(f: (a: A) => Task<void>): ((ma: Observable<A>) => Observable<A>) =>
  chainFirst(a => fromTask(f(a)))

If we start adding these functions, I would consider defining them in the parent library fp-ts as well. They already exist for chain (e.g. chainIOK) but not for chainFirst.
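
The difference between a tap-based and a chainFirst-based chainFirstTaskK, as discussed in this thread, can be sketched without any dependencies. The code below is a model and not the RxJS or fp-ts-rxjs implementation: Observable is reduced to a hypothetical subscribe function, tapTaskK and track are names invented for the example, and error and completion handling are omitted.

```typescript
// Dependency-free model: these types are stand-ins, not the RxJS or fp-ts definitions.
type Task<A> = () => Promise<A>
type Observable<A> = (next: (a: A) => void) => void

// Emits the given values synchronously on subscription.
const of = <A>(...values: A[]): Observable<A> => next => values.forEach(next)

// tap-based variant from the proposal: starts the Task and drops its Promise.
const tapTaskK = <A>(f: (a: A) => Task<void>) => (ma: Observable<A>): Observable<A> =>
  next => ma(a => {
    void f(a)() // fire and forget: nothing waits for the Promise
    next(a)
  })

// chainFirst-style variant: re-emits `a` only after the Task has resolved.
const chainFirstTaskK = <A>(f: (a: A) => Task<void>) => (ma: Observable<A>): Observable<A> =>
  next => ma(a => {
    void f(a)().then(() => next(a))
  })

let completed = 0
// Hypothetical side effect standing in for e.g. an analytics call.
const track = (_n: number): Task<void> => () =>
  Promise.resolve().then(() => { completed += 1 })

const viaTap: number[] = []
const viaChainFirst: number[] = []
tapTaskK(track)(of(1, 2))(a => viaTap.push(a))
chainFirstTaskK(track)(of(1, 2))(a => viaChainFirst.push(a))
// Synchronously, at this point: viaTap is [1, 2], viaChainFirst is still empty,
// and no Task has completed yet.
```

In the tap-based variant, downstream subscribers see every value before any side effect has finished, so the stream cannot tell whether the effect ever completed. The chainFirst-style variant delays each value until its Task resolves, which is the behavior the name chainFirstTaskK implies.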