nomisRev / kotlin-kafka

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
https://nomisRev.github.io/kotlin-kafka/
Apache License 2.0

defaultCoroutineExceptionHandler probably shouldn't catch kotlin.Error #192

Open felixkrull-neuland opened 4 months ago

felixkrull-neuland commented 4 months ago

The default CoroutineExceptionHandler in kotlin-kafka currently catches, logs, and then eats every exception that's thrown in background coroutines. Unfortunately, that includes things like OutOfMemoryError and other fatal errors that probably shouldn't be handled in this way.

Specifically, we had an OutOfMemoryError in the Kafka deserializer, which was caught and logged by kotlin-kafka and then not processed further:

KafkaDispatcher with [io.github.nomisRev.kafka.receiver.internals.KafkaSchedulerKt$special$$inlined$CoroutineExceptionHandler$1@61131887, StandaloneCoroutine{Cancelling}@fbd2604, java.util.concurrent.ScheduledThreadPoolExecutor@3ae87e38[Running, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 62]] failed with an uncaught exception. Report to kotlin-kafka repo please.

The consequence was that a Kafka consumer silently stopped consuming, in a way I didn't initially think to look for. I think the coroutine exception handler should rethrow java.lang.Error/kotlin.Error so that it ends up in the thread's uncaught exception handler (at least on the JVM, as far as I can tell from the docs), which is at least something I, as the user of this library, can override.
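
A minimal sketch of the proposed behavior, assuming kotlinx-coroutines-core on the JVM. `rethrowingHandler` and `observed` are hypothetical names; this is not kotlin-kafka's actual `defaultCoroutineExceptionHandler`. On the JVM `kotlin.Error` is a typealias for `java.lang.Error`, so a single `is Error` check covers both. As far as we can tell from current kotlinx.coroutines behavior, when a `CoroutineExceptionHandler` rethrows the exception it was given, that exception is passed to the last-resort handling, which on the JVM is the current thread's uncaught exception handler (here, the JVM-wide default installed via `Thread.setDefaultUncaughtExceptionHandler`).

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical handler: rethrow fatal errors, log and swallow everything else.
val rethrowingHandler = CoroutineExceptionHandler { _, e ->
    if (e is Error) throw e // escapes to the thread's uncaught exception handler
    println("Logged and swallowed: $e")
}

// Records whatever reaches the JVM-wide default uncaught exception handler.
val observed = AtomicReference<Throwable?>(null)

fun main() {
    // The application-level hook a library user can override.
    Thread.setDefaultUncaughtExceptionHandler { _, e -> observed.set(e) }
    runBlocking {
        // A background coroutine (standing in for a consumer loop) hits a fatal error.
        val job = CoroutineScope(Dispatchers.Default + rethrowingHandler).launch {
            throw OutOfMemoryError("simulated OOM in deserializer")
        }
        job.join()
    }
    println("Uncaught exception handler saw: ${observed.get()}")
}
```

With this policy the OutOfMemoryError is no longer only logged: it reaches a hook the application controls, which can, for example, terminate the process.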

IMO, the ideal solution would be for uncaught exceptions in background tasks to somehow be raised from e.g. a receive() call so they can be handled by the user as normal, but I don't know how feasible that actually is.

nomisRev commented 4 months ago

Thank you for the report @felixkrull-neuland! (Was this on 0.3.x?)

That is indeed very bad. I thought I had tests for this, so let's add them.

> IMO, the ideal solution would be for uncaught exceptions in background tasks to somehow be raised from e.g. a receive() call so they can be handled by the user as normal, but I don't know how feasible that actually is.

I just checked the places where this is used (on 0.4.x), and it's only used when the Channel is already closed 🤔 So the Flow has already seen a terminal event, which means we cannot deliver the exception to receive() via Channel.close. Any suggestions or thoughts?

felixkrull-neuland commented 4 months ago

Oh yeah, this is on 0.3.1, should've mentioned that, sorry :)

Rethrowing from a non-background call would probably only make sense for non-fatal throwables. If your runtime environment tells you it's running out of memory, there's not really any room to wait and try to report that later.

Maybe the coroutine exception handler should just rethrow all exceptions it catches regardless of whether they're fatal or not? Since it's just a last resort for unexpectedly uncaught exceptions, it might make sense to fail a bit more loudly rather than silently stop running.
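
Both policies from this discussion (rethrow only fatal errors, or rethrow everything) can be expressed as one hypothetical handler factory. `policyHandler`, `rethrowAll` and `log` are illustrative names, not kotlin-kafka API, and treating every `Error` as fatal is a simplifying assumption. The sketch assumes kotlinx-coroutines-core and calls `handleException` directly, which the coroutine machinery normally does when a root coroutine fails, so the policy can be checked without threads.

```kotlin
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CoroutineExceptionHandler

// Hypothetical factory: the policy decides which throwables escape the handler.
fun policyHandler(
    rethrowAll: Boolean,
    log: (Throwable) -> Unit = { println("Uncaught in background coroutine: $it") },
): CoroutineExceptionHandler = CoroutineExceptionHandler { _, e ->
    // Fatal errors (OutOfMemoryError, StackOverflowError, ...) always escape;
    // with rethrowAll = true, ordinary exceptions escape as well.
    if (rethrowAll || e is Error) throw e
    log(e) // only reached for non-fatal exceptions under the lenient policy
}

fun main() {
    val lenient = policyHandler(rethrowAll = false)
    val strict = policyHandler(rethrowAll = true)
    val ordinary = IllegalStateException("commit failed")

    // Non-fatal: logged and swallowed by the lenient policy...
    lenient.handleException(EmptyCoroutineContext, ordinary)
    // ...but rethrown by the strict one.
    println(runCatching { strict.handleException(EmptyCoroutineContext, ordinary) }.isFailure)

    // Fatal: rethrown under either policy.
    println(runCatching { lenient.handleException(EmptyCoroutineContext, OutOfMemoryError("simulated")) }.isFailure)
}
```

The strict policy fails loudly on every unexpected exception; the lenient one keeps today's log-and-continue behavior for ordinary exceptions while still refusing to swallow fatal errors.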

nomisRev commented 4 months ago

Hey @felixkrull-neuland,

Let's keep this discussion going. I'd prefer to improve this on top of 0.4.x, but I'm not sure I follow your train of thought; or rather, I don't think what you're suggesting is possible with KotlinX Coroutines.

The snippet at the end of this comment is an extremely simplified version of the kotlin-kafka loop: we first create a Channel, and when the resulting Flow is collected we start the loop. It simulates sending one message and then encountering a failure.

At that point, we cannot do anything except terminate the loop and send the exception to the user. We do this by closing the Channel with the exception, so that it is rethrown from the Flow.

However, if we encounter a second error while closing the kotlin-kafka loop (perhaps from a record still being processed in parallel, or from a failing commit to Kafka), where do we send it?

If we try to send it to the Channel again, Channel.close returns false and the error is ignored. Simply throwing an exception within onCompletion is also completely ignored here. So, as a best effort, I send it to the CoroutineExceptionHandler instead, because it can no longer be sent to the Channel.
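
The `Channel.close` behavior described above can be observed in isolation, assuming kotlinx-coroutines-core: the first `close(cause)` returns `true`, a second call returns `false` and its cause is not recorded, and a consumer only ever sees the first cause. `closeTwice` is a hypothetical helper.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: close a channel twice and report what each step produced.
fun closeTwice(): Triple<Boolean, Boolean, Throwable?> = runBlocking {
    val ch = Channel<Int>(Channel.BUFFERED)
    ch.send(1) // one buffered record
    val first = ch.close(RuntimeException("Boom!"))   // closes the channel with this cause
    val second = ch.close(RuntimeException("Boom2!")) // already closed: returns false, cause dropped
    check(ch.receive() == 1) // buffered elements are still delivered after close
    // Once the buffer is drained, receive() throws the *first* close cause.
    val seen = runCatching { ch.receive() }.exceptionOrNull()
    Triple(first, second, seen)
}

fun main() {
    val (first, second, seen) = closeTwice()
    println("first close = $first, second close = $second, consumer saw: ${seen?.message}")
}
```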

There may have been an additional bug in 0.3.x, but that still raises the question of whether the 0.4.x behavior is correct. Can you reproduce this in a simple test (e.g. by manually throwing an OOM from a deserializer)?
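
A possible starting point for such a reproduction, assuming a recent kafka-clients version (where `configure` and `close` have default implementations): a hypothetical `OomDeserializer` that always throws `OutOfMemoryError`. It would be configured as the value deserializer of the consumer under test; how that is wired into kotlin-kafka's receiver settings is not shown here.

```kotlin
import org.apache.kafka.common.serialization.Deserializer

// Hypothetical test deserializer that simulates the fatal error from this issue.
class OomDeserializer : Deserializer<String> {
    override fun deserialize(topic: String?, data: ByteArray?): String =
        throw OutOfMemoryError("simulated OOM while deserializing from $topic")
}

fun main() {
    // Called directly, the Error propagates like any other Throwable.
    val error = runCatching { OomDeserializer().deserialize("test-topic", byteArrayOf(1, 2, 3)) }
        .exceptionOrNull()
    println(error)
}
```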

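package kotlinconf

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking

// Simplified kotlin-kafka loop: records are pushed into a Channel that is exposed as a Flow.
suspend fun example(): Flow<Int> {
    val ch = Channel<Int>(Channel.BUFFERED)
    return ch.consumeAsFlow()
        .onStart {
            println("Sending")
            ch.send(1) // simulate one consumed record
            println("Closing 1")
            // First failure: the Flow fails with "Boom!" after the buffered record is emitted.
            ch.close(RuntimeException("Boom!"))
        }.onCompletion {
            println("Closing 2")
            // Second failure during shutdown: the Channel is already closed, so
            // close(...) returns false and "Boom2!" is dropped. close returns a
            // non-null Boolean, so requireNotNull never throws here either.
            requireNotNull(ch.close(RuntimeException("Boom2!"))) {
                "I am swallowed by KotlinX Coroutines"
            }
        }
}

fun main() = runBlocking<Unit> {
    example()
        .onEach { println(it) }
        .launchIn(this)
}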
Output:

Sending
Closing 1
1
Closing 2
Exception in thread "main" java.lang.RuntimeException: Boom!
    at kotlinconf.FlowExampleKt$example$2.invokeSuspend(FlowExample.kt:18)
    at kotlinconf.FlowExampleKt$example$2.invoke(FlowExample.kt)
    at kotlinconf.FlowExampleKt$example$2.invoke(FlowExample.kt)
    at kotlinx.coroutines.flow.FlowKt__EmittersKt$onStart$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:115)
    at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:113)
    at kotlinx.coroutines.flow.FlowKt__TransformKt$onEach$$inlined$unsafeTransform$1.collect(SafeCollector.common.kt:112)
    at kotlinx.coroutines.flow.FlowKt__CollectKt.collect(Collect.kt:26)
    at kotlinx.coroutines.flow.FlowKt.collect(Unknown Source)
    at kotlinx.coroutines.flow.FlowKt__CollectKt$launchIn$1.invokeSuspend(Collect.kt:46)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
    at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
    at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
    at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
    at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
    at kotlinconf.FlowExampleKt.main(FlowExample.kt:27)
    at kotlinconf.FlowExampleKt.main(FlowExample.kt)