Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

`stateIn` with `SharingStarted.Eagerly` doesn't really immediately start collecting #4006

Open dkhalanskyjb opened 8 months ago

dkhalanskyjb commented 8 months ago

An example was submitted to us (slightly simplified):

runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    val stateFlow = sharedFlow.stateIn(this, SharingStarted.Eagerly, 0)

    check(stateFlow.value == 0)
    sharedFlow.emit(1)
    delay(1.seconds)
    check(stateFlow.value == 1) // fails
}

The reason is that stateIn only subscribes to the upstream once the thread becomes available, which in this example first happens at delay. Even though Eagerly promises that the subscription will happen immediately, the value passed to emit is lost, because sharedFlow has no subscribers at that point.

Adding a yield after stateIn fixes the issue, as stateIn gets a chance to finish its initialization.
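To make the difference concrete, here is a minimal sketch that runs the example above with and without the yield. The helper name valueAfterEmit and its parameter are hypothetical, introduced only for illustration. The sketch also cancels the sharing coroutine at the end, since otherwise runBlocking would keep waiting for it once the check no longer fails.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper (not part of kotlinx.coroutines): returns the value
// held by the state flow after 1 has been emitted to the upstream shared flow.
fun valueAfterEmit(yieldBeforeEmit: Boolean): Int = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    val stateFlow = sharedFlow.stateIn(this, SharingStarted.Eagerly, 0)

    // Give the sharing coroutine a chance to run and subscribe to sharedFlow.
    if (yieldBeforeEmit) yield()

    sharedFlow.emit(1)
    delay(100) // same role as the delay in the original example
    val observed = stateFlow.value

    // The sharing coroutine never completes on its own, so cancel it;
    // otherwise runBlocking would wait for it forever.
    coroutineContext.cancelChildren()
    observed
}
```

In the single-threaded runBlocking setup used here, valueAfterEmit(yieldBeforeEmit = false) returns 0 (the emission is lost), while valueAfterEmit(yieldBeforeEmit = true) returns 1.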

This behavior is actually intentional (https://github.com/Kotlin/kotlinx.coroutines/blob/1a0287ca3fb5d6c59594d62131e878da4929c5f8/kotlinx-coroutines-core/common/src/flow/operators/Share.kt#L206-L208): the stated reason is to give subscribers a chance to subscribe before the sharing starts. For example,

runBlocking {
    val myFlow = flow {
        emit(1); emit(2)
    }
    val stateFlow = myFlow.stateIn(this, SharingStarted.Eagerly, 0)
    launch(start = CoroutineStart.UNDISPATCHED) {
        stateFlow.collect {
            println(it) // guaranteed to observe the initial value 0
        }
    }
}

Code that relies on delivery of the initial element is tricky to write, as it requires deliberately starving the dispatcher of the threads that could otherwise run the initialization code of stateIn in parallel. Also, the use cases are unclear, and this guarantee doesn't seem to be specified anywhere.
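As a sketch of what relying on this guarantee looks like, the snippet below starts the subscriber undispatched inside runBlocking, so it reads the state flow before the sharing coroutine gets to run. The helper name firstObservedValue is hypothetical. With a different subscriberStart, or on a multi-threaded dispatcher, the subscriber may well miss the initial value, which is why no result is claimed for those cases.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): returns the first
// value seen by a subscriber that is started with the given start mode.
fun firstObservedValue(subscriberStart: CoroutineStart): Int = runBlocking {
    val myFlow = flow {
        emit(1); emit(2)
    }
    // With Eagerly, the sharing coroutine is only scheduled here; it has not
    // run yet because runBlocking's single thread is still busy with this code.
    val stateFlow = myFlow.stateIn(this, SharingStarted.Eagerly, 0)

    // An undispatched subscriber runs right away on the current thread and
    // therefore reads the initial value before sharing starts.
    val firstValue = async(start = subscriberStart) { stateFlow.first() }
    firstValue.await()
}
```

Under these assumptions, firstObservedValue(CoroutineStart.UNDISPATCHED) returns 0.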

We should either document that Eagerly can sometimes fail to even run the initialization code of collect (for example, to subscribe to a shared flow) or change this behavior.
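Until either happens, a caller that owns the upstream MutableSharedFlow could wait for the subscription explicitly instead of relying on yield. The following is a sketch of that workaround, not a recommendation from the library: it suspends on the subscriptionCount property of MutableSharedFlow until the sharing coroutine has subscribed. The helper name valueAfterAwaitingSubscriber is hypothetical.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): emits 1 only after
// the sharing coroutine has subscribed, then returns the state flow's value.
fun valueAfterAwaitingSubscriber(): Int = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    val stateFlow = sharedFlow.stateIn(this, SharingStarted.Eagerly, 0)

    // Suspend until sharedFlow has at least one subscriber, that is,
    // until the coroutine launched by stateIn has started collecting.
    sharedFlow.subscriptionCount.first { it > 0 }

    sharedFlow.emit(1)
    val observed = stateFlow.value

    // Stop the sharing coroutine so that runBlocking can return.
    coroutineContext.cancelChildren()
    observed
}
```

In this single-threaded setup the function returns 1. The approach only applies when the upstream is a shared flow whose subscriptionCount the caller can observe.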

igor-korotenko commented 4 months ago

Agreed. It took me some time to realise this, since I was expecting that it could be used for pre-caching a value. It would be good to have at least some documentation on this :) Or to actually start it immediately in the case of StateFlow, since that way it could be used to pre-cache a value for consumers that appear later on.