rsocket / rsocket-kotlin

RSocket Kotlin multi-platform implementation
http://rsocket.io
Apache License 2.0

Handle client disconnect in server #229

Open · mihai1voicescu opened this issue 2 years ago

mihai1voicescu commented 2 years ago

Each time a client disconnects we get an unhandled error in our server logs. How can we handle these errors, since they are flooding the logs?

We are using ktor_version=2.1.0 with WebSockets and the Netty engine. We also tried CIO, but got the same result.
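
For context, our server wiring is roughly the following (simplified sketch: the port, route name and responder are placeholders, and the rsocket plugin package names may differ between rsocket-kotlin versions). Any client disconnect then produces the trace below:

import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
// NOTE: plugin package may differ depending on the rsocket-kotlin version.
import io.rsocket.kotlin.transport.ktor.websocket.server.RSocketSupport
import io.rsocket.kotlin.transport.ktor.websocket.server.rSocket

fun main() {
    embeddedServer(Netty, port = 8080) {
        install(WebSockets)       // ktor WebSocket plugin
        install(RSocketSupport)   // rsocket-kotlin ktor integration
        routing {
            rSocket("rsocket") {  // placeholder path
                RSocketRequestHandler {
                    requestResponse { request ->
                        buildPayload { data("response") } // placeholder responder
                    }
                }
            }
        }
    }.start(wait = true)
}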

Caused by: kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
    at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1108)
    at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.resumeReceiveClosed(AbstractChannel.kt:913)
    at kotlinx.coroutines.channels.AbstractSendChannel.helpClose(AbstractChannel.kt:342)
    at kotlinx.coroutines.channels.AbstractSendChannel.close(AbstractChannel.kt:271)
    at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:93)
    at io.ktor.websocket.DefaultWebSocketSessionImpl$runIncomingProcessor$1.invokeSuspend(DefaultWebSocketSession.kt:204)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
    at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:190)
    at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
    at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
    at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:513)
    at kotlinx.coroutines.channels.AbstractChannel$ReceiveHasNext.completeResumeReceive(AbstractChannel.kt:947)
    at kotlinx.coroutines.channels.AbstractSendChannel.offerInternal(AbstractChannel.kt:56)
    at kotlinx.coroutines.channels.AbstractSendChannel.send(AbstractChannel.kt:134)
    at io.ktor.websocket.RawWebSocketJvm$1.invokeSuspend(RawWebSocketJvm.kt:68)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
whyoleg commented 2 years ago

Looks like the same issue as #211, so I think it's on the ktor side. As a workaround, you can provide a CoroutineExceptionHandler to the server instance via a scope or parentCoroutineContext, which goes directly into applicationEngineEnvironment.
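
Something like this (an untested sketch; the handler body and connector are placeholders):

import io.ktor.server.engine.*
import io.ktor.server.netty.*
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.channels.ClosedReceiveChannelException

// Swallow the "client went away" error, log anything else.
val disconnectHandler = CoroutineExceptionHandler { _, cause ->
    if (cause !is ClosedReceiveChannelException) cause.printStackTrace()
}

fun server(): NettyApplicationEngine = embeddedServer(
    Netty,
    applicationEngineEnvironment {
        connector { port = 8080 }                      // placeholder connector
        module { /* install plugins and routes here */ }
        parentCoroutineContext += disconnectHandler
    }
) {}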

whyoleg commented 2 years ago

Related: #226. Maybe it will be possible to fix this in rsocket-kotlin in a similar way during the refactoring of transports.

mihai1voicescu commented 2 years ago

@whyoleg thank you for your quick response!

That is also what we have been trying, but with no success yet. Maybe we are doing something wrong, since the handler never gets called.

We tried with the applicationEngineEnvironment like this:

import io.ktor.server.engine.*
import io.ktor.server.netty.*
import kotlinx.coroutines.CoroutineExceptionHandler

private val handler = CoroutineExceptionHandler { _, throwable ->
    println("CALLED")
    when (throwable) {
        // Ignore the expected "client went away" error, rethrow everything else.
        is kotlinx.coroutines.channels.ClosedReceiveChannelException -> Unit
        else -> throw throwable
    }
}

fun applicationEngine(): NettyApplicationEngine {
    // config.deployment(), mount() and printRoutes() are helpers from our project.
    val port = config.deployment("port", "80").toInt()
    val host = config.deployment("host", "0.0.0.0")

    return embeddedServer(
        Netty,
        applicationEngineEnvironment {
            module {
                mount()
                printRoutes()
            }
            connector {
                this.port = port
                this.host = host
            }

            this.parentCoroutineContext = this.parentCoroutineContext + handler
        }
    ) {}
}

We also tried passing a scope with a handler to RSocketRequestHandler(EmptyCoroutineContext + handler), and it still does not get called.
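
That attempt looked roughly like this (simplified; the responder body is a placeholder):

import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CoroutineExceptionHandler

val rsocketHandler = CoroutineExceptionHandler { _, _ -> println("CALLED") }

// The handler is part of the responder's parent coroutine context,
// but it is never invoked when a client disconnects.
val responder = RSocketRequestHandler(EmptyCoroutineContext + rsocketHandler) {
    requestResponse { request ->
        buildPayload { data("response") } // placeholder responder logic
    }
}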

whyoleg commented 2 years ago

From what I've checked, it looks like it's not possible to add an exception handler for the websocket session... We may need to create an issue on the ktor side for this (?). It is still possible to hide this exception on the rsocket side, but I'm not sure that is the best thing to do right now, as that way it would not be possible to distinguish between a normal close and a connection failure. I need more time to investigate whether the POC of the new transport API will have this possibility...

whyoleg commented 2 years ago

Will be fixed on the ktor side here: https://youtrack.jetbrains.com/issue/KTOR-5016/Channel-was-closed-exception-in-WebSocket-during-normal-use

mihai1voicescu commented 2 years ago

We did some digging; for the moment, adding a simple try/catch at https://github.com/rsocket/rsocket-kotlin/blob/a09a2b500e2e7b63c5837958eda8730f19b4cc00/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt#L36 will catch the error.
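
Roughly the kind of guard we mean (illustrative only, not the actual code from that file):

import io.ktor.websocket.Frame
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel

// Illustration: catch the exception where the incoming frame is received and
// translate "peer went away" into a regular "connection closed" signal.
suspend fun receiveFrameOrNull(incoming: ReceiveChannel<Frame>): Frame? =
    try {
        incoming.receive()
    } catch (e: ClosedReceiveChannelException) {
        null // client disconnected; treat as a normal close instead of an error
    }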

Maybe there is a way to propagate it and signal that the RSocket was closed (since this is very common when dealing with mobile and web clients)?

Maybe you can point us in the right direction and we can make a PR?

whyoleg commented 2 years ago

The change should be a little bigger. We need to make the receive result nullable and handle that where it's called. This will affect all transport implementations, not only websocket. Handling of errors in send should also be there, in case send is called while a receive is in progress (as those operations can be performed in parallel). I will try to prepare a quick fix (and a new patch release) this weekend, until the new transport API is available.
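
A hypothetical shape of that change (names here are illustrative, not the real internal API):

import io.ktor.utils.io.core.ByteReadPacket

// Hypothetical transport-level contract: receive returns null once the peer
// has gone away, so callers can treat it as a normal close rather than an error.
interface ClosableConnection {
    suspend fun send(packet: ByteReadPacket)
    suspend fun receiveOrNull(): ByteReadPacket? // null == connection closed
}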

mihai1voicescu commented 2 years ago

Thank you! If there is something we can help with let us know.

whyoleg commented 2 years ago

Sorry for the delay. Focus switched to the new transport API. I've tried to create a quick fix, but it really degrades the stability and expected behavior of the current transport implementation, and I need to investigate for more time than I expected. At the same time, I still think that this specific issue should be fixed on the ktor side. I will try to ensure that it is fixed in ktor 2.2.0 if the ktor team does not do it.