Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
12.94k stars 1.84k forks source link

Unable to get intended replay configuration with completing shared flows #2890

Open zakhenry opened 3 years ago

zakhenry commented 3 years ago

I have a requirement to have a flow that is converted to a SharedFlow as multiple downstream subscribers consume the result, but the originating flow is particularly expensive so multiple collection is unacceptable.

Using shareIn solves this issue, however a requirement is that the downstream subscribers must also know of the completion status of the original flow. I've implemented a materialize & demateralize function as mentioned in both #2751 & https://github.com/Kotlin/kotlinx.coroutines/issues/2034#issue-620040531

As a side-effect of this error-handling design decision, the SharedFlow never completes. A call collect { ... } on a SharedFlow must be canceled to terminate it. However, if completion is needed, then it can always be materialized by a special emitted value. A collector can apply takeWhile operator to complete the resulting flow when this special value is encountered.

I've wrapped this up in a generalised function shareUntilComplete with the same signature as shareIn, there remains however a significant problem with implementing the replay functionality - the gist of the function body is as follows:

return materialize()
        .shareIn(coroutineScope, started, replay)
        .dematerialize()

The issue is that the completion symbol will count towards the replay allocation, and I can't simply do replay + 1, because a subscriber that joins before completion will end up in one more than the intended replay count from buffer, and if you just do replay directly, then subscribers before completion will get the expected result, but late subscribers would only get an empty completing flow (because the only thing in cache was the completion notification).

Is there a simple solution I'm overlooking here?

zakhenry commented 3 years ago

Here's a minimal repro playground demonstrating the issue https://pl.kotl.in/tC25RSijl

You can see that without the replay + 1, the late subscriber will only be receiving the completion notification from cache, not the actual value that they would have been interested in, and with the replay + 1, then the "during" subscriber will immediately receive two values on subscription, when they were only interested in one (and the buffer is oversized by 1 too).

qwwdfsad commented 3 years ago

Thanks for a detailed write-up with a self-contained example. This is definitely a design issue that we have to solve when working on #2092.

Alas, I do not know a simple solution out-of-the-box right now.

zakhenry commented 3 years ago

I have since come up with an albeit horrible workaround that I believe fulfils the requirements that I am after with a sentinel null value and some buffer trickery. I'm unconvinced this is the cleanest solution, but it passes my tests!

https://pl.kotl.in/ARnUWIxZI

the salient bit:

    /**
     * this awkward chain handles the materialized completion in a special way so that the _replay_ count is correctly
     * preserved. Without it, and a `replay = 1` for example, the completion event would be replayed, but not the
     * last Notification.Result. To side-step this, a sentinel null is emitted _before_ each Notification.Result that
     * would be buffered. When the error or completion result comes through, the oldest sentinel value will be bumped
     * off the buffer, crucially preserving the actual result that wants to be replayed in the buffer, _and_ the new
     * terminating notification. This sentinel value is just stripped out before dematerializing.
     * The downside is that the actual buffer will have every second value as a pointless null, but there is no simpler
     * workaround found yet.
     * @see https://github.com/Kotlin/kotlinx.coroutines/issues/2890
     */
    // set the buffer replay to double the requested replay size to fit the sentinel nulls, prevent overflow.
    val bufferReplayIncludingSentinel = if (replay == 0) 1 else (replay.coerceAtMost(Int.MAX_VALUE/2) * 2)
    return materialize()
        .transform {

            // note the different ordering of emission of the sentinel null for the special replay = 0 case - this is
            // due to the buffer size being 1 for that case, and the sentinel value must be emitted before the
            // notification value in order to correctly empty the buffer of useful values
            if (replay == 0) {
                emit(it)
            }
            if (it is Notification.Result) {
                emit(null)
            }
            if (replay != 0){
                emit(it)
            }
        }
        .shareIn(coroutineScope, started, bufferReplayIncludingSentinel)
        .filterNotNull()
        .dematerialize()
rossdanderson commented 2 years ago

Thank you for the workaround, I have tried it myself and unfortunately there is still one blocker for us - when setting a stopTimeoutMillis value in sharingStarted, a later subscriber within this time period does not trigger a resubscription to the upstream and simply gets the last replay values and the previous completion/error.
Comparing with Rx, replay(1).refCount(1000, MILLISECONDS) would immediately resets the refCount cooldown and clears the replay buffer when the upstream triggers an error or complete, and I cannot find a Flow equivalent.

Our usecase is: We want to share expensive streams typically with a replay of 1 We tend to use stopTimeoutMillis to keep the stream around for a little while as, if requested, they are likely to be requested again in the near future and it saves us the set up cost. These streams may complete - due to some upstream entitlement change for example - or error - due to some unforseen issue or network conditions. Often we would like to communicate the problem to the user and allow them to initiate the retry rather than retrying with a fixed delay or the like. We would rather avoid passing around wrapper objects to replace the existing complete/error semantics - from a memory overhead perspective and code cleanliness one - performing combine across many materialized streams quickly becomes very messy.

An example of what would be desired, using Rx, is

@Test
    fun sharingFlowable(): Unit = runBlocking {
        lateinit var publisher: PublishProcessor<String>

        val refCountedFlowable = Flowable.defer {
            publisher = PublishProcessor.create()
            publisher
        }
            .replay(2)
            .refCount(1000, MILLISECONDS)
            .asFlow()

        refCountedFlowable.test {
            publisher.onNext("A")
            awaitItem() shouldBeEqualTo "A"

            publisher.onNext("B")
            awaitItem() shouldBeEqualTo "B"

            publisher.onNext("C")
            awaitItem() shouldBeEqualTo "C"
        }

        // The replay buffer and upstream subscription still exists as no upstream complete was fired and we are
        // within the refCount cooldown

        refCountedFlowable.test {
            awaitItem() shouldBeEqualTo "B"
            awaitItem() shouldBeEqualTo "C"

            publisher.onNext("D")
            awaitItem() shouldBeEqualTo "D"
        }

        delay(1001)

        // The replay buffer was reset and the upstream subscription cancelled as we had no subscribers for longer than
        // the refCount cooldown

        refCountedFlowable.test {
            expectNoEvents()

            publisher.onNext("D")
            awaitItem() shouldBeEqualTo "D"

            publisher.onComplete()
            awaitComplete()
        }

        // The upstream complete immediately reset the replay buffer and terminated the upstream subscription
        // despite the refCount cooldown, and we've now initiated a new upstream subscription

        refCountedFlowable.test {
            expectNoEvents()

            publisher.onNext("X")
            awaitItem() shouldBeEqualTo "X"

            publisher.onComplete()
            awaitComplete()
        }
    }

Unfortunately I've tried implementing this myself by materializing and using existing operators to recombine, but haven't managed to get anything working yet.

rossdanderson commented 2 years ago

I did eventually manage to write a version that appears to pass all of our requirements

fun <T : Any?> Flow<T>.shareInCompleting(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int,
): Flow<T> {
    val reference = AtomicReference<SharedFlow<Action<T>>>()

    return flow {
        fun compute(): SharedFlow<Action<T>> {
            while (true) {
                val current = reference.get()
                if (current != null) return current

                val sharedFlow =  materialize()
                    .transformWhile {
                        emit(it)
                        it is Update
                    }
                    .onCompletion { reference.set(null) }
                    .shareIn(scope, started, replay)

                if (reference.compareAndSet(null, sharedFlow)) return sharedFlow
            }
        }

        val flow = compute()

        emitAll(flow.dematerialize())
    }
}