JakeWharton / RxReplayingShare

An RxJava transformer which combines replay(1), publish(), and refCount() operators.

Possible missed emission when racing with subscribers #40

Open damianw opened 3 years ago

damianw commented 3 years ago

There appears to be a race condition associated with RxReplayingShare which isn't present when using .replay(1).refCount(). I can't seem to reliably reproduce the issue when running a JVM unit test on my laptop, but can do so fairly reliably when running on an Android emulator.

I have this test function run when the app starts:

private fun runTest() {
    val observable = Observable.just(Unit)
        .delay(30, TimeUnit.MILLISECONDS)
        .replay(1)
        .refCount()
    val events = Collections.synchronizedList(mutableListOf<String>())
    repeat(1000) { i ->
        events += "Iteration: $i"
        observable
            .doOnSubscribe { events += "Subscribe: $i" }
            .doOnError { events += "Error: $i $it" }
            .doOnDispose { events += "Dispose: $i" }
            .doOnComplete { events += "Complete: $i" }
            .doFinally { events += "Finally: $i" }
            .subscribe { events += "Next: $i" }
    }
    Thread.sleep(10_000L)
    File(filesDir, "logs.txt").printWriter().use { writer ->
        for (event in events) {
            writer.println(event)
        }
    }
}

The logs.txt file is used in place of println/Log, since logcat seemed to be missing some of the output, and a file was more reliable.

When I run this, I get 5000 log entries, which is expected: One for each Iteration, Subscribe, Next, Complete, and Finally (order varies).
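
One way to check that count is to tally the log lines by their label. This is a minimal sketch, assuming the `Label: index` line format produced by the test above; `tallyEvents` is a hypothetical helper and the sample data is synthetic, not taken from a real run.

```kotlin
// Hypothetical helper: counts log lines by the label before the first ':'.
fun tallyEvents(events: List<String>): Map<String, Int> =
    events.groupingBy { it.substringBefore(':') }.eachCount()

fun main() {
    // Synthetic sample standing in for logs.txt: iteration 1 has no "Next" line.
    val sample = listOf(
        "Iteration: 0", "Subscribe: 0", "Next: 0", "Complete: 0", "Finally: 0",
        "Iteration: 1", "Subscribe: 1", "Complete: 1", "Finally: 1"
    )
    val counts = tallyEvents(sample)
    println(counts)

    // In a healthy run every label has the same count; fewer "Next" lines
    // than "Subscribe" lines points at missed emissions.
    val missed = (counts["Subscribe"] ?: 0) - (counts["Next"] ?: 0)
    println("Missed emissions: $missed")
}
```

On a real run, the list would come from reading logs.txt line by line instead of the hard-coded sample.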

However, if I change .replay(1).refCount() to .replayingShare(), I get significantly fewer Next emissions - sometimes missing up to 100 emissions. My hypothesis is that this is happening because the following sequence of events occurs every once in a while:

  1. Subscriber A subscribes to the replayingShare, no cached value is available, so one is not emitted
  2. The replayingShare makes the first subscription to the share, and it opens the connection
  3. Subscriber B subscribes to the replayingShare, no cached value is available, so one is not emitted
  4. The upstream observable (Observable.just(Unit).delay(30, TimeUnit.MILLISECONDS)) makes the first emission: this is cached to the LastSeen, but too late for subscriber B to see it
  5. The replayingShare makes the second subscription to the share, whose upstream has no values left to emit
  6. The stream terminates having only emitted the value to subscriber A.
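
The interleaving in the steps above can be replayed deterministically with a toy model. This is only a sketch of the hypothesis: `ToyReplayingShare` and `runInterleaving` are hypothetical names, and the class is not RxReplayingShare's actual implementation. It assumes that subscribing happens in two separate phases (read the cache, then attach to the shared stream), which is what the hypothesis requires.

```kotlin
// Toy model only: NOT RxReplayingShare's real code. It mimics a two-phase
// subscribe so the hypothesized race can be stepped through by hand.
class ToyReplayingShare<T : Any> {
    private var cached: T? = null
    private val attached = mutableListOf<(T) -> Unit>()

    // Phase 1: replay the cached value, if any. Returns true if one was delivered.
    fun replayCached(onNext: (T) -> Unit): Boolean {
        val value = cached ?: return false
        onNext(value)
        return true
    }

    // Phase 2: attach to the shared stream for future emissions.
    fun attach(onNext: (T) -> Unit) {
        attached += onNext
    }

    // Upstream emission: cache the value and deliver it to whoever is attached right now.
    fun emit(value: T) {
        cached = value
        attached.forEach { it(value) }
    }
}

// Replays steps 1-6 and returns the names of the subscribers that received the value.
fun runInterleaving(): List<String> {
    val received = mutableListOf<String>()
    val share = ToyReplayingShare<Unit>()

    share.replayCached { received += "A" } // step 1: cache is empty, nothing replayed to A
    share.attach { received += "A" }       // step 2: A attaches, the connection opens
    share.replayCached { received += "B" } // step 3: cache is still empty for B
    share.emit(Unit)                       // step 4: value is cached and delivered to A only
    share.attach { received += "B" }       // step 5: B attaches after the only emission
    return received                        // step 6: upstream is done, B saw nothing
}

fun main() {
    println(runInterleaving()) // prints [A]
}
```

If step 3 ran after step 4 instead, B would find the cached value in phase 1, which is why the missed emission only shows up occasionally.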

If my assumption is correct, this is similar to the race condition that occurs when using share by itself.
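
For comparison, the late-subscriber behavior of plain share() can be shown without any threads by driving a subject by hand. This is a minimal sketch, assuming RxJava 3 package names (under RxJava 2 the imports would be `io.reactivex.Observable` and `io.reactivex.subjects.PublishSubject`); `lateSubscriberSees` is a hypothetical helper, not part of any library.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: subscribes A, emits one value, then subscribes B late
// and returns whatever B received.
fun lateSubscriberSees(wrap: (Observable<Int>) -> Observable<Int>): List<Int> {
    val upstream = PublishSubject.create<Int>()
    val shared = wrap(upstream)
    val seenByB = mutableListOf<Int>()

    val a = shared.subscribe { }                // A opens the connection
    upstream.onNext(1)                          // the only emission, delivered to A
    val b = shared.subscribe { seenByB += it }  // B arrives after the emission
    upstream.onComplete()

    a.dispose()
    b.dispose()
    return seenByB
}

fun main() {
    println(lateSubscriberSees { it.share() })              // prints []
    println(lateSubscriberSees { it.replay(1).refCount() }) // prints [1]
}
```

With share(), B joins the existing connection and gets nothing because nothing is replayed; with replay(1).refCount(), the buffered value is delivered to B on subscription.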

Given that this operator presents itself as comparable to .replay(1).refCount() with the added feature of retaining the latest emission while dormant, I don't think that this behavior should be expected.

As a quick workaround, I've tried replacing the .share() in RxReplayingShare's implementation with .replay(1).refCount(). This works for my use cases, but it does produce many duplicate emissions (about 100 in this test, on the emulator and hardware I'm using).