Kotlin / kotlinx-rpc

Add asynchronous RPC services to your multiplatform applications.
https://kotlin.github.io/kotlinx-rpc/
Apache License 2.0

Consuming flows from an RPC #100

Open ShayOinif opened 2 weeks ago

ShayOinif commented 2 weeks ago

Hello. I don't know whether this is an issue, but I definitely find it odd. There seems to be no way of telling when the client is disconnected, such as a status flow or an on-disconnection lambda to register. Right now, when I consume a flow and the server goes away, the flow is not completed. As a workaround I created a few small helper flows (also to maintain a single client for my app), like so:

        internal val clientCoroutineScope = CoroutineScope(Job())

        internal val httpClientSharedFlow = channelFlow {
            val client = HttpClient { installRPC() }
            send(client)
            awaitClose {
                client.close()
            }
        }.shareIn(clientCoroutineScope, SharingStarted.WhileSubscribed(3.seconds), 1)

        internal sealed interface ClientState {
            data class Disconnected(val message: String) : ClientState
            data class Connected(val client: RPCClient) : ClientState
        }

        @OptIn(ExperimentalCoroutinesApi::class)
        internal val myServiceClientStateFlow = httpClientSharedFlow.flatMapLatest<HttpClient, ClientState> {
            val client = it.rpc {
                url("ws://0.0.0.0:3024/firstRoute")
                rpcConfig { serialization { json() } }
            }

            flow {
                emit(ClientState.Connected(client))
                client.coroutineContext.job.join()
            }.onCompletion {
                client.cancel()
                throw Throwable("Remote disconnected")
            }
        }.retryWhen { cause, _ ->
            repeat(3) {
                emit(ClientState.Disconnected("Error: $cause, retrying in ${3 - it} seconds..."))
                delay(1.seconds)
            }
            true
        }.stateIn(
            clientCoroutineScope,
            SharingStarted.WhileSubscribed(3.seconds),
            ClientState.Disconnected("Initial")
        )

        val uiClientStateFlow = myServiceClientStateFlow.map {
            when (it) {
                is ClientState.Connected -> "Connected"
                is ClientState.Disconnected -> "Disconnected - ${it.message}"
            }
        }

        @OptIn(ExperimentalCoroutinesApi::class, InternalRPCApi::class)
        val uiRpcFlow = myServiceClientStateFlow.transformLatest {
            if (it is ClientState.Connected) {
                streamScoped {
                    emitAll(
                        it.client.withService<MyService>().coldFlow(getPlatform().name)
                    )
                }
            }
        }.retryWhen { cause, attempt ->
            emit("Error - $attempt: $cause")
            delay(1.seconds)
            true
        }

I wonder whether this is the intended use, whether I am missing something, or whether the interface or behavior should change. Thanks, Shay Oinif

Mr3zee commented 3 days ago

Hi, thank you for the question! I'm not sure what you want to do. If a client or a service disconnects, its corresponding CoroutineScope will be cancelled. If there was an error with a flow request, it should throw. There is no retry mechanism right now, that is true.
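Since disconnection is described here as cancellation of the corresponding CoroutineScope, one way to get a "status flow" is to watch that scope's Job. The following is a minimal sketch using only kotlinx.coroutines; `jobStatus` is a hypothetical helper name and is not part of kotlinx-rpc.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper (not a kotlinx-rpc API): reports whether `job` is still alive.
// Emits the job's current state, suspends until the job completes or is cancelled,
// then emits `false`. distinctUntilChanged() drops the duplicate `false` when the
// job had already finished before collection started.
fun jobStatus(job: Job): Flow<Boolean> = flow {
    emit(job.isActive)
    job.join()
    emit(false)
}.distinctUntilChanged()
```

With the client from the snippet in the first comment, this could be collected as `jobStatus(client.coroutineContext.job)`, assuming the client's Job is the one that gets cancelled on disconnect.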

ShayOinif commented 3 days ago

I simply expected that if a service goes away, the flow I am consuming would end, no matter which coroutine scope I collect it on.

In a normal shutdown, the krpc protocol sends an end message to all clients. But in the case of an unexpected crash, the coroutine scope of the backing transport (a WebSocket with Ktor krpc) ends, while the corresponding client flow does not, unless it is collected on the client's coroutine scope.
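The behavior described above (a flow that keeps running after the client's scope has ended) can be worked around by tying the flow to the client's Job explicitly, so it fails wherever it is collected. This is a sketch under assumptions: `failWhenCompleted` and `RemoteDisconnectedException` are hypothetical names, only kotlinx.coroutines calls are used, and nothing here is a kotlinx-rpc API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical exception type used to signal that the watched job has ended.
class RemoteDisconnectedException : Exception("Remote disconnected")

// Hypothetical operator: forwards upstream values, but fails the resulting flow
// as soon as `job` completes or is cancelled, independent of the collecting scope.
fun <T> Flow<T>.failWhenCompleted(job: Job): Flow<T> = channelFlow {
    // Forward every upstream value into the channel.
    val upstream = launch { this@failWhenCompleted.collect { send(it) } }
    // Watcher: once `job` ends, fail this producer, which fails the collector.
    val watcher = launch {
        job.join()
        throw RemoteDisconnectedException()
    }
    // If upstream finishes on its own, stop watching and complete normally.
    upstream.join()
    watcher.cancel()
}
```

Applied to the earlier snippet, the RPC flow could be wrapped as `coldFlow(...).failWhenCompleted(client.coroutineContext.job)`, after which a downstream `retryWhen` would see the failure. Whether failing or completing normally is the better signal is a design choice; failing makes the disconnect visible to retry operators.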