ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0

publishBehavior and publishReplay semantics when completed #453

Closed staltz closed 8 years ago

staltz commented 8 years ago

Continuing the discussion that started here: https://github.com/ReactiveX/RxJS/pull/235/files

RxJS Legacy (v4) passes these tests:

var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = Rx.Observable.create(function (observer) {
  subscriptions++;
  observer.onNext(1);
  observer.onNext(2);
  observer.onNext(3);
  observer.onNext(4);
  observer.onCompleted();
});

var hot = source.shareValue(0);

hot.subscribe(x => results1.push(x));

expect(results1).toBe([0,1,2,3,4]);
expect(results2).toBe([]);

hot.subscribe(x => results2.push(x));

expect(results1).toBe([0,1,2,3,4]);
expect(results2).toBe([]);
expect(subscriptions).toBe(2);

and

var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = Rx.Observable.create(function (observer) {
  subscriptions++;
  observer.onNext(1);
  observer.onNext(2);
  observer.onNext(3);
  observer.onNext(4);
  observer.onCompleted();
});

var hot = source.shareReplay(2);

hot.subscribe(x => results1.push(x));

expect(results1).toBe([1,2,3,4]);
expect(results2).toBe([]);

hot.subscribe(x => results2.push(x));

expect(results1).toBe([1,2,3,4]);
expect(results2).toBe([3,4]);
expect(subscriptions).toBe(2);

Yet RxJS Next behaves basically the opposite way:

Failures:
1) Observable.prototype.publishBehavior() should not emit next events to observer after completed
  Message:
    Expected [ 0 ] to equal [  ].
  Stack:
    Error: Expected [ 0 ] to equal [  ].
      at Object.<anonymous> (./RxJSNext/spec/operators/publishBehavior-spec.js:65:21)
2) Observable.prototype.publishReplay() should emit replayed events to observer after completed
  Message:
    Expected [  ] to equal [ 3, 4 ].
  Stack:
    Error: Expected [  ] to equal [ 3, 4 ].
      at Object.<anonymous> (./RxJSNext/spec/operators/publishReplay-spec.js:98:21)
145 specs, 2 failures
Finished in 12.951 seconds

Because in RxJS legacy there is

    ConnectableObservable.prototype._subscribe = function (o) {
      return this._subject.subscribe(o);
    };

While in RxJS Next:

class ConnectableObservable<T> extends Observable<T> {
  subject: Subject<T>;

  // ...

  _subscribe(subscriber) {
    return this._getSubject().subscribe(subscriber);
  }

  _getSubject() {
    const subject = this.subject;
    if (subject && !subject.isUnsubscribed) {
      return subject;
    }
    // Subject will be recreated
    return (this.subject = this.subjectFactory());
  }

  // ...
}

In my opinion, the new behavior in RxJS Next is illogical at first sight and a potential source of developer confusion. It also differs from RxJS legacy, which could make migration a headache. I think the semantics argument is straightforward: with regard to subscriptions after complete(), publishReplay should behave just like a ReplaySubject and publishBehavior should behave just like a BehaviorSubject. That is also aligned with traditional Rx, such as Rx.NET; see Lee Campbell's book on this part.
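To make the difference concrete, here is a minimal, self-contained sketch in plain JavaScript. It is not RxJS code: `TinyReplaySubject`, `makeConnectable` and `lateSubscriberSees` are hypothetical stand-ins, and the source is assumed to emit 1 to 4 synchronously. The only thing that varies between the two runs is whether `subscribe` keeps using the completed subject or swaps in a fresh one.

```javascript
// Hypothetical stand-in for a ReplaySubject; not the RxJS implementation.
class TinyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
    this.done = false;
  }
  next(value) {
    if (this.done) return;
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.slice().forEach(o => o.next(value));
  }
  complete() {
    if (this.done) return;
    this.done = true;
    this.observers.slice().forEach(o => o.complete());
    this.observers = [];
  }
  subscribe(observer) {
    this.buffer.forEach(v => observer.next(v)); // replay, even after completion
    if (this.done) observer.complete();
    else this.observers.push(observer);
  }
}

// recreateWhenDone = false mimics the legacy routing (always the same subject).
// recreateWhenDone = true mimics the RxJS Next routing (fresh subject once the old one is done).
function makeConnectable(recreateWhenDone) {
  let subject = new TinyReplaySubject(2);
  const getSubject = () => {
    if (recreateWhenDone && subject.done) subject = new TinyReplaySubject(2);
    return subject;
  };
  return {
    subscribe: observer => getSubject().subscribe(observer),
    connect: () => {
      const s = getSubject();
      [1, 2, 3, 4].forEach(v => s.next(v));
      s.complete();
    },
  };
}

function lateSubscriberSees(recreateWhenDone) {
  const seen = [];
  const connectable = makeConnectable(recreateWhenDone);
  connectable.subscribe({ next() {}, complete() {} }); // early subscriber
  connectable.connect(); // source runs and completes
  connectable.subscribe({ next: v => seen.push(v), complete() {} }); // late subscriber
  return seen;
}

const legacyLate = lateSubscriberSees(false);
const nextLate = lateSubscriberSees(true);
console.log(legacyLate, nextLate);
```

With the legacy routing the late subscriber is replayed the last two values. With the Next routing it lands on a fresh, empty subject and sees nothing until another connect.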

benlesh commented 8 years ago

In RxJava this behaves like so:

ConnectableObservable<Integer> conn = just(1).doOnSubscribe(() -> System.out.println("subscribed")).publish();

conn.subscribe(System.out::println);
conn.subscribe(System.out::println);

conn.connect();

conn.subscribe(System.out::println);
conn.subscribe(System.out::println);

conn.connect();

outputs:

subscribed
1
1
subscribed
1
1

however in RxJS 4 the same thing:

let s = Observable.create(observer => {
  console.log('subscribed');
  observer.onNext(1);
  observer.onCompleted();
})
  .publish();

s.subscribe(::console.log);
s.subscribe(::console.log);

s.connect();

s.subscribe(::console.log);
s.subscribe(::console.log);

s.connect();

outputs only:

subscribed
1
1

RxJS 5 (Next) was designed to match the semantics of RxJava, and also to remove the potential footgun of people passing the same subject into two multicast calls.
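The two outputs above can be reproduced with a small model. This is a sketch only: `publish` and `run` here are hypothetical helpers written for illustration, not the RxJS or RxJava implementations, and the `recycle` flag is an assumption used to switch between "single use" and "reconnectable" behavior.

```javascript
// Hypothetical publish(): a bare-bones subject plus connect().
// recycle = false: one subject forever (single use, as in the RxJS 4 output).
// recycle = true: a completed subject is replaced, so connect() can run the source again.
function publish(source, recycle) {
  let subject = { observers: [], done: false };
  let connected = false;
  const current = () => {
    if (recycle && subject.done) {
      subject = { observers: [], done: false };
      connected = false;
    }
    return subject;
  };
  return {
    subscribe(next) {
      const s = current();
      if (!s.done) s.observers.push(next);
    },
    connect() {
      const s = current();
      if (connected) return; // already connected to this subject
      connected = true;
      source({
        next: v => { if (!s.done) s.observers.forEach(o => o(v)); },
        complete: () => { s.done = true; },
      });
    },
  };
}

function run(recycle) {
  const log = [];
  const source = observer => {
    log.push('subscribed');
    observer.next(1);
    observer.complete();
  };
  const s = publish(source, recycle);
  s.subscribe(v => log.push(v));
  s.subscribe(v => log.push(v));
  s.connect();
  s.subscribe(v => log.push(v));
  s.subscribe(v => log.push(v));
  s.connect();
  return log;
}

const singleUseLog = run(false);
const reconnectableLog = run(true);
console.log(singleUseLog, reconnectableLog);
```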

staltz commented 8 years ago

Isn't multicast less commonly used than the more popular share variants? And nothing actually stops people from passing the same subject to multicast. The point being, if you're using multicast, you should know what you're doing. If you're using shareFoo or publishFoo, you have a vague yet safe idea of what you're doing.

benlesh commented 8 years ago

The problem RxJS 5 is trying to solve here is that ConnectableObservables should be "reconnectable", and refCount should return an observable that is cold until subscribed to, then hot until all subscriptions have ended, then cold again.
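A rough sketch of that "cold, then hot, then cold again" lifecycle, in plain JavaScript rather than RxJS. Everything here is an assumption for illustration: observables are modeled as functions that take a `next` callback and return a teardown function, and `share` and `socketLike` are hypothetical names.

```javascript
// Hypothetical share(): connects on the first subscriber, disconnects when the last one leaves.
function share(subscribeSource) {
  const observers = [];
  let teardown = null;
  return function subscribe(observer) {
    observers.push(observer);
    if (observers.length === 1) {
      // 0 -> 1 subscribers: open the single upstream subscription
      teardown = subscribeSource(value => observers.slice().forEach(o => o(value)));
    }
    return function unsubscribe() {
      const i = observers.indexOf(observer);
      if (i === -1) return;
      observers.splice(i, 1);
      if (observers.length === 0 && teardown) {
        // 1 -> 0 subscribers: release the upstream resource
        teardown();
        teardown = null;
      }
    };
  };
}

const events = [];
let emit = null;
// Fake scarce resource: records when it is opened and closed.
const socketLike = next => {
  events.push('open');
  emit = next;
  return () => {
    events.push('close');
    emit = null;
  };
};

const shared = share(socketLike);
const a = [];
const b = [];
const unsubA = shared(v => a.push(v));
const unsubB = shared(v => b.push(v)); // no second 'open'
emit('x');
unsubA();
unsubB(); // last subscriber gone: 'close'
const unsubC = shared(v => a.push(v)); // cold again, so this reopens
emit('y');
unsubC();
console.log(events, a, b);
```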

staltz commented 8 years ago

Why is the lack of reconnectability a problem?

Also, we're talking about publishFoo so far, not yet about refCount. I can understand the refCount argument about going from cold to hot and back to cold, but what we have here is simply one connect on a publishReplay observable which behaves oddly.

staltz commented 8 years ago

people passing the same subject into two multicast calls.

I recall Erik Meijer recommending to "make the correct patterns easy and make the wrong patterns possible", so it's easy to detect bad patterns in code reviews. Passing a subject directly to multiple multicasts is easy to catch in code reviews.

benlesh commented 8 years ago

Why is the lack of reconnectability a problem?

If you want to model anything you should be able to reconnect to, say a WebSocket, it's a problem.

Also, we're talking about publishFoo so far

The reason I brought up refCount is it's the behavior of RxJS 2/3/4 regarding ConnectableObservables that makes this impossible. FWIW, I don't think any other flavor of Rx behaves like RxJS in this manner.

I'm not sure what you want is to make ConnectableObservables broken after the underlying subject has unsubscribed. I think what you want is a specific behavior around publishBehavior, so let's talk about that.

staltz commented 8 years ago

I'm not sure what you want is to make ConnectableObservables broken after the underlying subject has unsubscribed.

I guess you meant "subject has completed". That's what I mean at least. I think the ConnectableObservable could still be connected. Maybe I should explain what should happen in marble diagrams:

code:       -----C--------------------C?
behaviorsub:     --1-2----3-|
subscriber1: -^----1-2----3-|
subscriber2:                  (^|)
subscriber3: ---------(^2)3-|

Capital C is the connect() moment, ^ is a subscribe moment, and anything between parentheses happens synchronously. Notice that I'm not talking about what happens in subsequent connects, but simply what happens for multiple subscribers related to one connect(). If you assume every BehaviorSubject unsubscribes its subscribers when it completes, maybe we should question that assumption. In RxJS 4 it doesn't. Subscribers must take care of themselves.
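The subject contract in that diagram can be sketched in a few lines of plain JavaScript. `TinyBehaviorSubject` and `recorder` are hypothetical helpers, not RxJS code. One difference from the diagram: the subscribers here attach to the subject directly, so subscriber1 also receives the seed value 0.

```javascript
// Hypothetical stand-in for a BehaviorSubject with the post-completion contract described above.
class TinyBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue;
    this.done = false;
    this.observers = [];
  }
  next(value) {
    if (this.done) return;
    this.value = value;
    this.observers.slice().forEach(o => o.next(value));
  }
  complete() {
    if (this.done) return;
    this.done = true;
    this.observers.slice().forEach(o => o.complete());
    this.observers = [];
  }
  subscribe(observer) {
    if (this.done) {
      observer.complete(); // late subscriber: completion only, no stale value
      return;
    }
    observer.next(this.value); // current value on subscribe
    this.observers.push(observer);
  }
}

// Records everything an observer sees; '|' marks completion as in the diagram.
const recorder = () => {
  const seen = [];
  return { seen, next: v => seen.push(v), complete: () => seen.push('|') };
};

const subject = new TinyBehaviorSubject(0);
const subscriber1 = recorder();
const subscriber2 = recorder();
const subscriber3 = recorder();

subject.subscribe(subscriber1); // early subscriber
subject.next(1);
subject.next(2);
subject.subscribe(subscriber3); // mid-stream: gets the current value 2 first
subject.next(3);
subject.complete();
subject.subscribe(subscriber2); // after completion: (^|)
console.log(subscriber1.seen, subscriber3.seen, subscriber2.seen);
```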

If you want to model anything you should be able to reconnect to, say a WebSocket, it's a problem.

Can't this approach below model that?

var reconnectionMoments = // some infinite Observable

reconnectionMoments
  .switchMap(moment => Rx.DOM.fromWebSocket('ws://echo.websockets.org', null))
  .subscribe(/* ... */)

staltz commented 8 years ago

I had a typo/mistake in the marble diagram. Second subscriber should see (^|). Updated now

benlesh commented 8 years ago

Sure that's the behavior of the BehaviorSubject, but ConnectableObservables need to be reconnectable, which means it'll need to recreate the underlying BehaviorSubject.

Can't this approach below model that?

That actually doesn't help, because fromWebSocket returns a Subject, and you'd likely need access to the observer side of the thing.

It's better to be able to say: Rx.DOM.fromWebSocket('ws://echo.websockets.org', null).share() and see a behavior that allowed you to subscribe multiple times without recreating the websocket, then unsubscribe from every one of those and have the websocket automatically close (because of disposal)... and then be able to reconnect to that websocket simply by subscribing again.

benlesh commented 8 years ago

Question: Why is it important to you that ConnectableObservables never allow reconnection? What use case does that serve?

benlesh commented 8 years ago

I'm not sure how anything passed in those tests above, though.

expect(results1).toBe([0,1,2,3,4]);

would never assert true, because results1 and [0,1,2,3,4] are different instances. You'd need to use toEqual rather than toBe. Unless something has changed in Jasmine.
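The underlying point is reference equality versus structural equality, which can be shown without any test framework. In this sketch `sameElements` is a hypothetical helper standing in for a structural comparison.

```javascript
const results1 = [0, 1, 2, 3, 4];

// Identity comparison: a fresh array literal is never the same instance.
const sameReference = results1 === [0, 1, 2, 3, 4];

// Structural comparison: same length and same element at every index.
function sameElements(a, b) {
  return a.length === b.length && a.every((x, i) => x === b[i]);
}
const sameContents = sameElements(results1, [0, 1, 2, 3, 4]);

console.log(sameReference, sameContents);
```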

benlesh commented 8 years ago

... it seems like your issue is specifically with ReplaySubject. Which is the only subject that will actually emit values after it's been disposed/completed/errored.

It seems like the ReplaySubject semantics in RxJS 4/3/2 are actually broken. A Subject that has errored shouldn't still emit values. A subject that has completed shouldn't still emit values:

In RxJS 4:

var sub = new Rx.ReplaySubject(2);
sub.subscribe(::console.log, ::console.error);

Rx.Observable.range(0, 3).concat(Rx.Observable.throw('bad')).subscribe(sub);

sub.subscribe(::console.log, ::console.error);

outputs:

0
1
2
error 'bad'
0
1
2
error 'bad'

That doesn't seem right, a Subject that's in error can still emit values?

staltz commented 8 years ago

Sure that's the behavior of the BehaviorSubject, but ConnectableObservables need to be reconnectable, which means it'll need to recreate the underlying BehaviorSubject.

Reconnectability is not my gripe. It's about what happens in the space between the subject's complete and the next connect. In particular, we are discussing what should happen to subscriber2 in this case below:

code:       -----C--------------------C?
behaviorsub:     --1-2----3-|
subscriber1: -^----1-2----3-|
subscriber2:                  ^??
subscriber3: ---------(^2)3-|

I'm saying subscriber2 should see (^|) because it should refer to its latest Observable, for two reasons: what an observer sees after onComplete is important, and an Observer cannot know what comes next. On the first: Observers of BehaviorSubject expect to see nothing (that's the BehaviorSubject contract/guarantee), and Observers of ReplaySubject expect to see replayed values. These are guarantees that people rely on. On the second, "Observer cannot know what comes next": if subscriber2 is supposed to prepare for the next connection, subscriber2 cannot know whether there will eventually be an incoming connect() or not. In the meantime, it is certain that subscriber2 could get information from the previous execution of the subject. To which side should we pull subscriber2? To refer to the previous or to prepare for the next? Might be a matter of preferred usage style. I have never used or seen the practical need to use connect() multiple times.

That actually doesn't help, because fromWebSocket returns a Subject, and you'd likely need access to the observer side of the thing.

Well then this was an unfortunate example to talk about, because a ConnectableObservable is not an Observer.

and see a behavior that allowed you to subscribe multiple times without recreating the websocket, then unsubscribe from every one of those and have the websocket automatically close (because of disposal)... and then be able to reconnect to that websocket simply by subscribing again.

This behavior can be built with the switchMap and a publish() (with a one-time connect()), by modifying reconnectionMoments making it emit either true or false, switchMapping to the websocket Observable if true, switchMapping to Observable.never if false. Etc.
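A sketch of the switchMap half of that idea, in plain JavaScript rather than RxJS. It leaves out the publish() and one-time connect() part. Observables are modeled as functions that take a `next` callback and return a teardown function; `switchMap`, `never`, `fakeSocket` and the hand-driven `signal` are hypothetical names, and the fake socket only records when it opens and closes.

```javascript
// An observable that never emits.
const never = () => () => {};

// Hypothetical switchMap: each outer value tears down the previous inner subscription.
function switchMap(subscribeOuter, project) {
  return function subscribe(next) {
    let innerTeardown = null;
    const outerTeardown = subscribeOuter(value => {
      if (innerTeardown) innerTeardown();
      innerTeardown = project(value)(next);
    });
    return () => {
      if (innerTeardown) innerTeardown();
      outerTeardown();
    };
  };
}

const socketLog = [];
let socketEmit = null;
// Fake socket observable: opening and closing are only recorded in socketLog.
const fakeSocket = next => {
  socketLog.push('open');
  socketEmit = next;
  return () => {
    socketLog.push('close');
    socketEmit = null;
  };
};

// reconnectionMoments, driven by hand here: true = connect, false = stay disconnected.
let signal = null;
const reconnectionMoments = next => {
  signal = next;
  return () => { signal = null; };
};

const received = [];
const messages = switchMap(reconnectionMoments, on => (on ? fakeSocket : never));
const stop = messages(v => received.push(v));

signal(true); // opens the socket
socketEmit('a');
signal(false); // closes it and switches to never
signal(true); // opens a new socket
socketEmit('b');
stop();
console.log(socketLog, received);
```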

Question: Why is it important to you that ConnectableObservables never allow reconnection? What use case does that serve?

Because (refer to the marble diagram above) it gives me a guarantee that what subscribers see after the subject has completed will follow the same conventions as subscribing to an actual subject. These guarantees might be particularly important in the Replay case, for instance. I have a cold Observable, I want multiple subscribers to see the same events simultaneously, and after the cold Observable completes, I want it to replay its values to late subscribers. Because that's what ReplaySubjects do. And in case it's a publishBehavior, I do not want it to emit any value after the cold Observable has completed, because that's the BehaviorSubject contract. I do not want to get the initialValue for the next upcoming connect(). And above all, I never had a need for multiple connect(), which is one of the uglier parts of Rx, since it's purely imperative.

expect(results1).toBe([0,1,2,3,4]);

That was wrong indeed. Those tests weren't actually jasmine tests. I just wrote pseudocode. But I'm sure that with console.log results1 is indeed [0,1,2,3,4].

That doesn't seem right, a Subject that's in error can still emit values?

It is correct for two reasons. First, from the perspective of an Observer, the Observable contract is preserved. Second, it's a ReplaySubject and that's what they do: they replay the values and the error.

Your example above of a reconnectable Observable would also "violate" this contract because

let s = Observable.create(observer => {
  console.log('subscribed');
  observer.onNext(1);
  observer.onCompleted();
})
  .publish();

s.subscribe(::console.log);
s.subscribe(::console.log);

s.connect();

s.subscribe(::console.log);
s.subscribe(::console.log);

s.connect();

should emit

subscribed
1
1
subscribed
1
1

And after the first instance, the observable completed, so "why can an Observable which completed still emit values?" Isn't that a Replay feature? ;)

benlesh commented 8 years ago

Well then this was an unfortunate example to talk about, because a ConnectableObservable is not an Observer.

Ah... a side effect of the implementation, I was thinking that this used @trxcllnt's lift override on Subject, which would indeed return a BidirectionalSubject, but it wouldn't for publish()... hmmm...

benlesh commented 8 years ago

In speaking with some RxJava folks, it might be that we just need to implement shareValue and shareReplay without Subjects, and rather just create "1 Producer, Many Consumer" observables for them to return directly. It would probably perform better (fewer object allocations) and it might help us "trim the fat" with our publish operators, as it were.

staltz commented 8 years ago

and rather just create "1 Producer, Many Consumer" observables

That would be a very bold move, would be a big shift in Rx's foundation with regard to cold vs hot.

benlesh commented 8 years ago

would be a big shift

Nah... I'm not talking "all observables" ... just the ones returned from shareReplay or shareValue for example. The Observable returned from refCount is technically a "one producer, many consumers" observable. It's just there's a Subject under the hood. I think we can implement this functionality and skip that completely. Less object allocation, fewer subscriptions.

headinthebox commented 8 years ago

Wow, this is a long thread ;-) But I think that I am with @staltz on this

Reconnectability is not my gripe. It's about what happens in the space between the subject's complete and the next connect.

Until the next connect any "publish" variant should behave like the underlying subject.

benlesh commented 8 years ago

@headinthebox, I don't disagree with that at all. The thing that @staltz didn't like is if the connectable observable completes, at next connect it recycles the underlying subject. This is how it works in RxJava as well, essentially. RxJS 4/3/2 would instead try to use the same underlying subject, meaning the connectable observable was "single use only". @staltz had come to depend on this behavior with regards to replay subjects, I think. In all discussions with @benjchristensen, @jhusain and many others, it was decided this behavior was "broken".

Frikki commented 8 years ago

This is unexpected new behavior. Could anybody clarify how I can prevent replay values from being dropped on completion? I need to be able to replay what has happened in the past. Usually, this would have been a simple publishReplay(), now what?

staltz commented 8 years ago

@blesh I started the process of migrating Cycle.js to RxJS Next with the help of the community, and we hit our first big problem with this new behavior of connectable Observables. For my purposes, ConnectableObservable is "broken" here. Please consider that a lot of existing codebases out there might depend on how ConnectableObservable works.

Our case goes like this:

To me, this is a really really big issue, and honestly, I'd consider not using RxJS Next at all in the Cycle.js community. Can you please consider alternatives?

For instance:

The thing that @staltz didn't like is if the connectable observable completes, at next connect it recycles the underlying subject.

Actually it's not recycling at the next connect(), it's recycling on complete. This happens in order in time: (1) first connect(), (2) source completes, (3) second connect(). Recycling is happening on moment (2), not on moment (3). I'd be perfectly ok with recycling on (3). But with it on (2), it goes against BehaviorSubject and ReplaySubject semantics after completed (therefore unintuitive, we're opening possibilities to confuse developers), and also blocks me from accomplishing what I need to accomplish.
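A sketch of what "recycling on (3)" could look like, in plain JavaScript rather than RxJS. `TinyReplaySubject` and this `publishReplay` are hypothetical stand-ins, and the source is assumed to be synchronous. `subscribe` always routes to the current subject, even a completed one, and only `connect()` replaces it.

```javascript
// Hypothetical stand-in for a ReplaySubject; not the RxJS implementation.
class TinyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
    this.done = false;
  }
  next(value) {
    if (this.done) return;
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.slice().forEach(o => o.next(value));
  }
  complete() {
    if (this.done) return;
    this.done = true;
    this.observers.slice().forEach(o => o.complete());
    this.observers = [];
  }
  subscribe(observer) {
    this.buffer.forEach(v => observer.next(v));
    if (this.done) observer.complete();
    else this.observers.push(observer);
  }
}

// Hypothetical publishReplay that recycles the subject lazily, at connect() time.
function publishReplay(subscribeSource, bufferSize) {
  let subject = new TinyReplaySubject(bufferSize);
  let connected = false;
  return {
    // Always routes to the current subject, even when it has completed.
    subscribe: observer => subject.subscribe(observer),
    connect() {
      if (subject.done) {
        subject = new TinyReplaySubject(bufferSize); // recycle at moment (3)
        connected = false;
      }
      if (connected) return;
      connected = true;
      subscribeSource(subject);
    },
  };
}

let runs = 0;
// Each run emits values tagged with the run number: 11, 12, then 21, 22, ...
const source = observer => {
  runs += 1;
  observer.next(runs * 10 + 1);
  observer.next(runs * 10 + 2);
  observer.complete();
};

const collect = connectable => {
  const seen = [];
  connectable.subscribe({ next: v => seen.push(v), complete() {} });
  return seen;
};

const shared = publishReplay(source, 2);
shared.connect(); // (1) first connect, (2) source completes
const betweenConnects = collect(shared); // still served by the completed subject
shared.connect(); // (3) second connect recycles the subject
const afterReconnect = collect(shared);
console.log(betweenConnects, afterReconnect, runs);
```

In this model a subscriber arriving between (2) and (3) gets the replay of the first run, and the second `connect()` still re-runs the source.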

The other thing I'd propose is to look for different ways of achieving "reconnection". In my experience I have never had to do multiple calls to connect(). Doing complicated connect() multiple times is IMHO an anti-pattern.

Why can't this approach work for reconnecting? I suggested it above but you didn't comment on it.

and see a behavior that allowed you to subscribe multiple times without recreating the websocket, then unsubscribe from every one of those and have the websocket automatically close (because of disposal)... and then be able to reconnect to that websocket simply by subscribing again.

This behavior can be built with the switchMap and a publish() (with a one-time connect()), by modifying reconnectionMoments making it emit either true or false, switchMapping to the websocket Observable if true, switchMapping to Observable.never if false. Etc.

Please.

mattpodwysocki commented 8 years ago

@blesh I'm also with @staltz and @headinthebox on this one. Changing the semantics now would be really confusing and not bring much benefit.

staltz commented 8 years ago

For example this test fails (it times out):

  it('publishReplay(1) should replay values just after connect() is called', function (done) {
    var obs = Observable.of('foo').publishReplay(1);
    obs.connect();
    obs.subscribe(
      function (x) { expect(x).toBe('foo'); },
      done.fail,
      done
    );
  });

Frikki commented 8 years ago

Is there any particular reason why this issue hasn’t been labeled discussion? Is it not up for discussion? Have the final words been said and the decision is final?

It seems that those, e.g., @benjchristensen, @jhusain et al, who initially made the decision of this new alignment with RxJava are not even participating here.

benlesh commented 8 years ago

Is there any particular reason why this issue hasn’t been labeled discussion?

Sorry... I didn't label it. :\ heh

It seems that those, e.g., @benjchristensen, @jhusain et al, who initially made the decision of this new alignment with RxJava are not even participating here.

FWIW: I'm also one of those people that made the decision, from discussion with several others really early on in the development of this library who include @headinthebox, @abersnaze, @stealthcoder and @mattpodwysocki... There were changes made around multicast to help prevent certain antipatterns (like reusing the same Subject in two multicast calls) as well as facilitate hot observables being resubscribable. The old behavior was quirky and not friendly to new developers in particular.

However, the side-effect of this change is that shareReplay and shareBehavior now act differently when the refCount returns to zero.

Frikki commented 8 years ago

@blesh Thanks for the info.

Frikki commented 8 years ago

@blesh And how do you see the issue now, given @staltz’s comments above (alongside @headinthebox’s and @mattpodwysocki’s)?

benlesh commented 8 years ago

My stance is unchanged. The behavior of having shared (aka published, refCounted) observables be unusable once they've errored or completed solely exists in RxJS, afaik.

Imagine some error-prone scarce resource, like a Web Socket over a mobile connection:

var webSocketOfNumbers = getWebSocketObservable('ws://streamofnumbers.org/or/whatever').share();

var oddNumbers = webSocketOfNumbers.filter(n => n % 2 === 1)::retryConnection();
var evenNumbers = webSocketOfNumbers.filter(n => n % 2 === 0)::retryConnection();
var fiveNumbers = webSocketOfNumbers.filter(n => n % 5 === 0)::retryConnection();

// a shared retry operator.
function retryConnection() {
  return this.retryWhen(errors => errors.switchMap(err =>
    navigator.onLine ? Observable.timer(3000) : Observable.fromEvent(window, 'online').first()));
}

In RxJS 4, this plainly will NOT work. Your users will walk out of WiFi range and boom, connection lost... but WORSE... because of the architecture of multicast in RxJS 4, it will simply just never reconnect without error. Not because it can't reconnect, but because the observables you're using your custom retry operator on are inexplicably not reusable.

If we want RxJS to be used by the masses and not just Rx "experts", this behavior needed to change.

benlesh commented 8 years ago

Now, I understand the concern about shareReplay and shareBehavior changing. Just not share.

I think it's worth investigating what the most ergonomic behavior is for these operators.

I'd also like to add that it's my opinion that depending on the behavior I displayed above is akin to depending on a bug in a framework.

I'm happy to change my mind on the issue, of course, if I can be shown how it's desirable to be unable to retry a shared observable, as well as how it's desirable to have shared observables become unusable after completion. But given how one of the major selling points of Observable, over say Promise, is that it can be retried, I don't think that is the behavior we want.

trxcllnt commented 8 years ago

@staltz why don't you use multicast with a singleton ReplaySubject? That's the old behavior, right?

staltz commented 8 years ago

@blesh I don't contest the need for reusable shared Observables.

I'm happy to change my mind on the issue, of course, if I can be shown how it's desirable to be unable to retry a shared observable, as well as how it's desirable to have shared observables become unusable after completion.

I don't think it's desirable to be unable to retry a shared observable. I don't think it's desirable to have shared observables become unusable after completion.

Having that settled, can we focus the discussion on this? "What should happen after the underlying subject completes and before it's reconnected?"

As I previously mentioned:

This happens in order in time: (1) first connect(), (2) source completes, (3) second connect(). Recycling is happening on moment (2), not on moment (3). I'd be perfectly ok with recycling on (3).

However, (3) is currently not actually possible without some rearchitecting, because a Subject is a subclass of Subscriber, and hence the Subject is disposed when the Subject completes, and that's how we lose post-completion Subject semantics.

This subject.isUnsubscribed in ConnectableObservable here is a problem:

  _getSubject() {
    const subject = this.subject;
    if (subject && !subject.isUnsubscribed) {
      return subject;
    }
    return (this.subject = this.subjectFactory());
  }

Unless I'm naive and can't foresee obstacles yet, I believe we can restore post-completion semantics of shared observables as happens in RxJS 4 (which are important features for me, Erik, and Matt), while at the same time supporting reusable/reconnectable shared observables (important to Ben and Netflix and others). How? Probably by rethinking whether Subject should be a subclass of Subscriber or not. To be honest, why can't it just be an Observer?

I can see that Subjects might need to be passed around as subscribers, like in here

    subscription = source.subscribe(this._getSubject());

But I think subscribe also supports receiving just a non-Subscriber Observer.

@staltz why don't you use multicast with a singleton ReplaySubject? That's the old behavior, right?

It would work. But manually creating the ReplaySubject and using multicast to put it in the factory as a closure would not be as ergonomic as publishReplay(1).

benlesh commented 8 years ago

@staltz Just so I understand, you're suggesting that we allow Subject to next values after completion? (Because that's what subclassing Subscriber disallows).

I think that Subjects are supposed to be single use. Once they're done, you can't push new values through them. I believe this is true for all strains of Rx.

staltz commented 8 years ago

you're suggesting that we allow Subject to next values after completion?

ReplaySubjects do that by design.

staltz commented 8 years ago

But also what's very important is: a subscribe to a ConnectableObservable using BehaviorSubject once that BehaviorSubject has completed should not trigger the recreation of a new BehaviorSubject unless I explicitly say connect() a second time (or that it's done for me automatically in connectWhen()/trxcllntControl()).

benlesh commented 8 years ago

ReplaySubjects do that by design.

ReplaySubjects will only replay the previous values after completion or error, they won't allow you to next new values through.

benlesh commented 8 years ago

Let's identify a few things, because I think the position is that both RxJS 4 and RxJS 5 have this wrong in some ways.

  1. What is the desired behavior of shareBehavior on resubscription after completion or error?
     a. what about on retry or replay?
  2. What is the desired behavior of shareReplay on resubscription after completion or error?
     a. what about on retry or replay?

staltz commented 8 years ago

@blesh: ReplaySubjects will only replay the previous values after completion or error, they won't allow you to next new values through.

Yes, that's what I meant. As an Observable, ReplaySubject can next values to its subscribers even after it completes. But as an Observer, ReplaySubject will do nothing when next is given to it.

So to clarify this:

@blesh: Just so I understand, you're suggesting that we allow Subject to next values after completion? (Because that's what subclassing Subscriber disallows).

No, I'm not suggesting that. I was just suggesting that we make Subject an Observer and not a Subscriber. If we need to disallow new next values to the Subject as an Observer, we can add that logic directly into Subject, we don't need to use inheritance from Subscriber.

It's the "Subject unsubscribes itself when it terminates" that brings complications because its consequence is "Subject is recreated immediately when it terminates".
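A sketch of what that could mean, in plain JavaScript rather than the RxJS class hierarchy. `ObserverOnlyReplaySubject` is a hypothetical name. It tracks its own stopped state instead of inheriting unsubscribe-on-complete behavior, so it ignores input after completion as an Observer but still answers late subscribers as an Observable.

```javascript
// Hypothetical replay subject that is a plain Observer with its own isStopped flag.
class ObserverOnlyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
    this.isStopped = false;
  }
  // Observer side: input after termination is ignored.
  next(value) {
    if (this.isStopped) return;
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.slice().forEach(o => o.next(value));
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.slice().forEach(o => o.complete());
    this.observers = [];
  }
  // Observable side: still replays to subscribers after termination.
  subscribe(observer) {
    this.buffer.forEach(v => observer.next(v));
    if (this.isStopped) observer.complete();
    else this.observers.push(observer);
  }
}

const subject = new ObserverOnlyReplaySubject(2);
subject.next(1);
subject.next(2);
subject.complete();
subject.next(3); // ignored: the subject has already completed

const lateSeen = [];
subject.subscribe({
  next: v => lateSeen.push(v),
  complete: () => lateSeen.push('complete'),
});
console.log(lateSeen);
```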

@staltz: Unless I'm naive and can't foresee obstacles yet, I believe we can restore post-completion semantics of shared observables as happens in RxJS 4, while at the same time supporting reusable/reconnectable shared observables

@blesh: Let's identify a few things, because I think the position is that both RxJS 4 and RxJS 5 have this wrong in some ways.

After giving thought to all this, I think we are in the position that those two requirements "keep subject post-completion semantics for shared observables" and "allow reconnectable shared observables" are working against each other. Because refCount does automatic connect, a subscription to a shared observable that completed will trigger a new connect, and we want different things for that moment. We need to choose either one of these sides.

I don't think we can find a compromise between these two, but here goes nothing:

  1. What is the desired behavior of shareBehavior on resubscription after completion or error? a. what about on retry or replay?

PS: I think you meant "what about on retry or repeat?"

  it('shareBehavior(0) should emit completed when subscribed after completed', function () {
    var source =     cold(   '-1-2-3----4-|   ');
    var replayed = source.shareBehavior('0');
    var subscriber1 = hot('   a|              ').mergeMapTo(replayed);
    var expected1   =     '---01-2-3----4-|   ';
    var subscriber2 = hot('         b|        ').mergeMapTo(replayed);
    var expected2   =     '---------3---4-|   ';
    var subscriber3 = hot('                 c|').mergeMapTo(replayed);
    var expected3   =     '-----------------| ';

    expectObservable(subscriber1).toBe(expected1);
    expectObservable(subscriber2).toBe(expected2);
    expectObservable(subscriber3).toBe(expected3);
  });

  it('shareBehavior(0).repeat(3) should emit completed when subscribed after completed', function () {
    var source =     cold(   '-1-2-3----4-|   ');
    var replayed = source.shareBehavior('0').repeat(3);
    var subscriber1 = hot('   a|              ').mergeMapTo(replayed);
    var expected1   =     '---01-2-3----4-|   ';
    var subscriber2 = hot('         b|        ').mergeMapTo(replayed);
    var expected2   =     '---------3---4-|   ';
    var subscriber3 = hot('                 c|').mergeMapTo(replayed);
    var expected3   =     '-----------------| ';

    expectObservable(subscriber1).toBe(expected1);
    expectObservable(subscriber2).toBe(expected2);
    expectObservable(subscriber3).toBe(expected3);
  });

  it('shareBehavior(0).retry(1) should restart the source when subscribed after completed', function () {
    var source =     cold(   '-1-2-3----4-#                ');
    var replayed = source.shareBehavior('0').retry(1);
    var subscriber1 = hot('   a|                           ').mergeMapTo(replayed);
    var expected1   =     '---01-2-3----4--1-2-3----4-#    ';
    var subscriber2 = hot('         b|                     ').mergeMapTo(replayed);
    var expected2   =     '---------3---4--1-2-3----4-#    ';
    var subscriber3 = hot('                              c|').mergeMapTo(replayed);
    var expected3   =     '------------------------------# ';

    expectObservable(subscriber1).toBe(expected1);
    expectObservable(subscriber2).toBe(expected2);
    expectObservable(subscriber3).toBe(expected3);
  });

  2. What is the desired behavior of shareReplay on resubscription after completion or error? a. what about on retry or repeat?

  // This works in RxJS 4
  it('shareReplay(2) should replay previous 2 values when subscribed after completed', function () {
    var source =     cold(   '-1-2-3----4-|      ');
    var replayed = source.shareReplay(2);
    var subscriber1 = hot('   a|                 ').mergeMapTo(replayed);
    var expected1   =     '----1-2-3----4-|      ';
    var subscriber2 = hot('         b|           ').mergeMapTo(replayed);
    var expected2   =     '---------(23)4-|      ';
    var subscriber3 = hot('                 c|   ').mergeMapTo(replayed);
    var expected3   =     '-----------------(34|)';

    expectObservable(subscriber1).toBe(expected1);
    expectObservable(subscriber2).toBe(expected2);
    expectObservable(subscriber3).toBe(expected3);
  });

  // This works in RxJS 4
  it('shareReplay(2).repeat(3) should replay previous 2 values 3 times when ' +
  'subscribed after completed', function () {
    var source =     cold(   '-1-2-3----4-|      ');
    var replayed = source.shareReplay(2).repeat(3);
    var subscriber1 = hot('   a|                 ').mergeMapTo(replayed);
    var expected1   =     '----1-2-3----4-(3434|)';
    var subscriber2 = hot('         b|           ').mergeMapTo(replayed);
    var expected2   =     '---------(23)4-(3434|)';
    var subscriber3 = hot('                 c|   ').mergeMapTo(replayed);
    var expected3   =     '-----------------(343434|)';

    expectObservable(subscriber1).toBe(expected1);
    expectObservable(subscriber2).toBe(expected2);
    expectObservable(subscriber3).toBe(expected3);
  });

  // This DOES NOT work in RxJS 4
  it('shareReplay(2).retry(1) should restart the source when subscribed after completed', function () {
    var source =     cold(   '-1-2-3----4-#                   ');
    var replayed = source.shareReplay(2).retry(1);
    var subscriber1 = hot('   a|                              ').mergeMapTo(replayed);
    var expected1   =     '----1-2-3----4--1-2-3----4-#       ';
    var subscriber2 = hot('         b|                        ').mergeMapTo(replayed);
    var expected2   =     '---------(23)4--1-2-3----4-#       ';
    var subscriber3 = hot('                              c|   ').mergeMapTo(replayed);
    var expected3   =     '------------------------------(34#)';

    expectObservable(subscriber1).toBe(expected1);
    expectObservable(subscriber2).toBe(expected2);
    expectObservable(subscriber3).toBe(expected3);
  });

I see two courses of action:

(A) Keep it as it was in RxJS 4. PROS: makes it easy to migrate from RxJS 4 to RxJS 5. Makes the distinction between hot and cold more obvious, as cold observables are the only "generator-like" observables, and hot are singleton-like observables that live despite subscriptions. CONS: no shared reconnectable behavior as you want.

(B) connect() recreates the Subject. PROS: you can do your fancy shared reconnection. Supposedly more beginner-friendly. CONS: migration headaches from RxJS 4. Shared observables are hot but not as "hot" as Subjects, because they don't have the same post-completion semantics as Subjects. This could make the cold vs hot confusion worse because typically cold observables are the only ones that work like generators. But shared reconnectable hot observables would also work like generators. Good luck teaching that three-fold distinction to beginners.
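To make the two options concrete without depending on any RxJS version, here is a minimal sketch. `TinySubject`, `connectable`, and `run` are hypothetical helpers written for illustration, not RxJS APIs, and the model assumes a synchronous cold source. With `recreate` set to false the subject stays completed forever (option A); with `recreate` set to true a completed subject is replaced on the next subscribe or connect (option B).

```javascript
// Hypothetical minimal subject: once completed, it stays completed.
class TinySubject {
  constructor() { this.observers = []; this.done = false; }
  subscribe(observer) {
    if (this.done) { observer.complete(); return; }
    this.observers.push(observer);
  }
  next(value) {
    if (!this.done) this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.done) return;
    this.done = true;
    this.observers.forEach(o => o.complete());
    this.observers = [];
  }
}

// Hypothetical connectable wrapper (not the real ConnectableObservable).
// recreate === false models option (A); recreate === true models option (B).
function connectable(coldSource, recreate) {
  let subject = new TinySubject();
  function getSubject() {
    if (recreate && subject.done) subject = new TinySubject();
    return subject;
  }
  return {
    subscribe(observer) { getSubject().subscribe(observer); },
    connect() { coldSource(getSubject()); }
  };
}

// A cold source: each call replays the same synchronous sequence.
const cold = observer => {
  [1, 2].forEach(v => observer.next(v));
  observer.complete();
};

// Subscribe and connect twice, recording everything the observer sees.
function run(recreate) {
  const log = [];
  const observer = { next: v => log.push(v), complete: () => log.push('done') };
  const shared = connectable(cold, recreate);
  shared.subscribe(observer);
  shared.connect();
  shared.subscribe(observer); // resubscribe after completion
  shared.connect();
  return log;
}

const optionA = run(false); // [1, 2, 'done', 'done']
const optionB = run(true);  // [1, 2, 'done', 1, 2, 'done']
```

Under option (A) the second subscription only sees the completion, which is the singleton-like behavior; under option (B) it sees the whole sequence again, which is the generator-like behavior.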

If (A) happens, I'll be happy, Matt will be happy, it will make coding in Cycle.js easier, and I'll know how to teach RxJS 5 to beginners. And you'll have headaches trying to force shared hot observables to reconnect. (No idea why you haven't considered or commented on the switchMap + publish approach I recommended up there, where we use actual cold observables as the "web socket connectable generators".)

If (B) happens, the whole cold vs hot story would become unbearable to me, and I'd consider staying on good old RxJS 4 or switching to something else like most, Kefir, or Bacon.js. In the worst case I'd try to use RxJS 5 in Cycle.js but be careful never to complete hot observables, and that'd probably have bad effects on memory.

Your call?

mattpodwysocki commented 8 years ago

@blesh I agree here with @staltz that this is indeed a deal breaker. You cannot radically change the semantics of how we turn the cold observables hot and hot to cold. It will create wildly different behavior especially for apps that are already using it, especially in OneDrive, Delve, and others at Microsoft, and I refuse to let them be broken. I would recommend to them that they continue using RxJS v4 and not migrate to v5 because at least that gives them the behavior that they are expecting and already use.

headinthebox commented 8 years ago

@blesh The right thing to do is be backwards compatible. Really, the only degree of freedom you have with a mature library like RxJS is to increase perf. Everything else is out of scope. Let's not waste our time on this and leave the semantics as is.

staltz commented 8 years ago

@trxcllnt just showed me a solution:

This is what shareReplay can be equivalent to:

var source = Rx.Observable.range(0, 5)
  .multicast(function () {
    return this.replaySubject ||
      (this.replaySubject = new Rx.ReplaySubject(2));
  });

source.connect();

source.subscribe(x => console.log('Next: ' + x));
// => 3
// => 4

It works. @blesh would you still have proper APIs for achieving the behavior you want? At least using multicast directly you can get the recycling.

trxcllnt commented 8 years ago

@headinthebox @mattpodwysocki from what I understand, the new behavior is consistent with RxJava (and C#?). The behavior that multicast doesn't re-create its Subject upon re-connection has been a wart in JS for a while (it's certainly affected me at times).

mattpodwysocki commented 8 years ago

@trxcllnt nope, .NET has the same semantics as the JavaScript version and always has. RxJava is the outlier and what I would deem to be incorrect behavior. Once again, this behavior here is not negotiable as both @headinthebox and I have stated.

@staltz I'm confused, where does the this come from here? Is it internal to the multicast function to have a member called replaySubject? If so, wouldn't that be an issue if someone was using a fat arrow or a custom bound function there?

benlesh commented 8 years ago

The right thing to do is be backwards compatible.

This is a major version increase and a complete rewrite. Backwards compatibility was gone with the move from dispose to unsubscribe and onNext to next. Those are MUCH bigger deviations.

This library is being introduced to the masses with Angular 2. Any operator that results in hard to figure out behaviors like observables that can't be retried is plainly broken.

So... even in the face of the authority of @headinthebox and @mattpodwysocki ... I wholeheartedly disagree that the current behavior should be removed... Because I really think it will screw people up and cost immeasurable development hours.

With the way that multicast was written this time, we can now support both behaviors cleanly.

All we need to do is identify the behaviors we want, appropriate names, and create the operators.

Then, we'll just have to do everything in our power to educate people to never use 'oldShare' or whatever we call it, because of its strange semantics that break retry, repeat and reusability.

benlesh commented 8 years ago

It works. @blesh would you still have proper APIs for achieving the behavior you want? At least using multicast directly you can get the recycling.

Yes... and so do you... this was sort of what @trxcllnt and I were saying above (besides my objections to changing the behavior back to RxJS 4).

What I would like is if we leave share as it currently is, and we talk about how we'd like shareReplay and shareBehavior to work. I think they could change just slightly to allow retrying and replaying of previous values with limited impact. Is there a particular scenario where anyone we know is counting on the behavior that shareReplayed observables aren't reusable? It seems like a real corner case, or at the very least someone with bad habits.

staltz commented 8 years ago

@mattpodwysocki: I'm confused, where does the this come from here? Is it internal to the multicast function to have a member called replaySubject? If so, wouldn't that be an issue if someone was using a fat arrow or a custom bound function there?

We don't need to use this; we can use a closure and an IIFE:

var source = Rx.Observable.range(0, 5)
  .multicast(function () {
    var replaySubject;
    return function () {
      return replaySubject || (replaySubject = new Rx.ReplaySubject(2));
    };
  }());

source.connect();

source.subscribe(x => console.log('Next: ' + x));
// => 3
// => 4
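The point of the closure version is that the factory hands back the same subject on every call, regardless of how it is invoked. A minimal sketch of just that memoization, independent of RxJS: `createSubject` and `memoize` are hypothetical names, and the plain object stands in for `new Rx.ReplaySubject(2)` because only object identity matters here.

```javascript
// Hypothetical stand-in for `new Rx.ReplaySubject(2)`; only identity matters.
function createSubject() {
  return { buffer: [] };
}

// Memoized factory: the closure holds the single instance, so nothing
// depends on `this`, and arrow or bound functions behave the same way.
function memoize(create) {
  let instance;
  return () => instance || (instance = create());
}

const recycled = memoize(createSubject);  // same subject on every call
const fresh = () => createSubject();      // a new subject on every call

const recycledIsStable = recycled() === recycled();          // true
const recycledIgnoresThis = recycled.call({}) === recycled(); // true
const freshIsStable = fresh() === fresh();                    // false
```

A factory like `recycled` gives the recycling behavior; a factory like `fresh` gives a new subject per connection.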

What I would like is if we leave share as it currently is, and we talk about how we'd like shareReplay and shareBehavior to work.

Yep, that's what this issue was in the first place. What happens after completion of the source is irrelevant to share, but relevant to shareReplay and shareBehavior.

Is there a particular scenario where anyone we know is counting on the behavior that shareReplayed observables aren't reusable?

A reusable shareReplay is incompatible with a shareReplay that behaves like a ReplaySubject after the source completes, as I explained a few messages above. I am counting on having the latter, which means I am counting on shareReplayed observables not being reusable.

Backwards compatibility was gone with the move from dispose to unsubscribe and onNext to next. Those are MUCH bigger deviations.

TBH those are trivial renaming issues. Changing ConnectableObservable semantics is something else entirely. It requires teaching, not just a simple migration guide.

Any operator that results in hard to figure out behaviors like observables that can't be retried is plainly broken.

because of its strange semantics that break retry, repeat and reusability.

Ben, it would really help if you stop using the argument "it's broken". "Broken" means "does not fulfill the desired requirement". So it's just a matter of perspective. E.g. to me shareReplay in RxJS Next is at the moment "plainly broken". Let's stick to the technical arguments and also the "migration" costs for a huge amount of existing RxJS users.

For instance, someone who I know has a library based on RxJS decided to port it to RxJS Next just for the fun of the performance benefits, and they ended up with confusing bugs. I jumped in to help debug what was going on, and it was precisely this shareReplay complication. To them, RxJS 5 would be "plainly broken" and have "strange semantics".

All we need to do is identify the behaviors we want, appropriate names, and create the operators.

Can we just agree to keep shareReplay and shareBehavior semantics exactly like they were in RxJS 4 (meaning no reusability), while allowing multicast to be used to produce reusable observables by default?

benlesh commented 8 years ago

E.g. to me shareReplay in RxJS Next is at the moment "plainly broken".

I agree. I've always agreed with that. However, I refuse to change share back, because I find it equally broken.

Can we just agree to keep shareReplay and shareBehavior semantics exactly like they were

I don't think so. But I don't want to keep them like they are either.

What I propose is that we change them to retain the state of the subject on resubscription but not necessarily to break retrying and repeating:

repeating a shareReplay:

var e1 = cold( '---a--b--c--d--e--f-|');
var expected = '---a--b--c--d--e--f-(ef)--a--b--c--d--e--f-|';

var source = e1.shareReplay(2).repeat(1);
expectObservable(source).toBe(expected);

I think relying on a behavior essentially because "It's always been that way" isn't a good idea if what it's doing doesn't work properly with one of the primary semantics of the type (that they can be reused/retried).

If someone really wants to have the same behavior as RxJS 4, they can still do that:

(Something like...)

var e1 = cold( '---a--b--c--d--e--f-|');
var expected = '---a--b--c--d--e--f-(ef|)';

var replaySubject = new ReplaySubject(2);
var source = e1.multicast(() => replaySubject).repeat(1);
expectObservable(source).toBe(expected);

I STRONGLY feel that people relying on the current resubscription semantics of shareReplay are a corner-case. It amounts to relying on a quirk in a library that was caused by an oversight.

benlesh commented 8 years ago

Ben, it would really help if you stop using the argument "it's broken". "Broken" means "does not fulfill the desired requirement".

It's my opinion that it doesn't meet the desired requirement. It's my opinion that it's broken. It's your opinion that it was perfect the way it was, I suppose. I'm cool with that. One man's bug is another man's feature.

What I have yet to see is a compelling reason the feature exists the way it does today. That's what I'm waiting for. I have compelling reasons why we shouldn't change RxJS 5's semantics around this, and zero compelling reasons we should. To me "backwards compatibility" between major versions isn't a compelling reason. No one is forcing people to upgrade from RxJS 4 to RxJS 5. In fact, I think Netflix.com is on 2.0.2 still, and it's just fine. RxJS doesn't have any direct browser dependencies that will force people to upgrade over time like frameworks or jQuery.

staltz commented 8 years ago

What I have yet to see is a compelling reason the feature exists the way it does today. That's what I'm waiting for.

Matching semantics of their respective Subjects. I think I said that already. It's a compelling reason because multicast is essentially "run the cold observable on that subject", and you expect it to behave like that subject. After a multicast, the result should behave just like a Subject. That's the whole point of multicast.

var e1 = cold( '---a--b--c--d--e--f-|');
var expected = '---a--b--c--d--e--f-(ef)--a--b--c--d--e--f-|';

var source = e1.shareReplay(2).repeat(1);
expectObservable(source).toBe(expected);

Repeat is essentially "resubscribe when it completes", so actually if e1 was multicasted to a ReplaySubject(2), I'd expect it to behave precisely like a ReplaySubject(2), which means if you resubscribe to a ReplaySubject(2), it does not restart from the beginning of time.
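A minimal sketch of the subject semantics being referred to, written without RxJS so it runs on its own. `TinyReplaySubject` is a hypothetical, simplified model (buffer size only, no time window, no error handling), not the library's implementation. It shows that a subscriber arriving after completion gets the last two values followed by the completion, and nothing is restarted.

```javascript
// Hypothetical simplified model of a ReplaySubject with a fixed buffer size.
class TinyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
    this.done = false;
  }
  next(value) {
    if (this.done) return;
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.done) return;
    this.done = true;
    this.observers.forEach(o => o.complete());
    this.observers = [];
  }
  subscribe(observer) {
    // Replay the buffered values first, even after completion.
    this.buffer.forEach(v => observer.next(v));
    if (this.done) observer.complete();
    else this.observers.push(observer);
  }
}

const subject = new TinyReplaySubject(2);
const early = [];
const late = [];

subject.subscribe({ next: v => early.push(v), complete: () => early.push('|') });
['a', 'b', 'c', 'd', 'e', 'f'].forEach(v => subject.next(v));
subject.complete();

// Subscribing after completion: only the buffered tail and the completion.
subject.subscribe({ next: v => late.push(v), complete: () => late.push('|') });

// early: ['a', 'b', 'c', 'd', 'e', 'f', '|']
// late:  ['e', 'f', '|']
```

In marble terms the late subscriber sees `(ef|)`, which is what a resubscription via repeat would receive if the shared observable behaved exactly like its subject.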

It's my opinion that it doesn't meet the desired requirement. It's my opinion that it's broken. It's your opinion that it was perfect the way it was, I suppose. I'm cool with that.

What's not cool is an RxJS community fork. We are very close to creating parallel versions of RxJS, and I suppose you don't want RxJS 5 to become like Python 3 compared to Python 2.x.

benlesh commented 8 years ago

Matching semantics of their respective Subjects.

These Observables are not their Subjects. I don't see this as a compelling reason.

What I want to see is a use case where this semantic is important enough to forsake every hapless developer that runs into issues because of it.

I'm not at all okay with the old behavior. It's weird, there has been no reason for it presented other than "that's how it is".

And once again, on the "backwards compatibility" argument: RxJS is not a library that requires major version upgrades. There are no browser dependencies to push people into using the new major version. You could literally use 2.0.2 forever, and face almost no consequences as browsers evolve. If a few people like and depend on the behavior of multicast from RxJS 4 they have a LOT of options:

  1. Use the current multicast to get the same behavior (demonstrated above a few times)
  2. Override multicast themselves to be the old version.
  3. Don't use RxJS 5 and use RxJS 4 instead; after all, flatMap isn't going to change while we're alive.
  4. Use the singleUse operator proposed here: #644

But I really think reliance on this behavior is going to be a corner-case at best.

staltz commented 8 years ago

Ok, so it's going to be a community fork.