joffrey-bion / krossbow

A Kotlin multiplatform coroutine-based STOMP client over websockets, with built-in conversions.
MIT License

WebSocket was closed while subscriptions were still active #397

Open Shusek opened 10 months ago

Shusek commented 10 months ago

What happened?

Recently, I integrated WebSocket lifecycle monitoring into our production mobile application. Since then, I've noticed that we occasionally encounter the error message: "WebSocket was closed while subscriptions were still active." I am uncertain about the root cause of this issue. While the message is quite clear, I don't understand why such an exception is raised: if the WebSocket is closed, it seems obvious that the subscriptions should be closed as well.

In my WebSocket use case, the connection remains active for the whole lifetime of the application, and if any issue occurs within the stream, it should automatically reconnect and report what happened. Unfortunately, I cannot reproduce this error locally and I have no idea what causes it. It's possible that I'm using the library incorrectly.

Code showing roughly how it works

    reconnectEventFlow
        .onStart { emit(Unit) }
        .flatMapLatest { _ ->
            callbackFlow {
                val errorHandler = CoroutineExceptionHandler { _, exception ->
                    close(exception)
                }
                val sessionCoroutineContext = EmptyCoroutineContext + errorHandler
                val connection: StompSessionWithKxSerialization =
                    stompSessionConnection.connectSession(sessionCoroutineContext).also {
                        send(it)
                    }
                awaitSuspendClose { connection.safeDisconnect() }
            }.flatMapLatest { subscribeChannels(it) }
        }
        .retryWhen { _, _ -> true }
        .shareIn(MainScope(), SharingStarted.WhileSubscribed())

StackTrace:

    Non-fatal Exception: org.hildan.krossbow.stomp.WebSocketClosedUnexpectedly: the WebSocket was closed while subscriptions were still active. Code: 1013 Reason:
        at org.hildan.krossbow.stomp.StompSocketKt.decodeToStompEvent(StompSocket.kt:89)
        at org.hildan.krossbow.stomp.StompSocketKt.access$decodeToStompEvent(StompSocket.kt:1)
        at org.hildan.krossbow.stomp.StompSocket.decodeStomp(StompSocket.kt:39)
        at org.hildan.krossbow.stomp.StompSocket.access$decodeStomp(StompSocket.kt:14)
        at org.hildan.krossbow.stomp.StompSocket$special$$inlined$map$1$2.emit(StompSocket.kt:224)
        at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit(Errors.kt:158)
        at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit(Errors.kt:158)
        at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
        at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
        at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:87)
        at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:66)
        at org.hildan.krossbow.websocket.ktor.KtorWebSocketConnectionAdapter$incomingFrames$2.invokeSuspend(KtorWebSocketConnectionAdapter.java:56)
        at org.hildan.krossbow.websocket.ktor.KtorWebSocketConnectionAdapter$incomingFrames$2.invoke(KtorWebSocketConnectionAdapter.java:62)
        at org.hildan.krossbow.websocket.ktor.KtorWebSocketConnectionAdapter$incomingFrames$2.invoke(KtorWebSocketConnectionAdapter.java:282)
        at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:128)
        at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1$1.invokeSuspend(SafeCollector.common.kt:1)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.java:584)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

Reproduction and additional details

No response

Krossbow version

5.8.0

Krossbow modules

krossbow-stomp-core, krossbow-stomp-kxserialization, krossbow-stomp-kxserialization-json, krossbow-websocket-core, krossbow-websocket-ktor, krossbow-websocket-okhttp

Kotlin version

1.9.10

Kotlin target platforms

Android

joffrey-bion commented 10 months ago

Thanks a lot for the detailed report. I will review that exception more closely, because the message is based on the assumption that receiving a WebSocket-level Close frame at this point in the code only happens if the WebSocket was closed "unexpectedly" (without disconnecting the STOMP session as a whole). I need to check this assumption again, and maybe adjust how the exception is thrown.

Shusek commented 10 months ago

I was able to reproduce this locally. When the WebSocket server is under maintenance, instead of returning a 500 on the endpoint "wss://mywebsocket", it returns 101 followed by a CONNECTED frame as if everything were working, and then the server drops the connection. I'm not sure the same thing happened to users in production, but in this case the exception is quite understandable.

joffrey-bion commented 10 months ago

Thanks for the additional info, this is quite useful. Yeah, that is what the exception is supposed to cover: basically, the connection being unexpectedly closed (by the server) without a disconnection at the STOMP level. So most likely this is a server failure that is reported as an exception to subscription collectors, which is good. I'd still like to make sure there aren't other cases that could trigger this error.
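
For reference, the close code 1013 in the stack trace is registered as "Try Again Later" in the IANA WebSocket close code registry, which fits a server that is temporarily unavailable. Below is a minimal Kotlin sketch of how a client could inspect the close code before deciding to reconnect. `describeCloseCode` and `shouldReconnect` are hypothetical helper names, not part of Krossbow, and the reconnect policy is only an assumption for illustration.

```kotlin
// Hypothetical helpers, NOT part of the Krossbow API.
// Code meanings follow RFC 6455 and the IANA WebSocket close code registry.
fun describeCloseCode(code: Int): String = when (code) {
    1000 -> "Normal Closure"
    1001 -> "Going Away"
    1006 -> "Abnormal Closure"
    1011 -> "Internal Error"
    1012 -> "Service Restart"
    1013 -> "Try Again Later"
    else -> "Unknown ($code)"
}

// Assumed policy: reconnect on anything except a normal closure.
fun shouldReconnect(code: Int): Boolean = code != 1000

fun main() {
    val code = 1013 // the code reported in the stack trace
    println("${describeCloseCode(code)}, reconnect=${shouldReconnect(code)}")
}
```

With a policy like this, a 1013 close would be treated as a transient server condition worth retrying, ideally after a delay rather than immediately.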

joffrey-bion commented 3 days ago

A new case: https://github.com/joffrey-bion/krossbow/discussions/561