scalapb / zio-grpc

ScalaPB meets ZIO: write purely functional gRPC services and clients using ZIO
Apache License 2.0

Streaming gRPC client has no backpressure #631

Open joroKr21 opened 1 month ago

joroKr21 commented 1 month ago

It uses an unbounded queue: https://github.com/scalapb/zio-grpc/blob/def8d5d99b51330627d7b0b4f84b887de5b03034/core/src/main/scalajvm/scalapb/zio_grpc/client/StreamingClientCallListener.scala#L51

But there is no external backpressure mechanism: we request the next message as soon as we receive one. See https://github.com/scalapb/zio-grpc/blob/def8d5d99b51330627d7b0b4f84b887de5b03034/core/src/main/scalajvm/scalapb/zio_grpc/client/StreamingClientCallListener.scala#L27

In addition, it would be beneficial to use a (configurable) batch size > 1 when requesting messages from the server.

If that's ok in terms of IP, I can port the solution from fs2-grpc to zio-grpc.
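
To illustrate the problem described above, here is a minimal, single-threaded sketch. It is not the zio-grpc or fs2-grpc code. `FakeCall`, `run`, and `prefetch` are hypothetical names invented for this example; `FakeCall.request` only mimics the contract of `io.grpc.ClientCall#request(int)`, where the peer delivers at most as many messages as have been requested. The sketch compares "request one more on every message" with "request a batch only after the buffer has drained", and reports the peak buffer size with a consumer that starts reading late.

```scala
import scala.collection.mutable

object BackpressureSketch {

  // Hypothetical stand-in for a gRPC call: it delivers a message to the
  // listener only while it holds a permit granted through request(n).
  final class FakeCall(total: Int) {
    private var sent = 0
    private var permits = 0
    private var draining = false
    var listener: Int => Unit = _ => ()

    def request(n: Int): Unit = {
      permits += n
      // Guard against re-entrant calls made from inside the listener.
      if (!draining) {
        draining = true
        while (permits > 0 && sent < total) {
          permits -= 1
          val msg = sent
          sent += 1
          listener(msg)
        }
        draining = false
      }
    }
  }

  // Runs one stream of `total` messages and returns the largest number of
  // messages that sat in the buffer at the same time.
  // prefetch = None    : request the next message as soon as one arrives.
  // prefetch = Some(n) : request n messages, then n more once the buffer is empty.
  def run(total: Int, prefetch: Option[Int]): Int = {
    val call = new FakeCall(total)
    val buffer = mutable.Queue.empty[Int]
    var maxBuffered = 0

    call.listener = { msg =>
      buffer.enqueue(msg)
      maxBuffered = math.max(maxBuffered, buffer.size)
      if (prefetch.isEmpty) call.request(1) // no backpressure
    }

    call.request(prefetch.getOrElse(1))

    // The consumer only starts reading after the producer has had its turn.
    var consumed = 0
    while (consumed < total) {
      buffer.dequeue()
      consumed += 1
      // With backpressure, demand is signalled by the consumer, not the listener.
      prefetch.foreach(n => if (buffer.isEmpty) call.request(n))
    }
    maxBuffered
  }

  def main(args: Array[String]): Unit = {
    println(s"no backpressure, peak buffer: ${run(1000, None)}")
    println(s"prefetch 16, peak buffer:     ${run(1000, Some(16))}")
  }
}
```

In this sketch the eager strategy buffers the whole stream before the consumer reads anything, while the batched strategy never holds more than `prefetch` messages. A real implementation would also have to handle concurrency, stream completion, and errors, which are left out here.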

thesamet commented 1 month ago

A PR that adds backpressure (disabled by default) will be accepted. Generally, porting an implementation should be ok. If unsure, please check directly with the fs2-grpc team.

joroKr21 commented 3 weeks ago

> A PR that adds backpressure (disabled by default) will be accepted.

Where can we configure it? I can't find a good place.

thesamet commented 3 weeks ago

See https://github.com/scalapb/zio-grpc/blob/def8d5d99b51330627d7b0b4f84b887de5b03034/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala#L41-L48

joroKr21 commented 3 weeks ago

I added it as a parameter to ZChannel in https://github.com/scalapb/zio-grpc/pull/636. If you tell me what to name the setting, I could also make it a property.

thesamet commented 3 weeks ago

Can you rephrase your question? I am not sure exactly what you are looking for.

joroKr21 commented 3 weeks ago

In my PR I just added a constructor parameter to ZChannel. Should I convert it to a property, and if so, what should we call it?

joroKr21 commented 3 weeks ago

Or do you mean that I should reuse backpressure_queue_size for the client?

thesamet commented 3 weeks ago

Since it's a constructor parameter, no need to introduce a property.