Kotlin / kotlinx-rpc

Add asynchronous RPC services to your multiplatform applications.
https://kotlin.github.io/kotlinx-rpc/
Apache License 2.0

Show example of intended server -> client RPC call #238

Closed (Jire closed 1 day ago)

Jire commented 4 days ago

The documentation shows no example of how to send a request from server -> client over an RPC call, only client -> server.

I am not sure if this is just a missing feature, or just missing documentation, but I find this a critical feature for my application, and would like to see the "right way to do it".

I have implemented an interval-polled subscribe system which I use to accomplish this, but if there's a "right way" to do this, that would be preferred.

Something maybe like this:

// in API module
@ClientRpc
interface ClientEventService : ClientRemoteService {
    suspend fun receiveEvent(event: Event)
}

// in client module

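// Named differently from the interface it implements, and matching the registration below
class ClientEventServiceImpl(
    override val coroutineContext: CoroutineContext
) : ClientEventService {
    override suspend fun receiveEvent(event: Event) {
        when (event) {
            is MessageEvent -> {
                println(event.message ?: return)
            }
            else -> println("Unhandled event: $event")
        }
    }
}

registerService<ClientEventService> { ctx -> ClientEventServiceImpl(ctx) }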

// in server module code under an RPC request something like:
val clientEventService: ClientEventService = request.withService()
clientEventService.receiveEvent(MessageEvent("Hello world!"))

Here is a stripped-down version of my implementation to achieve something similar, although with the drawback that it only polls at a set interval and thus has polling overhead + delay:

API module (client and server both depend on this code):

@Serializable
sealed interface Event

@Serializable
data class MessageEvent(
    val message: String?
) : Event

@Rpc
interface EventService : RemoteService {

    suspend fun subscribe(uuid: Long)

    suspend fun pollEvents(uuid: Long): List<Event>?

}

Server module:

class EventServiceImpl(
    override val coroutineContext: CoroutineContext
) : EventService {

    private val uuidToEvents: MutableMap<Long, BlockingQueue<Event>> =
        ConcurrentHashMap()

    private fun emitEvent(
        uuid: Long,
        event: Event
    ): Boolean {
        val events = uuidToEvents[uuid] ?: return false
        // offer returns false when the queue is full; add would throw IllegalStateException
        return events.offer(event)
    }

    override suspend fun subscribe(uuid: Long) {
        // putIfAbsent makes the check-and-register step atomic under concurrent calls
        val previous = uuidToEvents.putIfAbsent(uuid, ArrayBlockingQueue(256, true))
        check(previous == null) {
            "UUID $uuid is already subscribed"
        }

        emitEvent(uuid, MessageEvent("Successfully subscribed UUID $uuid"))
    }

    override suspend fun pollEvents(uuid: Long): List<Event>? {
        val events = uuidToEvents[uuid] ?: return null

        val polledEvents: MutableList<Event> = mutableListOf()
        events.drainTo(polledEvents)

        return polledEvents
    }

}

Then a client can utilize it like this:

const val OUR_UUID = 1234L
const val POLL_INTERVAL_NANOS = 100L * 1000 * 1000 // 100 milliseconds

val eventService: EventService = rpcClient.withService()

eventService.subscribe(OUR_UUID)

// Must be a suspend function because it calls the suspending RPC method
suspend fun pollEvents(uuid: Long) {
    val events: List<Event> = eventService.pollEvents(uuid) ?: return
    for (event in events) {
        when (event) {
            is MessageEvent -> {
                // Skip events without a message instead of dropping the rest of the batch
                println(event.message ?: continue)
            }
            else -> println("Unhandled event: $event")
        }
    }
}

while (true) {
    val elapsedNanos = measureNanoTime {
        pollEvents(OUR_UUID)
    }
    val delayNanos = POLL_INTERVAL_NANOS - elapsedNanos
    if (delayNanos > 0) {
        // The value is in nanoseconds, so convert with kotlin.time's .nanoseconds
        delay(delayNanos.nanoseconds)
    }
}

lsafer-meemer commented 3 days ago

The protocol has not been finalized yet, but at this time I think the intended solution for server -> client communication is flows.

Every instance of RemoteService is tied to a websocket session, so you could give each client connection its own flow instance, exposed as a field of the remote service, for the client to subscribe to.

@Rpc
interface MyService : RemoteService {
    val serverToClientRequests: Flow<MyServerRequest>
}
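
To make the per-connection idea concrete, here is a minimal sketch in plain Kotlin with kotlinx.coroutines only. It leaves out all kotlinx-rpc wiring, so whether and how the library streams such a field is not shown here. MyServiceSession, push, and the shape of MyServerRequest are hypothetical names made up for illustration, and the replay size of 16 is an arbitrary assumption.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical request type; the thread does not define MyServerRequest.
data class MyServerRequest(val payload: String)

// Stand-in for one service instance per client connection (no kotlinx-rpc wiring).
class MyServiceSession {
    // replay = 16 keeps requests pushed before the client starts collecting (assumed size)
    private val outgoing = MutableSharedFlow<MyServerRequest>(replay = 16)

    // Read-only view that the client side would collect
    val serverToClientRequests: Flow<MyServerRequest> = outgoing.asSharedFlow()

    // Called by server-side code to push a request to this one client
    suspend fun push(request: MyServerRequest) = outgoing.emit(request)
}

fun main() = runBlocking {
    val session = MyServiceSession()
    session.push(MyServerRequest("first"))
    session.push(MyServerRequest("second"))

    // The "client" collects the two buffered requests and then stops
    val received = session.serverToClientRequests.take(2).toList()
    println(received.map { it.payload }) // prints [first, second]
}
```

Because each session owns its own flow, pushing to one client does not reach the others, which is the property the per-connection field relies on.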

Alternatively, you could return a flow as the result of a remote service function call.

@Rpc
interface MyService : RemoteService {
    fun subscribeToServerRequests(): Flow<MyServerRequest>
}
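
As a rough illustration of how a flow-returning function could replace the subscribe + pollEvents pair from the original post, here is a sketch in plain Kotlin with kotlinx.coroutines on the JVM. It does not use kotlinx-rpc at all, so it only shows the server-side bookkeeping and what a collector would see. EventHub and subscribeToEvents are hypothetical names, and the buffer capacity of 256 simply mirrors the queue size used in the polling version.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed interface Event
data class MessageEvent(val message: String?) : Event

// Stand-in for the server-side service; no kotlinx-rpc wiring.
class EventHub {
    private val subscribers = ConcurrentHashMap<Long, Channel<Event>>()

    // The returned flow delivers events as they are emitted, with no polling interval
    fun subscribeToEvents(uuid: Long): Flow<Event> {
        val channel = Channel<Event>(capacity = 256)
        check(subscribers.putIfAbsent(uuid, channel) == null) {
            "UUID $uuid is already subscribed"
        }
        channel.trySend(MessageEvent("Successfully subscribed UUID $uuid"))
        // Unregister the subscriber once the collector finishes or is cancelled
        return channel.receiveAsFlow().onCompletion { subscribers.remove(uuid) }
    }

    // Returns false if the subscriber is unknown or its buffer is full
    fun emitEvent(uuid: Long, event: Event): Boolean =
        subscribers[uuid]?.trySend(event)?.isSuccess ?: false
}

fun main() = runBlocking {
    val hub = EventHub()
    val events = hub.subscribeToEvents(1234L)
    hub.emitEvent(1234L, MessageEvent("Hello world!"))

    // Collect the confirmation plus one pushed event, then stop
    val received = events.take(2).toList()
    received.forEach { event ->
        when (event) {
            is MessageEvent -> println(event.message)
        }
    }
}
```

The client-side loop with measureNanoTime and delay disappears in this design: the collector simply suspends until the next event arrives.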

Sorry for the weak answer. I am only a user, not a contributor.