Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
12.94k stars 1.84k forks source link

Flow.catch skips upstream exceptions #4159

Open alex-dorokhov opened 2 months ago

alex-dorokhov commented 2 months ago

It looks like under some race conditions using the Flow.catch operator skips upstream exceptions. For example:

try {
  flow {
     // some work...
     throw RuntimeException()
  }.catch {
      // this may be not called and the RuntimeException is passed downstream
  }.collect()
} catch (e: RuntimeException) {
  // exception is caught here instead
}

We are using pretty old 1.6.1, but since then the catch operator implementation was not changed. From the source code of the catch operator it looks like the behaviour is possible when both downstream and upstream exceptions are caught, which can happen in race conditions.

We believe it's totally unclear from the documentation and especially from the semantics of 'catch' operator, that any upstream exception could skip the operator. For now we had to switch channelFlow builder (instead of flow) and to use try-catch.

Could it be that the code here should be replaced with "return e"?

qwwdfsad commented 2 months ago

Could you provide any reproducer for the scenario where an exception is thrown upstream but is completely ignored?

qwwdfsad commented 2 months ago
try {
    flow<Int> {
        emit(1)
        yield()
        println("Upstream failure")
        throw RuntimeException("Upstream flow failing")
    }.buffer(1).flowOn(Dispatchers.Default)
        .catch { println("Catch operator: $it") }
        .collect {
            println("Downstream throw")
            throw TestException()
        }
} catch (e: Throwable) {
    println("Catch block: $e " + e.suppressed.contentToString())
}

In this scenario, it is possible to observe "Catch block" upstream exception without "Catch operator: " being invoked though

nbransby commented 1 month ago
suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
        .catch { println(".catch caught $it") }
    println(flow.first())
}

outputs

yo

whilst:

suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
        .catch { println(".catch caught $it") }
        .flowOn(Dispatchers.Default)
    println(flow.first())
}

outputs

.catch caught java.lang.UnsupportedOperationException
yo

whilst:

suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
        .flowOn(Dispatchers.Default)
        .catch { println(".catch caught $it") }
    println(flow.first())
}

outputs

Exception in thread "main" java.lang.UnsupportedOperationException
    at MainKt$main$flow$1.invokeSuspend(Main.kt:8)
    at MainKt$main$flow$1.invoke(Main.kt)
    at MainKt$main$flow$1.invoke(Main.kt)
    at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:57)
    at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:226)
    at kotlinx.coroutines.flow.internal.ChannelFlowOperatorImpl.flowCollect(ChannelFlow.kt:191)
    at kotlinx.coroutines.flow.internal.ChannelFlowOperator.collectTo$suspendImpl(ChannelFlow.kt:153)
    at kotlinx.coroutines.flow.internal.ChannelFlowOperator.collectTo(ChannelFlow.kt)
    at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.kt:56)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:102)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:816)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
    Suppressed: kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed

Was about to create a new issue for this but came across this one, is it the same bug?

nbransby commented 1 month ago

@qwwdfsad do you have any insight into what is going on here?

qwwdfsad commented 1 month ago

@nbransby this is a leaking behaviour of multiple aspects: concurrency imposed by flowOn, catch operator being able to emit and that first cancels asynchronous coroutine (e.g. the behaviour won't reproduce if you replace it with toList).

It's a bit more complicated than just an omission or a trivial bug and I'm yet to make my mind about this behaviour, its validity and whether we can fix it without breaking even more stuff in the meantime

qwwdfsad commented 1 month ago

The root cause that introduced the problem (and fixed other problems): https://github.com/Kotlin/kotlinx.coroutines/pull/3017 (#2860)

alex-dorokhov commented 1 month ago

Was about to create a new issue for this but came across this one, is it the same bug?

Yes, it looks like it is the same bug.