JakeWharton / RxReplayingShare

An RxJava transformer which combines replay(1), publish(), and refCount() operators.
Apache License 2.0
626 stars 28 forks source link

Possible issue with certain Observables #25

Closed RobLewis closed 6 years ago

RobLewis commented 6 years ago

An issue appeared with the Bluetooth package RxAndroidBle that might be of interest. The authors have deprecated their own ConnectionSharingAdapter for sharing BLE connections, and now recommend the use of RxReplayingShare. This is my understanding:

The package has a method Observable<RxBleConnection> establishConnection( ... ). This Observable is somewhat unusual in that it emits a single item (the connection), and never completes. Unsubscribing from it terminates the connection, and if the connection is lost due to a Bluetooth fault, the subscription is canceled with an onError() exception.

Apparently the issue is that RxReplayingShare will continue to emit the "stale" cached connection to subscribers even after the original connection is broken. According to the OP, the problem is that ReplayingShare does not react to finalizations. He describes a fix that might be worth considering. Comments appreciated.

JakeWharton commented 6 years ago

Yep. Good find. Seems like we need to clear the cached value upon error. I'm not sure about whether or not to do it for a complete event just yet. Need to think on that more...

streetsofboston commented 6 years ago

I found this issue when using it with RxAndroidBle, where the RxBleConnection becomes invalid after the Observable that emits it is finalized. I agree with you @JakeWharton that always clearing the cached value of the ReplayingShare on finalization may not be a good idea. Doing this fixes the RxBleConnection issue, but may not be a good general solution.

I created a new class ReplayingUnfinalizedShare that does clear the cached value on finalization of the shared stream and it fixes the RxBleConnection issues we found (still, there are race-conditions that I would need to address).

For a solution we could have variations of the ReplayingShare, like the ReplayingUnfinalizedShare, that clear the cached value each in a unique way. Let me know if it is worth doing this and I could open an issue/PR on your repo.

JakeWharton commented 6 years ago

I would prefer not to provide variants or configuration options unless there is no other path forward.

streetsofboston commented 6 years ago

For our use case, we had to be able to set that cached value to null. Not sure how we could have worked around this, apart from not using ReplayingShare.

RobLewis commented 6 years ago

Any further thoughts/discussion on this?

JakeWharton commented 6 years ago

I haven't had the bandwidth to think about it. Now that I/O is over, I'll put it back on the queue.

On Fri, May 11, 2018, 10:10 AM RobLewis notifications@github.com wrote:

Any further thoughts/discussion on this?

— You are receiving this because you were mentioned.

Reply to this email directly, view it on GitHub https://github.com/JakeWharton/RxReplayingShare/issues/25#issuecomment-388426301, or mute the thread https://github.com/notifications/unsubscribe-auth/AAEEEVHL1XUvUmcN3BVYxo67q2m5aHlOks5txcYWgaJpZM4TLnSO .

PaulWoitaschek commented 6 years ago

'm not sure about whether or not to do it for a complete event just yet. Need to think on that more...

Yes I would do that.

I for example use ReplayingShare to share a connection to my database. When the last subscriber disposes the connection, the value of the LastSeen does not get updated. When I now put another value into my database, there are no subscribers and the Flowable does not emit a new value. When I now re-subscribe to the chain that has the ReplayingShare applied, I receive the the old event.

For my use cases the fact that it delivers the stale values is very undesired.

PaulWoitaschek commented 6 years ago

This is the pitfall I fell into:

  @Test
  fun doesNotHoldOldValueAfterDisposed() {
    val subject = BehaviorSubject.create<String>()
    val observable = subject.compose(ReplayingShare.instance())

    val observer = observable.test()
    subject.onNext("Foo")
    observer.dispose()

    subject.onNext("Bar")

    observable.test()
        .assertNever("Foo")
  }
JakeWharton commented 6 years ago

If you don't want the last value then you should be using vanilla replay(1).refCount().

On Thu, May 31, 2018, 3:14 AM Paul Woitaschek notifications@github.com wrote:

This is the pitfall I fell into:

@Test fun doesNotHoldOldValueAfterDisposed() { val subject = BehaviorSubject.create() val observable = subject.compose(ReplayingShare.instance())

val observer = observable.test()
subject.onNext("Foo")
observer.dispose()

subject.onNext("Bar")

observable.test()
    .assertNever("Foo")

}

— You are receiving this because you were mentioned.

Reply to this email directly, view it on GitHub https://github.com/JakeWharton/RxReplayingShare/issues/25#issuecomment-393432454, or mute the thread https://github.com/notifications/unsubscribe-auth/AAEEEVrnWXjlKmbG3AULmG_sw728zRkvks5t35hTgaJpZM4TLnSO .

RobLewis commented 6 years ago

I proposed the use of .replay(1).refCount() to the RxAndroidBle folks but that won't work. Their explanation: if you have:

Observable<RxBleConnection> sharedConnectionObservable = device.establishConnection( false )
    .replay(1).refCount(); 

and do:

Disposable disp1 = sharedConnectionObservable.subscribe( ... ); 
// wait until connection is established
Disposable disp2 = sharedConnectionObservable.subscribe( ... );

Then the second .subscribe() will never get the emitted RxBleConnection as it happened before the second subscription to .refCount().

.refCount() makes only a single subscription to the upstream therefore .replay() will get subscribed only once