Reactive-Extensions / RxJS

The Reactive Extensions for JavaScript
http://reactivex.io

Fixing multicast() and publish().refCount() #602

Closed benlesh closed 9 years ago

benlesh commented 9 years ago

Getting in line with the JVM implementations

I was discussing the .singleInstance() operator with @benjchristensen yesterday, and he seemed shocked to discover an Observable that is returned from .publish().refCount() could not be resubscribed to after completion. I guess they saw that issue in RxJava, and opted to use a factory function as part of the multicast to provide a new Subject whenever the refCount increments up from 0 to 1.

In effect, that means that .publish().refCount() should work exactly like .singleInstance() does: it gives you a cold observable that is hot after the first subscription, then goes cold again when the refCount hits zero.

So I thought, "well maybe the multicast overload that takes a function will handle this"... But apparently, it doesn't return a ConnectableObservable on that code path.

Proposed fix:

I think that should solve the problem of ConnectableObservables returned from publish().refCount() being "dead" once they complete.

After that, .singleInstance() could be deprecated and/or reduced to a wrapper around .publish().refCount().

mattpodwysocki commented 9 years ago

@blesh @benjchristensen why would you be shocked that a hot observable cannot be subscribed to again? After all, you took away all guarantees of idempotency by calling publish in the first place, which then shares the side effects. Therefore you can never assume that `range(0,3).sequenceEqual(range(0, 3).publish().refCount())` holds.

We had this very issue arise with pausable, where we made it a ConnectableObservable and then added pause and resume. If it were a cold observable, then it would just start at the beginning after resuming, which is obviously not the behavior you'd want.

I'd like to bring @bartdesmet into this to get his thoughts? ConnectableObservable is by far the biggest wart on Rx at the moment trying to take pure streams and make them impure by sharing those side effects.

benlesh commented 9 years ago

I don't want to put words in @benjchristensen's mouth. I said he seemed shocked. :p But he's very tall, so maybe I was misreading his facial features.

FWIW: The first time I used .publish().refCount(), I was extremely confused by the current behavior. Then I realized that publish made Observables that could not be re-subscribed. I thought it was odd, shrugged and thought, "Well, they know best" and continued. This resulted in the singleInstance operator.

I don't think we're disagreeing on idempotency. If ConnectableObservables were to be fixed in the manner I'm proposing, you still couldn't guarantee the same result twice if comparing the currently emitted sequence to a cold observable. Only cold observables would really have that feature, I think. What I'm proposing would make a ConnectableObservable into an observable that was "cold" until first subscription, then "hot" until the reference count decremented back to zero, at which point it would be "cold" again.

The only real difference here is that it returns to a "cold" state when it's complete. In all other aspects the behavior would be identical.

If it's desirable that an Observable not allow resubscription, perhaps that should be a separate operator?

benlesh commented 9 years ago

As a general discussion point... I'm proposing that we make .publish() default to .multicast(function() { return new Subject() }) internally, and we make sure multicast is returning a ConnectableObservable. Then we take the ctor for ConnectableObservable and have it optionally accept a factory function to create its Subject (passed from multicast).

From there, we have the ConnectableObservable track its own instance of its subject, and when connect is called, if it doesn't have a subject yet, we call the factory function and create it.

In refCount, when the count returns to zero, we null the subject on the instance.

It's not very invasive, and it shouldn't break anything unless there was code expecting completed/erred ConnectableObservables to be "dead". Which seems odd.
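To make the proposal concrete, here is a minimal, self-contained sketch of the idea. It is not the RxJS source: `SketchSubject`, `RefCountedConnectable`, `range`, and `collect` are hypothetical stand-ins, and connect and refCount are folded into one class for brevity. The only point it illustrates is the factory function plus nulling the subject when the count returns to zero.

```javascript
// Hypothetical stand-in for Subject: multicasts to observers, stops on complete.
class SketchSubject {
  constructor() {
    this.observers = [];
    this.isStopped = false;
  }
  subscribe(observer) {
    // A completed subject only ever signals completion to late subscribers.
    if (this.isStopped) { observer.complete(); return () => {}; }
    this.observers.push(observer);
    return () => { this.observers = this.observers.filter(o => o !== observer); };
  }
  next(value) {
    if (!this.isStopped) this.observers.slice().forEach(o => o.next(value));
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.slice().forEach(o => o.complete());
    this.observers = [];
  }
}

// Hypothetical stand-in for publish().refCount() with a subject factory.
// `source` is a function (observer) => teardown function.
class RefCountedConnectable {
  constructor(source, subjectFactory) {
    this.source = source;
    this.subjectFactory = subjectFactory;
    this.subject = null;
    this.connection = null;
    this.refCount = 0;
  }
  getSubject() {
    // Create the subject lazily, only when there is none yet.
    if (!this.subject) this.subject = this.subjectFactory();
    return this.subject;
  }
  connect() {
    const subject = this.getSubject();
    const disconnect = this.source(subject);
    // A synchronous source may already have completed and reset the state.
    if (this.subject === subject) this.connection = disconnect;
  }
  subscribe(observer) {
    const subject = this.getSubject();
    const isFirst = ++this.refCount === 1;
    let released = false;
    let detach = () => {};
    const release = () => {
      if (released) return;
      released = true;
      detach();
      if (--this.refCount === 0) {
        // Count is back to zero: disconnect and forget the subject ("cold" again).
        const disconnect = this.connection;
        this.connection = null;
        this.subject = null;
        if (disconnect) disconnect();
      }
    };
    detach = subject.subscribe({
      next: v => observer.next(v),
      complete: () => { observer.complete(); release(); },
    });
    if (isFirst && !released) this.connect();
    return release;
  }
}

// Synchronous cold source emitting `count` integers starting at `start`.
const range = (start, count) => observer => {
  for (let i = 0; i < count; i++) observer.next(start + i);
  observer.complete();
  return () => {};
};

// Helper: subscribe and gather everything that was emitted.
function collect(observable) {
  const values = [];
  observable.subscribe({ next: v => values.push(v), complete: () => {} });
  return values;
}

// Current behavior, modeled by a factory that always hands back the same subject.
const shared = new SketchSubject();
const current = new RefCountedConnectable(range(0, 3), () => shared);
const currentFirst = collect(current);   // [0, 1, 2]
const currentSecond = collect(current);  // [] because the shared subject is "dead"

// Proposed behavior: a fresh subject every time the count goes from 0 to 1.
const proposed = new RefCountedConnectable(range(0, 3), () => new SketchSubject());
const proposedFirst = collect(proposed);   // [0, 1, 2]
const proposedSecond = collect(proposed);  // [0, 1, 2]

console.log(currentFirst, currentSecond, proposedFirst, proposedSecond);
```

Under these assumptions, the only difference between the two runs is the factory: reusing one subject reproduces the "dead after completion" behavior, while creating a new one lets the observable be resubscribed.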

benjchristensen commented 9 years ago

Here is how RxJava works:

import java.util.concurrent.TimeUnit;

import rx.Observable;

public class RefCountExample {

    public static void main(String... args) {
        Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS).publish().refCount();

        interval.take(3).subscribe(i -> System.out.println("A: " + i));
        interval.take(3).subscribe(i -> System.out.println("B: " + i));

        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
        }

        interval.take(3).subscribe(i -> System.out.println("C: " + i));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        interval.take(3).toBlocking().forEach(i -> System.out.println("D: " + i));
    }
}

This outputs:

A: 0
B: 0
A: 1
B: 1
A: 2
B: 2
C: 0
C: 1
D: 1
C: 2
D: 2
D: 3

bman654 commented 9 years ago

FWIW one of the things that makes the publish().refCount() behavior so confusing is that if you let the refcount drop to 0 before it completes, then the observable will "reset" and start from the beginning.

in other words:


var source = range(0,3).publish().refCount();
source.take(2).subscribe(...); // yields 0,1
source.take(2).subscribe(...); // yields 0,1
source.take(3).subscribe(...); // yields 0,1,2
source.take(3).subscribe(...); // yields 0,1,2
source.take(4).subscribe(...); // yields 0,1,2
source.take(4).subscribe(...); // yields nothing!!

This behavior is rather unintuitive.

benlesh commented 9 years ago

@bman654 that's not a matter of letting the refCount drop to 0, it's a matter of the underlying observable signaling completion and completing the Subject you're multicasting over. Subjects that have completed can no longer be nexted over.
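A minimal sketch of that point, using a simplified model of a Subject rather than the real RxJS class (`MiniSubject` is a hypothetical name): once the subject has completed, later values are dropped and a late subscriber only sees the completion.

```javascript
// Simplified, hypothetical model of a Subject; not the RxJS implementation.
class MiniSubject {
  constructor() {
    this.observers = [];
    this.isStopped = false;
  }
  subscribe(observer) {
    // After completion, a new subscriber is only told that the stream is done.
    if (this.isStopped) { observer.complete(); return; }
    this.observers.push(observer);
  }
  next(value) {
    if (this.isStopped) return; // values after completion are dropped
    this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.forEach(o => o.complete());
    this.observers = [];
  }
}

const subject = new MiniSubject();
const early = [];
const late = [];

subject.subscribe({ next: v => early.push(v), complete: () => early.push("done") });
[0, 1, 2].forEach(v => subject.next(v));
subject.complete(); // the underlying source signals completion

// Subscribing after completion: no values, just the completion signal.
subject.subscribe({ next: v => late.push(v), complete: () => late.push("done") });
subject.next(3); // ignored, the subject is stopped

console.log(early); // [ 0, 1, 2, 'done' ]
console.log(late);  // [ 'done' ]
```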

benlesh commented 9 years ago

I'm closing this for now, because I think we've got some good solutions for this problem in RxJS Next.