Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

merge() and combine() detach emission from collection #3765

Closed: odedniv closed this issue 1 year ago

odedniv commented 1 year ago

Describe the bug

Flows expect that emit() will block until the value has finished collecting, unless explicit buffering is used (e.g. by a call to conflate() or buffer()). Combining flows with merge() and combine() breaks that promise: merge() acts like buffer(UNLIMITED), and combine() acts like an odd form of conflate().

combine() is documented to conflate the flows until every flow has emitted at least once, but:

  1. It breaks the above promise even after all flows have emitted.

  2. There's no reason to conflate: if the caller wants conflated inputs, they can ask for it with conflate(); if they don't, it makes sense for the flows to be blocked by default until all flows have emitted (at which point collection of their emissions can begin).

merge() has no such excuse: all emissions are supposed to be collected, so there is no reason to break the above promise.

I think the issue is the unsynchronized use of a Channel to pass values from the input flows to the output, in both implementations. A simple possible solution would be to hold a Mutex while collecting a value from an input until that value has been emitted to the output.
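A minimal sketch of that idea follows, assuming a hypothetical operator named sequentialMerge (it is not part of kotlinx.coroutines). Instead of a Mutex it swaps in a per-value acknowledgment: each value crosses a rendezvous Channel together with a CompletableDeferred, and the upstream collector awaits that acknowledgment, so the upstream emit() only returns once the downstream collector has finished with the value. It is an illustration of the behaviour being requested, not a proposed implementation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical operator (not a kotlinx.coroutines API): the inputs are still
// collected concurrently, but every upstream emit() suspends until the
// downstream collector has finished processing that value.
fun <T> sequentialMerge(vararg flows: Flow<T>): Flow<T> = flow {
    coroutineScope {
        // Each value travels with an acknowledgment that the consumer completes.
        val handoff = Channel<Pair<T, CompletableDeferred<Unit>>>(Channel.RENDEZVOUS)
        val producers = flows.map { upstream ->
            launch {
                upstream.collect { value ->
                    val ack = CompletableDeferred<Unit>()
                    handoff.send(value to ack)
                    ack.await() // upstream emit() resumes only after downstream is done
                }
            }
        }
        // Close the hand-off channel once every input has completed.
        launch {
            producers.joinAll()
            handoff.close()
        }
        // Emit from the scope's own coroutine, as the flow {} builder requires.
        for ((value, ack) in handoff) {
            emit(value)
            ack.complete(Unit)
        }
    }
}

fun main() = runBlocking {
    val f = flow {
        emit(Unit)
        emit(Unit)
        println("emit done")
    }
    sequentialMerge(f).collect {
        delay(100)
        println("collect done")
    }
    // Prints "collect done" twice, then "emit done"
}
```

The inputs still run concurrently with each other; only the hand-off to the downstream collector is serialized, which is the property the report asks merge() to keep.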

This is similar to issue #2603 (which has a potential solution similar to the one described in the previous paragraph), but here it's less acceptable, because these are flow operators rather than implementations of Flow (which can define whatever emission semantics they want in their docs, like "SharedFlow doesn't wait" and "StateFlow conflates").

Provide a Reproducer

Regular flow:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val f = flow {
        emit(Unit)
        emit(Unit)
        println("emit done")
    }
    f.collect {
        delay(100)
        println("collect done")
    }
    // Prints: "collect done" twice, then "emit done"
}

Merge:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val f = flow {
        emit(Unit)
        emit(Unit)
        println("emit done")
    }
    merge(f).collect {
        delay(100)
        println("collect done")
    }
    // Prints: "emit done", then "collect done" twice
}

Combine:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val f = flow {
        emit(Unit)
        emit(Unit)
        println("emit done")
    }
    combine(f) { it }.collect {
        delay(100)
        println("collect done")
    }
    // Prints: "emit done", then "collect done" twice
}

Adding more emissions to the merge example shows that they all finish before the first collection is done, while doing the same for the combine example produces results that I can't explain.

qwwdfsad commented 1 year ago

It is not a bug, but rather an intrinsic property of any concurrent operator. What we indeed can do better is to document it thoroughly.

Flows expect that emit() will block until the value has finished collecting,

Could you please elaborate on where this expectation comes from? There is no such invariant by design; all we state is that all emissions are sequential, and this invariant isn't broken.

odedniv commented 1 year ago

This is documented in buffer.
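For comparison, the explicit-buffering case that buffer's documentation describes can be reproduced with a small sketch. emissionOrder is a hypothetical helper that records events in a list instead of printing them; under runBlocking (a single thread) the resulting order is deterministic.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: runs the reproducer from the report, optionally with an
// explicit buffer(), and returns the order in which the events happened.
fun emissionOrder(buffered: Boolean): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val source = flow {
        emit(Unit)
        emit(Unit)
        events += "emit done"
    }
    // buffer() runs the upstream in a separate coroutine that fills a channel.
    val upstream = if (buffered) source.buffer() else source
    upstream.collect {
        delay(100)
        events += "collect done"
    }
    events
}

fun main() {
    println(emissionOrder(buffered = false)) // [collect done, collect done, emit done]
    println(emissionOrder(buffered = true))  // [emit done, collect done, collect done]
}
```

Without buffer() each emit() waits for the collector; with it, the upstream finishes first, which is the same ordering the merge() reproducer shows.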

Collection and emission should not be concurrent though, right? They should also be sequential as they are supposed to run on the same CoroutineContext?

Why does it work with flow {} and not with merge? What's the reason to not make them sequential?

odedniv commented 1 year ago

I don't mind that the different flows are collected concurrently inside merge, but there's no reason for them to be concurrent with the post-merge collection.

odedniv commented 1 year ago

Come to think of it, I find this CoroutineContext transition also unexpected:

val f = flow {
    println("f1: ${currentCoroutineContext()}")
    emit(Unit)
}.onEach {
    println("f2: ${currentCoroutineContext()}")
}
merge(f).onEach {
    println("f3: ${currentCoroutineContext()}")
}.collect {
    println("cl: ${currentCoroutineContext()}")
}

// Prints:
// f1: [CoroutineId(3), "coroutine#3":StandaloneCoroutine{Active}@556e8754, Dispatchers.Default]
// f2: [CoroutineId(3), "coroutine#3":StandaloneCoroutine{Active}@556e8754, Dispatchers.Default]
// f3: [CoroutineId(1), "coroutine#1":ScopeCoroutine{Active}@3b5d09e5, Dispatchers.Default]
// cl: [CoroutineId(1), "coroutine#1":ScopeCoroutine{Active}@3b5d09e5, Dispatchers.Default]

odedniv commented 1 year ago

I think I understand that the CoroutineContext cannot physically be the same, since the upstream collections must be launched in separate coroutines.

That being said, I created a generic solution for preventing unwanted buffering methods from... buffering.

Note that if combine() and merge() use this implementation, the caller can still decide to buffer before passing the flows to the combine()/merge(), which would have the exact same outcome as today. Given that, I suggest labeling this issue with a "design" tag.

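import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex

// I guess this is a separate suggestion :-)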
fun <T> Flow<T>.afterEach(block: suspend (T) -> Unit): Flow<T> = flow {
    collect {
        emit(it)
        block(it)
    }
}

/**
 * Returns the transformation of the receiver [Flow], blocking its emission on collection of the returned flow,
 * practically ignoring any buffering made inside the transformation.
 */
fun <T, R> Flow<T>.waitingFlow(
    bufferingTransform: Flow<T>.() -> Flow<R>
): Flow<R> = flow {
    val mutex = Mutex(true)
    emitAll(
        afterEach { mutex.lock() }
            .bufferingTransform()
            .afterEach { mutex.unlock() }
    )
}

// Signature for N flows, so it can be used with combine() or merge():
fun <T1, T2, R> waitingFlow(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    // ..., or vararg, etc.
    bufferingTransform: (Flow<T1>, Flow<T2>) -> Flow<R>
): Flow<R> = flow {
    val mutex = Array(2) { Mutex(true) }
    emitAll(
        bufferingTransform(
            flow.afterEach { mutex[0].lock() },
            flow2.afterEach { mutex[1].lock() },
        ).afterEach {
            mutex.forEach { it.unlock() }
        }
    )
}

// Basic (useless) example:
fun main() = runBlocking {
    val f = (1..3).asFlow().onEach { println("f$it") }
    f.waitingFlow { map { it * 2 }.conflate() }.collect { println("c$it"); delay(100) }
    // Prints: f1, c2, f2, c4, f3, c6 (ignoring conflate()).

    // Negating combine buffering example:
    val fa = (1..3).asFlow().onEach { println("fa$it") }
    val fb = (1..3).asFlow().onEach { println("fb$it"); delay(10) }
    waitingFlow(fa, fb) { wfa, wfb ->
        combine(wfa, wfb) { va, vb -> va + vb }
    }.collect { println("c$it"); delay(100) }
    // Prints: fa1, fb1, c2, fa2, fb2, c4, fa3, fb3, c6 (faN and fbN might be reversed)
}
odedniv commented 1 year ago

The above solution won't work as is for merge(), because we need to unlock only the mutex of the flow that emitted this value; otherwise unlock() will either fail, unlock too early, or wrongly unlock a flow that's still waiting for its value to be collected. We'd need to somehow side-channel which mutex was locked by this emission. Perhaps by using the lock's owner argument?

I don't have a solution on hand, but I'm sure it's solvable.
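One way to do that side-channelling is sketched below, under the assumption that the transform is merge() itself, which forwards every value unchanged, so each upstream emission corresponds to exactly one downstream value. Each value is tagged with the index of its source flow using the standard library's IndexedValue, and only that flow's mutex is unlocked. This uses the tag rather than the owner parameter of Mutex.lock, and waitingMerge is a hypothetical name, not a kotlinx.coroutines API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex

// Hypothetical helper: like merge(), but each input waits after emitting until
// the downstream collector has finished with that value. The source index
// travels with the value and tells the downstream side which mutex to release.
fun <T> waitingMerge(vararg flows: Flow<T>): Flow<T> = flow {
    // One mutex per input, initially locked, so lock() after emitting suspends.
    val gates = Array(flows.size) { Mutex(locked = true) }
    val tagged = flows.mapIndexed { index, upstream ->
        flow {
            upstream.collect { value ->
                emit(IndexedValue(index, value))
                gates[index].lock() // wait until downstream has processed this value
            }
        }
    }
    tagged.merge().collect { (index, value) ->
        emit(value)
        gates[index].unlock() // release only the input that produced this value
    }
}

fun main() = runBlocking {
    val f = flow {
        emit(Unit)
        emit(Unit)
        println("emit done")
    }
    waitingMerge(f).collect {
        delay(100)
        println("collect done")
    }
    // Prints "collect done" twice, then "emit done"
}
```

The one-lock-per-value pairing is what keeps it correct: if something between the tagging and the unlock dropped or conflated values, an input would stay suspended on its mutex forever, which is why this only covers merge() and not combine().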

elizarov commented 1 year ago

Flow is conceptually an asynchronous primitive. What you are trying to achieve would be much better done with a different [synchronous] primitive that does not have any kind of asynchronous operations like buffer and others.

odedniv commented 1 year ago

To be clear, the solution should not make combine() the same as zip() - removing the delay() from collect() should allow each new emission from either flow to be collected immediately, such that the example above could also be:

val fa = (1..3).asFlow().onEach { println("fa$it") } // No delay
val fb = (1..3).asFlow().onEach { println("fb$it"); delay(10) }
waitingFlow(fa, fb) { wfa, wfb ->
    combine(wfa, wfb) { va, vb -> va + vb }
}.collect { println("c$it") } // No delay
// Prints: fa1, fb1, c2, fa2, c3, fa3, c4, fb2, c5, fb3, c6

Unfortunately the issue mentioned above regarding merge() also applies when this happens, so it needs to be solved.

Note that adding conflate() before passing the flows to the suggested combine() would be slightly different than today's combine(), because any emission that reaches the combine() must be used, unlike today. That would be equivalent to this:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    val f = (1..3).asFlow()
    f.conflate().collect { println(it); delay(100) }
    // Prints: 1, 3
    // And not: 3 (like combine would today)
}

Given this equivalence (which is more deterministic), I still suggest making this the default combine(), and adding a combineConflate() that does what combine() does today.

odedniv commented 1 year ago

I don't think being "conceptually an asynchronous primitive" is conflicting with this issue. It is still asynchronous, still suspend functions that do not block the thread.

Nobody expects map() to conflate even if it has a delay() in it. The APIs set the expectation that, regardless of flows being asynchronous, all emissions will be collected unless the flow is cancelled or explicitly filtered (which is what conflate() does).

qwwdfsad commented 1 year ago

This is documented in buffer.

Could you please quote the part that gives that impression? I intended to change that, but for me, it explicitly states that it introduces a concurrency and splits sequential operators chain into multiple coroutines.

The APIs set the expectation that regardless of flows being asynchronous

Here is where we disagree, and that's what we are highlighting in the documentation, both in buffer and Flow itself. Some operators are by their nature concurrent; while the emissions (the sequencing) of elements are sequential, the overall state of the system is not. For strictly sequential behaviour, especially when it incurs a non-trivial overhead and/or behaviour, a more tailored primitive and/or wrapper like waitingFlow can indeed be used.

Maybe something like #3274 could've been helpful as well. I suggest checking it out.

odedniv commented 1 year ago

I'd appreciate reopening this for the sake of discussion, as I think there's more to have. Feel free to ignore the request, but either way this is my response to your comment:

Could you please quote the part that gives that impression?

Documentation that states flows are sequential:

  1. https://kotlinlang.org/docs/flow.html#flows-are-sequential, as well as all the printing examples in that doc that show how emit waits for collection: (cherry-picked on purpose, see below)

    Each individual collection of a flow is performed sequentially ... The collection works directly in the coroutine that calls a terminal operator. No new coroutines are launched by default.

  2. In buffer's documentation:

    Normally, flows are sequential. It means that the code of all operators is executed in the same coroutine.

Documentation that states flows are not sequential:

  1. In buffer's documentation:

    The buffer operator creates a separate coroutine during execution for the flow it applies to. It will use two coroutines for execution of the code. ...

  2. From https://kotlinlang.org/docs/flow.html#flows-are-sequential:

    ... is performed sequentially unless special operators that operate on multiple flows are used.

The points that I'm trying to make are:

  1. Using buffer is the developer explicitly asking to remove the sequential property of the flow, so it's obvious, with no need to look through pages of documentation.

  2. Using merge and combine is not an explicit request to remove the sequential property, but rather a side effect of the implementation detail which isn't obvious, requiring the developer to read the docs thoroughly (likely after they hit the issue already).

  3. I understand the reason for this side effect - you can't (currently) block the same coroutine on collecting from two flows.

    But... if we can support making it sequential (using a similar method to C's select API, or using a Mutex), the developer story would be more obvious (no need to read documentation).