btlines / grpcakkastream

Use GRPC services with the Akka-stream API
MIT License

Implement end-to-end backpressure #12

Open cbornet opened 6 years ago

cbornet commented 6 years ago

See https://github.com/grpc/grpc-java/pull/3119 for an example of manual flow control of gRPC. It would be great to use it to implement backpressure instead of buffering.
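For readers who have not seen that example, the pattern can be sketched roughly as follows. This is a library-free sketch, not the code from the PR: the `FlowControl` trait is a hypothetical stand-in whose method names mirror the flow-control methods on grpc-java's `ServerCallStreamObserver`, and `OneAtATime` is an illustrative name. It assumes the callbacks are delivered one at a time, so a plain `var` is enough.

```scala
// Hypothetical stand-in for the flow-control subset of
// io.grpc.stub.ServerCallStreamObserver (method names mirror grpc-java).
trait FlowControl {
  def disableAutoInboundFlowControl(): Unit
  def request(count: Int): Unit
  def isReady: Boolean
  def setOnReadyHandler(handler: Runnable): Unit
}

// Illustrative handler: pulls one inbound message at a time, and only while
// the outbound side reports that it can accept more data.
final class OneAtATime[Req](call: FlowControl, process: Req => Unit) {
  private var wasReady = false

  // Take over demand signalling instead of letting messages arrive freely.
  call.disableAutoInboundFlowControl()

  call.setOnReadyHandler(new Runnable {
    def run(): Unit =
      if (call.isReady && !wasReady) { // transport just became writable
        wasReady = true
        call.request(1)
      }
  })

  // Would be called for each inbound message (StreamObserver.onNext in grpc-java).
  def onNext(msg: Req): Unit = {
    process(msg)
    if (call.isReady) call.request(1) // still writable: ask for the next message
    else wasReady = false             // otherwise wait for the on-ready callback
  }
}
```

The point of the sketch is that no message is requested unless the outbound side is writable, which is the signal an Akka Streams stage could translate into upstream demand instead of buffering.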

There are also some existing wrappers in the reactive-grpc project for reactive streams implementations that could perhaps be reused for akka-streams (they already deal with some tricky things such as cancellation, ...). See https://github.com/salesforce/reactive-grpc/tree/master/reactor for an example of how these classes are used to bind Reactor to gRPC.

Eventually, once end-to-end backpressure is implemented, it would be nice to pass the Reactive-Streams TCK.

btlines commented 6 years ago

Definitely something I'd like to have. I was aware of the grpc-java implementation but wasn't sure about the stability of the API (you need to cast the StreamObserver to ServerCallStreamObserver to use the request method).

I didn't know about reactive-grpc but will check this out.

assiotis commented 6 years ago

I took a look at reactive-grpc, and it seems fairly straightforward to provide a similar implementation for Akka Streams. I took a stab at a quick POC implementation of the server side and published it in this gist. The ServerCalls object is nothing but a bridge to the Publisher & Subscriber implementations in the reactive-grpc project. With the bulk of the implementation there, the generated code simply becomes (Route Guide proto example):

  def bindService(serviceImpl: RouteGuide)(implicit mat: Materializer): ServerServiceDefinition =
    ServerServiceDefinition
      .builder("routeguide.RouteGuide")
      .addMethod(
        METHOD_GET_FEATURE,
        ServerCalls.asyncUnaryCall(
          new ServerCalls.UnaryMethod[io.grpc.routeguide.Point, io.grpc.routeguide.Feature] {
            override def invoke(request: io.grpc.routeguide.Point, responseObserver: StreamObserver[io.grpc.routeguide.Feature]) =
              tubi.rpclib.ServerCalls.oneToOne(request, responseObserver, serviceImpl.getFeature)
          }
        )
      )
      .addMethod(
        METHOD_LIST_FEATURES,
        ... etc etc
      )
      .build()

If there is interest and there are plans to continue development of this library, I can do a proper implementation and contribute a PR back to this repo.

btlines commented 6 years ago

That looks great! I'd be happy to see a PR to support back-pressure.

ktoso commented 6 years ago

Hi there (Akka team here). It would be nice to implement it without having to depend on external frameworks... We wanted to get to this at some point as well, but I don't know when we'll have the time to do so. I'll keep an eye on this repo though! :-)

btlines commented 6 years ago

Hi @ktoso, yes that would be ideal but it shouldn't be too difficult to jump from reactive-streams to akka-streams.

BTW do you guys have any plans for a grpc implementation based on akka-http ?

ktoso commented 6 years ago

> shouldn't be too difficult to jump from reactive-streams to akka-streams.

Sure, that is trivial ;-) However, that's not the point. What matters is the net of dependencies we'd then put onto users (who do indeed value small dependency chains), and especially avoiding two libraries doing the same thing.

> BTW do you guys have any plans for a grpc implementation based on akka-http ?

Yes, it's on the roadmap to provide nice support for gRPC with Akka Streams and eventually over Akka HTTP, but for that we need to develop the HTTP/2 client side, which we currently lack.

I was thinking, since the process is basically two steps, a) exposing Akka Streams to users and b) actually making calls over Akka HTTP, taking a stab at step a) in this project already would prove to be a great stepping stone, and perhaps we could eventually collaborate here.

Sadly I don't know when we'll get to work on this feature, but it's definitely (currently) on the roadmap of things we want to provide.

jroper commented 6 years ago

We did a small amount of work putting Akka streams on top of the grpc-java client in Lagom:

https://github.com/jroper/lagom/tree/grpc-support

By the way, one problem with the current interface that you're providing is that gRPC allows the server to send a status at the end of the response stream. Using a Flow[Request, Response, NotUsed] provides no mechanism for either the server to produce or the client to consume that status message. To allow the status message to be consumed or produced, the flow could materialize to a future of the status message, for example Flow[Request, Response, Future[GrpcStatus]]. This is what we were thinking of implementing in Lagom when we add gRPC support.
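To make the shape of that idea concrete, here is a small model of it that uses only the Scala standard library rather than Akka Streams. Everything in it is an assumption made for illustration: `GrpcStatus` is a hypothetical case class, `withTrailingStatus` is an invented helper, and a plain `Iterator` stands in for the response stream. The returned `Future` plays the role that the materialized value would play in `Flow[Request, Response, Future[GrpcStatus]]`.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

// Hypothetical status type. In gRPC's status code table 0 is OK and 2 is UNKNOWN.
final case class GrpcStatus(code: Int, description: String)

object TrailingStatus {
  // Wraps a response stream so that reaching its end (or a failure while
  // checking for more elements) completes a future carrying the final status.
  def withTrailingStatus[A](responses: Iterator[A]): (Iterator[A], Future[GrpcStatus]) = {
    val status = Promise[GrpcStatus]()
    val wrapped = new Iterator[A] {
      def hasNext: Boolean =
        try {
          val more = responses.hasNext
          if (!more) status.trySuccess(GrpcStatus(0, "OK")) // normal end of stream
          more
        } catch {
          case NonFatal(e) =>
            // Report the failure as a status instead of rethrowing it.
            status.trySuccess(GrpcStatus(2, Option(e.getMessage).getOrElse(e.toString)))
            false
        }
      def next(): A = responses.next()
    }
    (wrapped, status.future)
  }
}
```

A consumer drains the iterator and then reads the future, which mirrors a client running the flow and afterwards inspecting the status it materialized.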

cbornet commented 6 years ago

Is it fixed by #17 ?