grpc / grpc-kotlin

Kotlin gRPC implementation. HTTP/2 based RPC
https://grpc.io/docs/languages/kotlin
Apache License 2.0

Flow and implementation ... ServiceCoroutineImplBase #306

Open ashNext opened 2 years ago

ashNext commented 2 years ago

I want to build a simple chat in which one client sends a message and the other clients receive it. I already have this working with a bidirectional streaming RPC and StreamObserver, using the ... ServiceImplBase implementation.

ChatServer.kt

private val clients = mutableMapOf<String, MutableSet<StreamObserver<Chat.MessageTextRequest>>>()

...

private class ChatService : ChatSendServiceGrpc.ChatSendServiceImplBase() {
    override fun send(responseObserver: StreamObserver<Chat.MessageTextRequest>): StreamObserver<Chat.MessageTextRequest> {

        return object : StreamObserver<Chat.MessageTextRequest> {
            override fun onNext(value: Chat.MessageTextRequest) {
                val observers = clients.computeIfAbsent(value.streamId) {
                    logger.info("New streamId ${value.streamId}")
                    mutableSetOf()
                }

                if (observers.add(responseObserver)) {
                    logger.info("Add responseObserver $responseObserver")
                }

                for (observer in observers) {
                    observer.onNext(messageTextRequest {
                        msgText = value.msgText
                    })
                }
            }

            override fun onError(t: Throwable?) {...}

            override fun onCompleted() {...}
        }
    }
}

But I can't get the same thing working with Flow and the ... ServiceCoroutineImplBase implementation. I don't understand how to "remember" the streams.

private class ChatService : ChatSendServiceGrpcKt.ChatSendServiceCoroutineImplBase() {
    override fun send(requests: Flow<Chat.MessageTextRequest>): Flow<Chat.MessageTextRequest> {

        return flow {
            requests.collect { value ->

                emit(messageTextRequest {
                    msgText = value.msgText
                })
            }
        }
    }
}

jamesward commented 2 years ago

I have a gRPC Kotlin sample that uses BiDi for a chat client & server: https://github.com/jamesward/cloud-native-grpc-kotlin

Maybe that can help you figure out what is going on?

z0mb1ek commented 2 years ago

@jamesward Hi, thanks for the reply, but your example is very simple. How can I get the chatId for my shared flows? I need many chats.

jamesward commented 2 years ago

Architecturally there are a lot of variations. In a real-world system I'd use Kafka as the hub. For a single-node configuration you can do something like what you and I have, but you should definitely use MutableSharedFlow as the hub, like: https://github.com/jamesward/cloud-native-grpc-kotlin/blob/main/chat-server/src/main/kotlin/cngk/ChatServer.kt#L17

Note how the sharedFlow.emitAll(filteredFlow) has to be run in a separate coroutine.
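A minimal sketch of why the emitAll call needs its own coroutine, not a copy of the linked sample: emitAll suspends until the request flow completes, while the service method has to return its response flow immediately. The names Message and BroadcastHub and the caller-supplied scope are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.launch

// Hypothetical stand-in for the generated protobuf message type.
data class Message(val text: String)

// Hypothetical hub; `scope` is an assumed, caller-supplied CoroutineScope.
class BroadcastHub(private val scope: CoroutineScope) {
    // One hub shared by every connected client (replay = 0, no extra buffer).
    private val hub = MutableSharedFlow<Message>()

    // Same shape as a bidi-streaming coroutine method: Flow in, Flow out.
    fun connect(requests: Flow<Message>): Flow<Message> {
        // emitAll suspends until `requests` completes, and this function is
        // not a suspend function, so the forwarding runs in its own coroutine.
        scope.launch { hub.emitAll(requests) }
        // Every client gets the same hub back, so every client sees every message.
        return hub
    }
}
```

With these defaults, a message emitted before a client has started collecting the returned flow is not delivered to that client, and the returned flow never completes on its own.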

z0mb1ek commented 2 years ago

@jamesward how can I get the chatId from the first request in the requests flow? I only see a way like this:

            sharedFlow.combine(requests) { a, b ->
                b
            }.collect {
                println(it.t)
            }
        }

 return sharedFlow

but this checks the id on every message during collect. Is there any way to get the chatId from the first message and then save the sharedFlow per chatId only once?

jamesward commented 2 years ago

I'm not totally understanding your architecture and what you mean by chatId. Can you provide more details on that?

z0mb1ek commented 2 years ago

There is a bidi streaming method that clients use to exchange messages. A client sends a message to this method with a chatId, and I need to send that message only to the other clients with the same chatId. In your example you send the message to ALL clients.

jamesward commented 2 years ago

I see. Yeah, you will have to put that state somewhere. Map<String, MutableSharedFlow> might work but you'll have to be careful with concurrency. Ultimately this approach is only good for a toy. For prod you'll need to externalize that state or use Kafka (etc).
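As a rough sketch of the Map<String, MutableSharedFlow> idea, assuming a single JVM process: a ConcurrentHashMap makes the "look up or create" step atomic, which covers the most obvious concurrency hazard. ChatMessage and ChatRooms are hypothetical names, not part of the generated gRPC code.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical stand-in for the generated request type carrying a chatId.
data class ChatMessage(val chatId: String, val text: String)

// Hypothetical in-memory registry: one hub per chatId.
class ChatRooms {
    private val rooms = ConcurrentHashMap<String, MutableSharedFlow<ChatMessage>>()

    // computeIfAbsent on ConcurrentHashMap is atomic, so two clients racing
    // on the same chatId always end up with the same hub instance.
    fun room(chatId: String): MutableSharedFlow<ChatMessage> =
        rooms.computeIfAbsent(chatId) { MutableSharedFlow<ChatMessage>() }
}
```

This sketch never removes a room, so the map grows with the number of distinct chatIds, which is one more reason it only suits a toy setup.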

z0mb1ek commented 2 years ago

No, I cannot, because the flow is returned before collect is called. Or can you give an example with code, please?
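One possible way around the ordering problem, sketched under assumptions: a flow built with channelFlow is cold, so its body runs only when the response flow is collected, and the first request can be inspected inside that body. ChatMessage, RoutedChat and connect are hypothetical names, and the sketch assumes every message on one stream carries the same chatId.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.launch

// Hypothetical stand-in for the generated request type.
data class ChatMessage(val chatId: String, val text: String)

// Hypothetical service body; `connect` mirrors a bidi method: Flow in, Flow out.
class RoutedChat {
    private val rooms = ConcurrentHashMap<String, MutableSharedFlow<ChatMessage>>()

    fun connect(requests: Flow<ChatMessage>): Flow<ChatMessage> = channelFlow {
        var room: MutableSharedFlow<ChatMessage>? = null
        var delivery: Job? = null
        requests.collect { msg ->
            // The chatId is read only on the first message of this stream.
            val current = room
                ?: rooms.computeIfAbsent(msg.chatId) { MutableSharedFlow<ChatMessage>() }
                    .also { hub ->
                        room = hub
                        val subscribed = CompletableDeferred<Unit>()
                        // Deliver the room's messages to this client in a
                        // separate coroutine, since collect never returns.
                        delivery = launch {
                            hub.onSubscription { subscribed.complete(Unit) }
                                .collect { send(it) }
                        }
                        // Wait for the subscription so the first message is not missed.
                        subscribed.await()
                    }
            current.emit(msg)
        }
        // The client stopped sending: stop delivering so the response flow completes.
        delivery?.cancel()
    }
}
```

Here a client only joins a room once it has sent its first message, and it stops receiving as soon as its request stream completes. Both are design choices of the sketch rather than requirements of gRPC.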