zebulonj / callbag-subscribe

A callbag sink (listener) that connects an Observer a-la RxJS. 👜
MIT License

Support for subscribing other callbags #10

Open loreanvictor opened 3 years ago

loreanvictor commented 3 years ago

In RxJS, you can pass any Observer to .subscribe(), including Subjects. In the callbag world, any callbag can act as an observer as well, so it would make sense to support subscribing other callbags directly. That way you could pass a callbag subject to subscribe(), for example, mirroring the RxJS behavior.

Here is a sample of how that would work. I could make a PR directly, but wanted to ensure that this is something that you do want to add before writing tests for it.

loreanvictor commented 3 years ago

@zebulonj any thoughts?

zebulonj commented 3 years ago

This slipped under my radar. Thanks for re-upping it. I'll give it some thought today.

loreanvictor commented 3 years ago

@zebulonj may I bump this again? 😅

zebulonj commented 3 years ago

Ok. I follow your inquiry. When you say "support for subscribing other callbags directly", you mean using a subject (for example) as a sink:

import makeSubject from 'callbag-subject';
import interval from 'callbag-interval';
import pipe from 'callbag-pipe';

import subscribe from './subscribe';

const sub = makeSubject();

pipe(
  interval(1000),
  subscribe(sub)
);

I've wrestled with this myself. Superficially, it's a nice pattern. What makes me hesitate is that, without intermediating logic, not only data but also errors and completions would be passed to the target subject. Have you thought through the consequences of that? If the source that the subject subscribes to through your proposed overload encounters an error, what should happen to the subject? And to everything downstream from the subject?

Have you thought about how you'd implement this overloading safely?
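
To make the concern above concrete, here is a minimal sketch of what happens when an upstream error is forwarded to a subject unchanged. Everything in it is an assumption for illustration: makeSubject is a simplified stand-in rather than the callbag-subject package, and failing and forwardTo are hypothetical helpers. Note that forwardTo withholds the handshake (type 0), because this stand-in subject would otherwise read it as a new listener registering.

```javascript
// Simplified stand-in for a callbag subject (NOT the callbag-subject package).
function makeSubject() {
  let sinks = [];
  let done = false;
  return (type, data) => {
    if (done) return;
    if (type === 0) {
      // A listener registers itself; hand it a talkback for unsubscribing.
      const sink = data;
      sinks.push(sink);
      sink(0, t => {
        if (t === 2) sinks = sinks.filter(s => s !== sink);
      });
    } else {
      // Broadcast data (1) or termination (2) to every listener.
      sinks.slice().forEach(sink => sink(type, data));
      if (type === 2) {
        done = true;
        sinks = [];
      }
    }
  };
}

// Hypothetical source: emits 1, then fails.
const failing = (start, sink) => {
  if (start !== 0) return;
  sink(0, () => {}); // handshake with a no-op talkback
  sink(1, 1);
  sink(2, new Error('boom'));
};

// Hypothetical helper: forward data and termination only, never the handshake.
const forwardTo = target => source =>
  source(0, (type, data) => {
    if (type !== 0) target(type, data);
  });

const log = [];
const subject = makeSubject();

// Downstream listener attached to the subject.
subject(0, (type, data) => {
  if (type === 1) log.push(`data ${data}`);
  if (type === 2) log.push(data ? `error ${data.message}` : 'complete');
});

forwardTo(subject)(failing);
subject(1, 2); // ignored: the upstream error already terminated the subject

console.log(log); // [ 'data 1', 'error boom' ]
```

In this sketch the upstream error reaches every downstream listener and leaves the subject permanently terminated, which is the consequence any safe overload would have to accept or guard against.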

loreanvictor commented 3 years ago

I was thinking of treating the given callbag as a sink, which means passing down errors, termination signals, etc. This exactly mirrors the behavior in RxJS, where a Subject can be treated as an Observer and hence can be passed to the .subscribe() method of any Observable. In that case, it would also receive completion signals and errors.
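
One way to picture "treating the given callbag as a sink" is an adapter that exposes a callbag as an observer. The sketch below is only an illustration under stated assumptions: subscribe here is a stripped-down listener written for this example (not the callbag-subscribe source), and asObserver, fromValues and target are hypothetical names.

```javascript
// Minimal observer-style sink written for this example (an assumption,
// not the package's actual implementation): accepts { next, error, complete }.
const subscribe = ({ next = () => {}, error = () => {}, complete = () => {} }) => source => {
  source(0, (type, data) => {
    if (type === 1) {
      next(data);
    } else if (type === 2) {
      // In the callbag protocol, termination with a payload is an error.
      if (data === undefined) complete();
      else error(data);
    }
  });
};

// Hypothetical adapter: expose a callbag as an observer so that data,
// errors and completion are all passed down to it.
const asObserver = callbag => ({
  next: value => callbag(1, value),
  error: err => callbag(2, err),
  complete: () => callbag(2),
});

// Hypothetical source: emits the given values synchronously, then completes.
const fromValues = (...values) => (start, sink) => {
  if (start !== 0) return;
  sink(0, () => {}); // handshake with a no-op talkback
  values.forEach(v => sink(1, v));
  sink(2);
};

// Recording callbag standing in for a subject: stores every signal it receives.
const received = [];
const target = (type, data) => {
  received.push([type, data]);
};

subscribe(asObserver(target))(fromValues(1, 2));

console.log(received); // [ [ 1, 1 ], [ 1, 2 ], [ 2, undefined ] ]
```

The target receives both data signals and the completion signal, just as a Subject passed to .subscribe() would in RxJS. The adapter never forwards the handshake, so a subject used as the target would not mistake it for a new listener.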