Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

SharedFlow, buffer and produceIn fusion bug. #2817

Open Dominaezzz opened 3 years ago

Dominaezzz commented 3 years ago
val stream = MutableSharedFlow<Unit>()
stream
//  .onEach {} // workaround
    .buffer(0)
    .produceIn(GlobalScope)

yield() // To make sure subscription has started.

var i = 0
while (true) {
    stream.emit(Unit)
    println("Emitted $i")
    i++
}

Without the workaround, 64 emissions succeed before emit suspends. With buffer(0) there should be at most 1 emission.

The bug is either in SharedFlow fusion or in this function that produceIn uses.
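The figure of 64 matches the default capacity of Channel.BUFFERED, which hints (as an assumption, not something confirmed in this thread) that the explicit buffer(0) is dropped during fusion and the default buffer is used instead. A minimal sketch of the two capacities on a plain channel, where sendsAcceptedWithoutReceiver is a hypothetical helper written for this illustration:

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper (not part of kotlinx.coroutines): counts how many
// non-suspending sends a channel accepts while nobody is receiving.
fun sendsAcceptedWithoutReceiver(capacity: Int): Int {
    val channel = Channel<Unit>(capacity)
    var accepted = 0
    // trySend never suspends; it fails as soon as a real send would have to wait.
    while (channel.trySend(Unit).isSuccess) accepted++
    channel.close()
    return accepted
}

fun main() {
    // Rendezvous (capacity 0): nothing is accepted without a waiting receiver.
    println(sendsAcceptedWithoutReceiver(Channel.RENDEZVOUS)) // 0
    // Default buffer: 64, unless the kotlinx.coroutines.channels.defaultBuffer
    // system property overrides it.
    println(sendsAcceptedWithoutReceiver(Channel.BUFFERED)) // 64
}
```

This only demonstrates channel capacities. It does not exercise the SharedFlow or produceIn code paths where the bug is suspected.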

linean commented 2 years ago

I have a similar issue when mixing channelFlow, buffer and shareIn. When I remove shareIn or use the flow builder, I see only one emission. Otherwise send does not suspend.

val flow = channelFlow { // if I use flow{} builder buffer works as expected
    var i = 0
    repeat(100) {
        send("$i")
        println("send: ${++i}")
    }
}

val job = flow
    .buffer(capacity = 0)
    .shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0) // if I remove shareIn buffer works as expected
    .onEach { delay(1000) }
    .take(1)
    .launchIn(GlobalScope)

runBlocking {
    job.join()
}
linean commented 2 years ago

I did more testing, and maybe I just don't understand how buffer + shareIn + WhileSubscribed should behave 🤔 The docs say:

buffer(0).shareIn(scope, started, 0) - ... Effectively, it configures sequential processing between the upstream emitter and subscribers ...

but based on my tests, processing is not sequential:

@DelicateCoroutinesApi
@ExperimentalCoroutinesApi
class FlowBufferTest {

    @Test
    fun `channelFlow with no buffer and shareIn suspends until collector is ready`() {
        var sendCount = 0

        channelFlow {
            while (true) {
                send(Unit)
                sendCount++
            }
        }.buffer(capacity = 0)
            .shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0)
            .takeOne()

        assertEquals(1, sendCount)
    }

    @Test
    fun `channelFlow with no buffer suspends until collector is ready`() {
        var sendCount = 0

        channelFlow {
            while (true) {
                send(Unit)
                sendCount++
            }
        }.buffer(capacity = 0)
            .takeOne()

        assertEquals(1, sendCount)
    }

    @Test
    fun `flow with no buffer suspends until collector is ready`() {
        var sendCount = 0

        flow {
            while (true) {
                emit(Unit)
                sendCount++
            }
        }.buffer(capacity = 0)
            .takeOne()

        assertEquals(1, sendCount)
    }

    @Test
    fun `flow with no buffer and shareIn suspends until collector is ready`() {
        var sendCount = 0

        flow {
            while (true) {
                emit(Unit)
                sendCount++
            }
        }.buffer(capacity = 0)
            .shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0)
            .takeOne()

        assertEquals(1, sendCount)
    }

    private fun Flow<*>.takeOne() = runBlocking {
        onEach { delay(500) }
            .take(1)
            .collect()
    }
}
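One caveat when reading the counts in the shareIn variants: the upstream runs in GlobalScope, so sendCount is written on one thread and read on another. A minimal sketch of the same measurement with a thread-safe counter, assuming an AtomicInteger and a hypothetical countSendsThroughShareIn helper (neither is in the tests above). No expected value is given, since the count is exactly what is in question here:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns how many send() calls had completed by the time
// a slow collector finished taking a single element.
@OptIn(DelicateCoroutinesApi::class)
fun countSendsThroughShareIn(): Int {
    // Atomic counter, because the producer runs on a GlobalScope thread.
    val sendCount = AtomicInteger(0)

    val shared = channelFlow {
        while (true) {
            send(Unit)
            sendCount.incrementAndGet()
        }
    }.buffer(capacity = 0)
        .shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0)

    runBlocking {
        // Slow collector: holds the first element for 500 ms, then stops.
        shared.onEach { delay(500) }.take(1).collect()
    }
    return sendCount.get()
}

fun main() {
    println("sends completed: ${countSendsThroughShareIn()}")
}
```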
ajmalk commented 1 year ago

I just hit this bug too. These phantom buffers are a huge annoyance, and I really wish the default behavior was no buffer. Bugs like this in the implementation make it even worse, because we can't trust that there won't be a buffer even when we explicitly set the buffer to zero.