rsocket / rsocket-kotlin

RSocket Kotlin multi-platform implementation
http://rsocket.io
Apache License 2.0

TcpClientTransport: Suppress ClosedReceiveChannelException when server disconnects #226

Open yuriykulikov opened 2 years ago

yuriykulikov commented 2 years ago

Motivation

As of 0.15.4, when the client loses its connection to the server, a ClosedReceiveChannelException is thrown in one of the client coroutines. It propagates to the CoroutineExceptionHandler and is printed. With TcpClientTransport it looks like this:

Exception in thread "DefaultDispatcher-worker-1 @rSocket-tcp-client#73" kotlinx.coroutines.channels.ClosedReceiveChannelException: Unexpected EOF: expected 3 more bytes
    at io.ktor.utils.io.ByteBufferChannel.readFullySuspend(ByteBufferChannel.kt:573)
    at io.ktor.utils.io.ByteBufferChannel.access$readFullySuspend(ByteBufferChannel.kt:24)
    at io.ktor.utils.io.ByteBufferChannel$readFullySuspend$1.invokeSuspend(ByteBufferChannel.kt)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
    Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [CoroutineName(rSocket-tcp-client), CoroutineId(73), "rSocket-tcp-client#73":StandaloneCoroutine{Cancelling}@61931601, Dispatchers.IO]

Desired solution

In my opinion, this exception can be suppressed: if the channel is closed, the client cannot do anything about it and should simply quit or reconnect. This can be achieved by attaching a CoroutineExceptionHandler to the TcpConnection coroutineContext:

@TransportApi
internal class TcpConnection(
    socket: Socket,
    coroutineContext: CoroutineContext,
    override val pool: ObjectPool<ChunkBuffer>
) : Connection {
    override val coroutineContext: CoroutineContext = coroutineContext + CoroutineExceptionHandler { _, throwable ->
        when (throwable) {
            is ClosedReceiveChannelException -> Unit // NOP
            else -> throw throwable
        }
    }
// ...
}

Considered alternatives

An alternative would be to catch the exception close to the readPacket invocation, but doing so breaks the cancellation behavior of the responder rSocket coroutineScope. For example, if client code relies on the scope being cancelled to log disconnects, that cancellation won't happen.
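
To illustrate the difference, here is a minimal, self-contained sketch. It is not rsocket-kotlin code: a plain Job() stands in for the connection job, and simulateDisconnect, scopeCancelledWhenRethrown and scopeCancelledWhenSwallowed are hypothetical names made up for this example. It assumes only kotlinx.coroutines on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException

// Hypothetical stand-in for a read that fails because the peer closed the socket.
private fun simulateDisconnect(): Nothing =
    throw ClosedReceiveChannelException("Unexpected EOF: expected 3 more bytes")

// Variant 1: let the exception escape the reader coroutine and filter it in a handler.
fun scopeCancelledWhenRethrown(): Boolean = runBlocking {
    val handler = CoroutineExceptionHandler { _, throwable ->
        // Stay silent for the expected disconnect, report anything else.
        if (throwable !is ClosedReceiveChannelException) println("unexpected failure: $throwable")
    }
    val connectionJob = Job()
    CoroutineScope(connectionJob + handler).launch { simulateDisconnect() }
    connectionJob.join() // returns once the failed child has cancelled the parent job
    connectionJob.isCancelled
}

// Variant 2: catch the exception next to the read and leave the loop.
fun scopeCancelledWhenSwallowed(): Boolean = runBlocking {
    val connectionJob = Job()
    val reader = CoroutineScope(connectionJob).launch {
        while (isActive) {
            try {
                simulateDisconnect()
            } catch (e: ClosedReceiveChannelException) {
                break // the reader ends normally, so nothing is propagated to the parent
            }
        }
    }
    reader.join()
    val cancelled = connectionJob.isCancelled
    connectionJob.cancel() // tidy up the job that is still active
    cancelled
}

fun main() {
    println("rethrow + handler: scope cancelled = ${scopeCancelledWhenRethrown()}")
    println("catch + break:     scope cancelled = ${scopeCancelledWhenSwallowed()}")
}
```

In this sketch the first variant reports the scope as cancelled without printing a stack trace, while the second leaves the parent job active, which is the behavior I would like to avoid.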

Additional context

The exception is usually thrown while reading the packet length:

/*
 * Copyright 2015-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.rsocket.kotlin.transport.ktor.tcp

import io.ktor.network.sockets.*
import io.ktor.util.cio.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.Connection
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlin.coroutines.*

@TransportApi
internal class TcpConnection(
    socket: Socket,
    override val coroutineContext: CoroutineContext,
    override val pool: ObjectPool<ChunkBuffer>
) : Connection {
    private val socketConnection = socket.connection()

    private val sendChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel<ByteReadPacket>(8)
    private val receiveChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel<ByteReadPacket>(8)

    init {
        launch {
            socketConnection.output.use {
                while (isActive) {
                    val packet = sendChannel.receive()
                    val length = packet.remaining.toInt()
                    try {
                        writePacket {
                            @Suppress("INVISIBLE_MEMBER") writeLength(length)
                            writePacket(packet)
                        }
                        flush()
                    } catch (e: Throwable) {
                        packet.close()
                        throw e
                    }
                }
            }
        }
        launch {
            socketConnection.input.apply {
                while (isActive) {
                    // original code
                    // val length = @Suppress("INVISIBLE_MEMBER") readPacket(3).readLength()
                    // val packet = readPacket(length)
                    // end of original code
                    // new code
                    val packet = try {
                        val length = @Suppress("INVISIBLE_MEMBER") readPacket(3).readLength()
                        readPacket(length)
                    } catch (closedReceiveChannelException: ClosedReceiveChannelException) {
                        println("Receive channel was closed!")
                        // this is the exception
                        // if replaced with break or cancel, then responder rSocket scope won't be cancelled
                        throw closedReceiveChannelException
                    }
                    // end of new code
                    try {
                        receiveChannel.send(packet)
                    } catch (cause: Throwable) {
                        packet.close()
                        throw cause
                    }
                }
            }
        }
        coroutineContext.job.invokeOnCompletion {
            @Suppress("INVISIBLE_MEMBER") sendChannel.fullClose(it)
            @Suppress("INVISIBLE_MEMBER") receiveChannel.fullClose(it)
            socketConnection.input.cancel(it)
            socketConnection.output.close(it)
            socketConnection.socket.close()
        }
    }

    override suspend fun send(packet: ByteReadPacket): Unit = sendChannel.send(packet)
    override suspend fun receive(): ByteReadPacket = receiveChannel.receive()
}
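
The message "expected 3 more bytes" matches the 3-byte length prefix read above, which suggests the peer closed the connection between frames rather than in the middle of one. As a rough illustration of how the two cases could be told apart, here is a sketch on a blocking java.io.InputStream. It does not use ktor's channel API, and readUpTo and readFrameOrNull are hypothetical helpers, not part of rsocket-kotlin or ktor.

```kotlin
import java.io.ByteArrayInputStream
import java.io.EOFException
import java.io.InputStream

// Hypothetical helper: fills the buffer as far as possible and returns the number of bytes read.
private fun InputStream.readUpTo(buffer: ByteArray): Int {
    var total = 0
    while (total < buffer.size) {
        val n = read(buffer, total, buffer.size - total)
        if (n < 0) break // end of stream
        total += n
    }
    return total
}

// Hypothetical helper: reads one frame with a 3-byte big-endian length prefix.
// Returns null on EOF at a frame boundary and throws EOFException on EOF inside a frame.
fun readFrameOrNull(input: InputStream): ByteArray? {
    val header = ByteArray(3)
    val headerRead = input.readUpTo(header)
    if (headerRead == 0) return null // peer closed between frames: an ordinary disconnect
    if (headerRead < 3) throw EOFException("EOF inside length prefix: got $headerRead of 3 bytes")

    val length = ((header[0].toInt() and 0xFF) shl 16) or
        ((header[1].toInt() and 0xFF) shl 8) or
        (header[2].toInt() and 0xFF)

    val payload = ByteArray(length)
    val payloadRead = input.readUpTo(payload)
    if (payloadRead < length) throw EOFException("EOF inside frame: got $payloadRead of $length bytes")
    return payload
}

fun main() {
    val stream = ByteArrayInputStream(byteArrayOf(0, 0, 2, 10, 20))
    println(readFrameOrNull(stream)?.toList()) // prints [10, 20]
    println(readFrameOrNull(stream))           // prints null
}
```

Whether a clean EOF should end the connection silently or still cancel the scope with a cause is a separate design question; the sketch only shows that the two situations are distinguishable at the framing level.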

olme04 commented 2 years ago

Hey, thanks for raising this. There is already a similar open issue, #148. I'm currently rewriting the transport API to support QUIC, and I will try to handle this unexpected error in some way so that users aren't cluttered with such errors.

rocketraman commented 2 years ago

The same thing happens on the server side when a client disconnects.