akka / akka-grpc

Akka gRPC
https://doc.akka.io/docs/akka-grpc/
Other
432 stars 123 forks source link

Concatenating routes #1066

Open Marcus-Rosti opened 4 years ago

Marcus-Rosti commented 4 years ago

Versions used

Akka-grpc version: 1.0.0

Expected Behavior

Concating Route of server and reflection results in both working on same port

Actual Behavior

Concating partial routes leads to first route only being evaluated

Relevant logs


{"sourceThread":"attribute-service-akka.actor.default-dispatcher-4","cause":"scala.MatchError: HttpRequest(HttpMethod(POST),http://localhost:50051/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo,Vector(te: trailers, User-Agent: grpc-c++/1.30.1 grpc-c/10.0.0 (osx; chttp2), grpc-accept-encoding: identity,deflate,gzip, Accept-Encoding: identity, gzip, x-http2-stream-id: 1),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0)) (of class akka.http.scaladsl.model.HttpRequest)\n\tat scala.PartialFunction$$anon$1.apply(PartialFunction.scala:341)\n\tat scala.PartialFunction$$anon$1.apply(PartialFunction.scala:339)\n\tat scala.PartialFunction$Unlifted.applyOrElse(PartialFunction.scala:319)\n\tat scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)\n\tat com.sony.sie.caprica.grpc.server.GrpcServer.$anonfun$services$1(GrpcServer.scala:23)\n\tat akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$2(RouteConcatenation.scala:47)\n\tat akka.http.scaladsl.util.FastFuture$.strictTransform$1(FastFuture.scala:41)\n\tat akka.http.scaladsl.util.FastFuture$.transformWith$extension(FastFuture.scala:45)\n\tat akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26)\n\tat akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)\n\tat akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)\n\tat akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$mapRouteResultWith$2(BasicDirectives.scala:74)\n\tat akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)\n\tat akka.http.scaladsl.server.directives.ExecutionDirectives.$anonfun$handleExceptions$2(ExecutionDirectives.scala:32)\n\tat akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)\n\tat akka.http.scaladsl.server.Route$.$anonfun$asyncHandler$1(Route.scala:86)\n\tat com.sony.sie.caprica.oasis.prometheus.GRPCInterceptor.$anonfun$apply$1(GRPCInterceptor.scala:36)\n\tat datadog.trace.instrumentation.akkahttp.AkkaHttpServerInstrumentation$DatadogAsyncWrapper.apply(AkkaHttpServerInstrumentation.java:176)\n\tat datadog.trace.instrumentation.akkahttp.AkkaHttpServerInstrumentation$DatadogAsyncWrapper.apply(AkkaHttpServerInstrumentation.java:159)\n\tat akka.http.impl.engine.http2.Http2Blueprint$.$anonfun$handleWithStreamIdHeader$1(Http2Blueprint.scala:130)\n\tat akka.stream.impl.fusing.MapAsyncUnordered$$anon$31.onPush(Ops.scala:1376)\n\tat akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)\n\tat akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:409)\n\tat akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)\n\tat akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute(ActorGraphInterpreter.scala:47)\n\tat akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute$(ActorGraphInterpreter.scala:43)\n\tat akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:85)\n\tat akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)\n\tat akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)\n\tat akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:764)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:539)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:537)\n\tat akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:583)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:229)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:241)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\n","akkaSource":"akka.actor.ActorSystemImpl(attribute-service)","akkaTimestamp":"02:01:20.308UTC","message":"Error during processing of request: 'HttpRequest(HttpMethod(POST),http://localhost:50051/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo,Vector(te: trailers, User-Agent: grpc-c++/1.30.1 grpc-c/10.0.0 (osx; chttp2), grpc-accept-encoding: identity,deflate,gzip, Accept-Encoding: identity, gzip, x-http2-stream-id: 1),HttpEntity.Chunked(application/grpc),HttpProtocol(HTTP/2.0)) (of class akka.http.scaladsl.model.HttpRequest)'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.","thread":"attribute-service-akka.actor.default-dispatcher-11","sourceActorSystem":"attribute-service","logger":"akka.actor.ActorSystemImpl","logtype":"app","level":"ERROR","timestamp":"2020-07-15T02:01:20.309Z"}
{"sourceThread":"attribute-service-akka.actor.default-dispatcher-4","akkaSource":"Http2ServerDemux(akka://attribute-service)","akkaTimestamp":"02:01:20.348UTC","message":"handleOutgoingEnded received unexpectedly in state HalfClosedLocal. This indicates a bug in Akka HTTP, please report it to the issue tracker.","thread":"attribute-service-akka.actor.default-dispatcher-11","sourceActorSystem":"attribute-service","logger":"akka.http.impl.engine.http2.Http2ServerDemux","logtype":"app","level":"WARN","timestamp":"2020-07-15T02:01:20.348Z"}
{"sourceThread":"attribute-service-akka.actor.default-dispatcher-187","akkaSource":"Http2ServerDemux(akka://attribute-service)","akkaTimestamp":"02:01:20.350UTC","message":"handleOutgoingEnded received unexpectedly in state Closed. This indicates a bug in Akka HTTP, please report it to the issue tracker.","thread":"attribute-service-akka.actor.default-dispatcher-11","sourceActorSystem":"attribute-service","logger":"akka.http.impl.engine.http2.Http2ServerDemux","logtype":"app","level":"WARN","timestamp":"2020-07-15T02:01:20.35Z"}

Reproducible Test Case

package object grpc {
  type Service = PartialFunction[HttpRequest, Future[HttpResponse]]
}

----
import akka.actor.ActorSystem
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.server.{Route, RouteResult}
import akka.stream.ActorMaterializer

import scala.concurrent.ExecutionContext

class GrpcServer(
  port: Int
  grpcServices: Service*
)(
  implicit system: ActorSystem,
  materializer: ActorMaterializer,
  ec: ExecutionContext
) extends Server(port) {

  override val services: Route = { ctx =>
    ServiceHandler.concat(grpcServices: _*)(ctx.request).map(RouteResult.Complete)
  }

}

--------
import akka.actor.ActorSystem
import akka.http.scaladsl.{Http, HttpConnectionContext}
import akka.stream.Materializer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{Route, RouteResult}

import scala.concurrent.{ExecutionContext, Future}

abstract class Server(port: Int)(
  implicit system: ActorSystem,
  materializer: Materializer,
  ec: ExecutionContext
) {
  val services: Route

  def run(reflection: Service): Future[Http.ServerBinding] = {
    val reflectionRoute: Route = { ctx =>
      reflection(ctx.request).map(RouteResult.Complete)
    }

    Http().bindAndHandleAsync(
     Route.asyncHandler(concat(services, reflectionRoute)),
      interface = "0.0.0.0",
      port = port,
      connectionContext = HttpConnectionContext(),
      parallelism = Runtime.getRuntime.availableProcessors
    )
  }

}
Marcus-Rosti commented 4 years ago

Oh and the call I'm making is:

$ grpc_cli ls localhost:50051
raboof commented 4 years ago

This is indeed an area where the interoperability between Akka gRPC and plain Akka HTTP still can use more work.

In this particular case, you could probably solve your direct problem by inserting your reflection route into the list passed to ServiceHandler.concat. https://doc.akka.io/docs/akka-grpc/current/server/walkthrough.html#serving-multiple-services might also be interesting.

ServiceHandler.concat (conceptually) 'seals' the route to make sure that if no matching service is found, we return a gRPC 'Not Found' (rather than a plain HTTP2 'Not Found' - those are unfortunately different). Perhaps we should make that clearer in the naming, and/or provide an easy way to inject a RejectionHandler that produces a 'plain' or 'gRPC'-style 'Not Found' depending on the request.

We hope to make this nicer when we upgrade to (the as of yet unreleased) Akka HTTP 10.2.0, which will have better features for converting between functions and routes (https://github.com/akka/akka-http/pull/3367, https://github.com/akka/akka-http/pull/3239), though this is not complete yet.

If you could try Akka gRPC with an Akka HTTP 10.2 snapshot and provide feedback/propose improvements, that would be really helpful!

Marcus-Rosti commented 4 years ago

OH! Okay I understand now. I guess the functionality I was looking for was to have the Service itself be authenticated, whereas the reflection isn't. But have them both be on the same port. That's my server setup looks a bit awkward, I'm trying to get the authenticated server match the unauthenticated setup.

import akka.actor.ActorSystem
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.server.{Directive0, Route, RouteResult}
import akka.stream.ActorMaterializer
import com.sony.sie.caprica.grpc.Service

import scala.concurrent.ExecutionContext

class AuthenticatedGrpcServer(
  port: Int,
  authenticator: Directive0,
  authedServices: Service*
)(
  implicit val ec: ExecutionContext,
  system: ActorSystem,
  materializer: ActorMaterializer
) extends Server(port, interceptor) {

  override val services: Route =
    authenticator { ctx =>
      ServiceHandler
        .concat(authedServices: _*)(ctx.request)
        .map(RouteResult.Complete)
    }
}
raboof commented 4 years ago

I guess the functionality I was looking for was to have the Service itself be authenticated, whereas the reflection isn't

Aah, gotcha! For now you could probably get away with something like:

val reflectionRoute: Route = {
  { ctx =>
      reflection
        .andThen(_.fast.map(RouteResult.Complete)(ctx.executionContext))
        .applyOrElse[HttpRequest, Future[RouteResult]](ctx.request, _ => ctx.reject())
    }
}

concat(
  reflectionRoute,
  services
)
Marcus-Rosti commented 4 years ago

Let me try that and I'll report back!

Marcus-Rosti commented 4 years ago

That worked thanks! You can close this issue if you'd like or just tie it to whatever improvement y'all have in the works

Marcus-Rosti commented 4 years ago

although the fix was without .fast

val reflectionRoute: Route = {
  { ctx =>
      reflection
        .andThen(_.map(RouteResult.Complete)(ctx.executionContext))
        .applyOrElse[HttpRequest, Future[RouteResult]](ctx.request, _ => ctx.reject())
    }
}

concat(
  reflectionRoute,
  services
)