JakeWharton / RxReplayingShare

An RxJava transformer which combines replay(1), publish(), and refCount() operators.
Apache License 2.0

Subscription happening later than a previous stream termination #26

Open kaushikgopal opened 6 years ago

kaushikgopal commented 6 years ago

We’re using replayingShare() in our location provider service, with the idea of keeping the last location cached even after the costly GPS radio resources have been released.

class LocationService {
    private val locationSource: Flowable<Location> by lazy {
        Observable.compose(MyFancyRxLocationLibraryConfigurator())
            ...
            .doOnSubscribe { Timber.log 1 }
            .replayingShare()
    }

    // public api
    fun locations(): Flowable<Location> = locationSource
}

In my use case, at another point where location matters, I’m trying to see whether at least one location data point is available. If it is, I’d like to end the current stream and proceed with some business logic. So I do something like this:

locationService
    .locations()
    .take(1)
    .doOnSubscribe {
        Timber.log 2  // this is called
    }
    .subscribe(
        {
            onNext() // do some BL
        },
        { onError() },
        {
            onComplete()
            Timber.log 3 // onComplete triggered courtesy of .take
        }
    )

If we’ve never spun up the GPS radio and acquired a location, then all works well:

// locationService onSubscribe Timber.log 2 seen
// locationSource onSubscribe Timber.log 1 seen
// locationService onComplete Timber.log 3 received

However, if I immediately perform the at-least-one-location check again (the take(1) use case mentioned above), I instead notice the following:

// locationService onSubscribe Timber.log 2 seen
// locationService onComplete Timber.log 3 received
// locationSource onSubscribe Timber.log 1 seen

🤔

This gets tricky because the GPS radio is now constantly being pinged: the upstream sees the subscription a tad later, after the take(1) subscriber has already completed.

Clearing the upstream on any termination event, by using a customized .replayingShare as suggested in this other issue, does the trick and makes my use case “work”. But I’m trying to understand why the onSubscribe call is triggered later than onComplete when this check is done in rapid succession.

JW surmised a race condition. So I tried removing all the subscribeOn + observeOn in this chain and I'm still noticing the behavior.
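
For anyone who wants to poke at this without a GPS radio, here is a minimal single-threaded sketch of the scenario above. It assumes RxJava 2 and the Kotlin replayingShare() extension from this library. The PublishProcessor, the String "locations", the checkOnce helper, and the println labels are hypothetical stand-ins, not our real code. I'm deliberately not asserting any output here; the point is just to compare the order of the "log 1" and "log 3" lines between the first and second check.

```kotlin
import com.jakewharton.rx.replayingShare
import io.reactivex.processors.PublishProcessor

fun main() {
    // Hypothetical stand-in for the GPS-backed location source.
    val gps = PublishProcessor.create<String>()

    val locations = gps
        .doOnSubscribe { println("log 1: upstream subscribed") }
        .doOnCancel { println("upstream cancelled") }
        .replayingShare()

    // Same shape as the take(1) check described in the issue.
    fun checkOnce(label: String) {
        locations
            .take(1)
            .doOnSubscribe { println("$label log 2: check subscribed") }
            .subscribe(
                { println("$label onNext: $it") },
                { println("$label onError: $it") },
                { println("$label log 3: check completed") }
            )
    }

    checkOnce("first")   // nothing cached yet, so this waits for a value
    gps.onNext("fix-1")  // first location arrives and take(1) completes the first check
    checkOnce("second")  // a value is cached now; this is the rapid re-check
}
```

Since nothing here uses subscribeOn or observeOn, any ordering difference between the two checks in this sketch can't come from threading, which is the same thing I tried to rule out in the real chain.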