Reactive-Extensions / RxJS

The Reactive Extensions for JavaScript
http://reactivex.io

Combinelatest cannot work on shared single observable #1507

Closed TylerYang closed 7 years ago

TylerYang commented 7 years ago

Hi guys, it looks like combineLatest doesn't work as expected on a shared observable. Consider the code below:

const data1$ = Rx.Observable.of('test').map(data => 'value1$').share()

const data2$ = Rx.Observable.interval(1000).map(data => 'value2$').share()

// Comment out these 3 lines and combineLatest works.
data1$.subscribe(Rx.Observer.create(function (data){ 
    console.log('data1$ subscribe:' + data)
}))

Rx.Observable.combineLatest(data1$, data2$).subscribe(Rx.Observer.create(function(data) {
    console.log('combineLatest results: ' + data)
}))

In this case combineLatest doesn't output any result. I'm not sure if I used it correctly, but it only works if I comment out data1$.subscribe....

I created this jsfiddle link for testing. http://jsfiddle.net/zba86st0/238/

Any comments would be appreciated. Thx : )

paulpdaniels commented 7 years ago

It isn't so much a problem with any of the operators as it is with the source. data1$ is built with the of operator, which is cold by default, but by using share you are turning it into a hot Observable. The first subscription causes the single value from data1$ to emit, so by the time you get to the second subscription it has already been emitted. combineLatest only emits once it has data from both sources, so it never emits. You can see this work if you change of to timer(1000), for instance, because data1$ will then emit well after both subscriptions have been made.
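To make the timing issue concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS at all: ToyHotSource is a hypothetical, hand-rolled stand-in for a hot source, and it only models the one behaviour that matters here, namely that a value is delivered to whoever is subscribed at the moment it is emitted and is not kept for anyone who subscribes later.

```javascript
// Toy model of a hot source (hypothetical class, NOT RxJS's implementation):
// a value is delivered only to listeners that are already subscribed.
class ToyHotSource {
  constructor() {
    this.listeners = [];
  }
  subscribe(listener) {
    this.listeners.push(listener);
  }
  emit(value) {
    this.listeners.forEach(listener => listener(value));
  }
}

const source = new ToyHotSource();
const early = []; // values seen by the first subscriber
const late = [];  // values seen by the subscriber that arrives afterwards

source.subscribe(value => early.push(value)); // in place before the emission
source.emit('value1$');                       // the single value goes out now
source.subscribe(value => late.push(value));  // too late: nothing is replayed

console.log(JSON.stringify(early)); // ["value1$"]
console.log(JSON.stringify(late));  // []
```

The late subscriber plays the role of combineLatest in the original snippet: it never receives a value from this source, so it has nothing to combine.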

whiteinge commented 7 years ago

We deal with a lot of hot observables at my work and see this regularly, like Paul described. When you're using multiple subscriptions and can't easily ensure that all of them are in place before the needed values are emitted, you can use shareReplay(1) instead. It caches the last value to come through and replays it to any late subscribers, like the combineLatest above.
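The caching idea can be sketched the same way in plain JavaScript, without RxJS. Everything here is hypothetical and hand-rolled for illustration: ToyReplayLastSource keeps only the most recent value and hands it to each new subscriber, and toyCombineLatest is a two-source stand-in that emits once both sides have produced a value. Neither is how RxJS implements shareReplay or combineLatest.

```javascript
// Toy "replay the last value" source (hypothetical, NOT RxJS's shareReplay).
class ToyReplayLastSource {
  constructor() {
    this.listeners = [];
    this.hasValue = false;
    this.lastValue = undefined;
  }
  subscribe(listener) {
    this.listeners.push(listener);
    // A late subscriber immediately receives the cached value, if any.
    if (this.hasValue) listener(this.lastValue);
  }
  emit(value) {
    this.hasValue = true;
    this.lastValue = value;
    this.listeners.forEach(listener => listener(value));
  }
}

// Toy two-source combineLatest (hypothetical helper):
// calls onPair only after both sources have produced at least one value.
function toyCombineLatest(a, b, onPair) {
  const latest = [undefined, undefined];
  const seen = [false, false];
  const update = index => value => {
    latest[index] = value;
    seen[index] = true;
    if (seen[0] && seen[1]) onPair([latest[0], latest[1]]);
  };
  a.subscribe(update(0));
  b.subscribe(update(1));
}

const data1 = new ToyReplayLastSource();
const data2 = new ToyReplayLastSource();
const combined = [];

data1.subscribe(() => {}); // an earlier subscriber, as in the issue
data1.emit('value1$');     // emitted before the combined subscription exists
toyCombineLatest(data1, data2, pair => combined.push(pair));
data2.emit('value2$');     // stands in for the first interval tick

console.log(JSON.stringify(combined)); // [["value1$","value2$"]]
```

Because the cached value is replayed on subscription, the combined stream has its first input as soon as it subscribes and emits on the first value from the second source.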

TylerYang commented 7 years ago

hi @paulpdaniels @whiteinge Thx so much for the detailed explanation!

Now I get the idea of hot and cold observables! And shareReplay satisfies my need perfectly!