Open ValdemarGr opened 1 year ago
gRPC supports interceptors on both the client and the server. Having equivalent tools in this library would make common cross-cutting concerns (for instance tracing and authorization) much easier to implement.
The ideas proposed here would also cover other similar issues (https://github.com/typelevel/fs2-grpc/issues/627, https://github.com/typelevel/fs2-grpc/issues/6, https://github.com/typelevel/fs2-grpc/pull/567).
I have taken inspiration from cats-tagless for the Dom and Cod typeclasses. https://github.com/typelevel/cats-tagless/blob/master/core/src/main/scala/cats/tagless/aop/Aspect.scala
cats-tagless
Dom
Cod
The following example covers the server-side of an implementation, but a client aspect is very similar.
If you are interested, I could put together a PR for this.
import cats._
import cats.implicits._

// shared between client and server
/** Per-call information handed to every aspect hook.
  *
  * @param metadata         the `io.grpc.Metadata` supplied for the call
  * @param methodDescriptor descriptor of the method being invoked
  */
final case class CallContext[Req, Res](
    metadata: io.grpc.Metadata,
    methodDescriptor: io.grpc.MethodDescriptor[Req, Res]
)

/** Hooks wrapped around each of the four gRPC call shapes.
  *
  * Every `visit*` method receives the call context, the `Dom`/`Cod` instances for the request and
  * response types, and the handler `request`, which expects an extra context value of type `A`
  * next to the request. It returns the function that is actually applied to the incoming request.
  *
  * @tparam F   effect type
  * @tparam Dom instance made available for request types
  * @tparam Cod instance made available for response types
  * @tparam A   context value passed to the handler alongside the request
  */
trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self =>

  def visitUnaryToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, A) => F[Res]
  ): Req => F[Res]

  def visitUnaryToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, A) => fs2.Stream[F, Res]
  ): Req => fs2.Stream[F, Res]

  def visitStreamingToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (fs2.Stream[F, Req], A) => F[Res]
  ): fs2.Stream[F, Req] => F[Res]

  def visitStreamingToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (fs2.Stream[F, Req], A) => fs2.Stream[F, Res]
  ): fs2.Stream[F, Req] => fs2.Stream[F, Res]

  /** Derives an aspect whose handlers expect a `B` instead of an `A`.
    *
    * The `A` produced by this aspect is run through `f` and the resulting `B` is handed to the
    * handler. For the streaming-response shapes, `f` is evaluated as the first step of the
    * response stream.
    *
    * @param f effectful conversion from this aspect's context to the new context
    * @return an aspect delegating to this one, with the context converted by `f`
    */
  def modify[B](f: A => F[B])(implicit F: Monad[F]): ServiceAspect[F, Dom, Cod, B] =
    new ServiceAspect[F, Dom, Cod, B] {

      override def visitUnaryToUnary[Req, Res](
          callCtx: CallContext[Req, Res],
          dom: Dom[Req],
          cod: Cod[Res],
          request: (Req, B) => F[Res]
      ): Req => F[Res] =
        self.visitUnaryToUnary[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => f(a).flatMap(b => request(req, b))
        )

      override def visitUnaryToStreaming[Req, Res](
          callCtx: CallContext[Req, Res],
          dom: Dom[Req],
          cod: Cod[Res],
          request: (Req, B) => fs2.Stream[F, Res]
      ): Req => fs2.Stream[F, Res] =
        self.visitUnaryToStreaming[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => fs2.Stream.eval(f(a)).flatMap(b => request(req, b))
        )

      override def visitStreamingToUnary[Req, Res](
          callCtx: CallContext[Req, Res],
          dom: Dom[Req],
          cod: Cod[Res],
          request: (fs2.Stream[F, Req], B) => F[Res]
      ): fs2.Stream[F, Req] => F[Res] =
        self.visitStreamingToUnary[Req, Res](
          callCtx,
          dom,
          cod,
          (reqs, a) => f(a).flatMap(b => request(reqs, b))
        )

      override def visitStreamingToStreaming[Req, Res](
          callCtx: CallContext[Req, Res],
          dom: Dom[Req],
          cod: Cod[Res],
          request: (fs2.Stream[F, Req], B) => fs2.Stream[F, Res]
      ): fs2.Stream[F, Req] => fs2.Stream[F, Res] =
        self.visitStreamingToStreaming[Req, Res](
          callCtx,
          dom,
          cod,
          (reqs, a) => fs2.Stream.eval(f(a)).flatMap(b => request(reqs, b))
        )
    }
}

// https://github.com/typelevel/cats-tagless/blob/master/core/src/main/scala/cats/tagless/Trivial.scala
/** Evidence that carries no information; an implicit instance exists for every type. */
final class Trivial[A] private extends Serializable

object Trivial {
  // A single shared instance is re-typed on demand; the class has no members that depend on `A`.
  private val any = new Trivial[Any]
  implicit def instance[A]: Trivial[A] = any.asInstanceOf[Trivial[A]]
}

object ServiceAspect {

  /** Aspect that adds no behaviour: the handler is invoked with the call's metadata as its
    * context value, and the `dom`/`cod` instances are ignored.
    */
  def default[F[_], Dom[_], Cod[_]]: ServiceAspect[F, Dom, Cod, io.grpc.Metadata] =
    new ServiceAspect[F, Dom, Cod, io.grpc.Metadata] {

      override def visitUnaryToUnary[Req, Res](
          callCtx: CallContext[Req, Res],
          dom: Dom[Req],
          cod: Cod[Res],
          request: (Req, io.grpc.Metadata) => F[Res]
      ): Req => F[Res] =
        request(_, callCtx.metadata)

      override def visitUnaryToStreaming[Req, Res](
          callCtx: CallContext[Req, Res],
          dom: Dom[Req],
          cod: Cod[Res],
          request: (Req, io.grpc.Metadata) => fs2.Stream[F, Res]
      ): Req => fs2.Stream[F, Res] =
        request(_, callCtx.metadata)

      override def visitStreamingToUnary[Req, Res](
          callCtx: CallContext[Req, Res],
          dom: Dom[Req],
          cod: Cod[Res],
          request: (fs2.Stream[F, Req], io.grpc.Metadata) => F[Res]
      ): fs2.Stream[F, Req] => F[Res] =
        request(_, callCtx.metadata)

      override def visitStreamingToStreaming[Req, Res](
          callCtx: CallContext[Req, Res],
          dom: Dom[Req],
          cod: Cod[Res],
          request: (fs2.Stream[F, Req], io.grpc.Metadata) => fs2.Stream[F, Res]
      ): fs2.Stream[F, Req] => fs2.Stream[F, Res] =
        request(_, callCtx.metadata)
    }
}
// Stand-ins for the generated protobuf message types used by the example service.
final case class Request()
final case class Response()
final case class StreamingResponse()

/** Example service algebra in which every method receives a context value `A` next to the
  * request.
  */
trait SomeServiceFs2Grpc[F[_], A] {
  def doSomething(request: Request, ctx: A): F[Response]
  def doSomethingStream(request: Request, ctx: A): fs2.Stream[F, StreamingResponse]
}

object SomeServiceFs2Grpc {

  // NOTE(review): `Async`, `Dispatcher` and `ServerOptions` are not imported in the visible
  // snippet; presumably cats.effect.Async, cats.effect.std.Dispatcher and
  // fs2.grpc.server.ServerOptions — confirm against the surrounding file.

  /** Builds the gRPC service definition for `impl`, routing every call through `aspect`.
    *
    * For each method the aspect's matching `visit*` hook is given a [[CallContext]] built from
    * the call's metadata and the method descriptor, the implicit `Dom`/`Cod` instances, and the
    * implementation method; the function it returns is applied to the incoming request.
    *
    * @param dispatcher    dispatcher handed to the fs2-grpc call handler
    * @param impl          service implementation expecting a context of type `A`
    * @param serverOptions options handed to the fs2-grpc call handler
    * @param aspect        aspect wrapped around every call
    * @return the service definition with both methods registered
    */
  def serviceBinding[F[_]: Async, Dom[_], Cod[_], A](
      dispatcher: Dispatcher[F],
      impl: SomeServiceFs2Grpc[F, A],
      serverOptions: ServerOptions,
      aspect: ServiceAspect[F, Dom, Cod, A]
  )(implicit
      domRequest: Dom[Request],
      codResponse: Cod[Response],
      codStreamingResponse: Cod[StreamingResponse]
  ): io.grpc.ServerServiceDefinition = {
    // Built once and shared by both method registrations (same dispatcher and options).
    val handler = fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions)

    io.grpc.ServerServiceDefinition
      .builder(SomeService.SERVICE)
      .addMethod(
        SomeService.METHOD_DO_SOMETHING,
        handler.unaryToUnaryCall[Request, Response] { (incoming, metadata) =>
          aspect
            .visitUnaryToUnary[Request, Response](
              CallContext(metadata, SomeService.METHOD_DO_SOMETHING),
              domRequest,
              codResponse,
              (req, ctx) => impl.doSomething(req, ctx)
            )
            .apply(incoming)
        }
      )
      .addMethod(
        SomeService.METHOD_DO_SOMETHING_STREAM,
        handler.unaryToStreamingCall[Request, StreamingResponse] { (incoming, metadata) =>
          aspect
            .visitUnaryToStreaming[Request, StreamingResponse](
              CallContext(metadata, SomeService.METHOD_DO_SOMETHING_STREAM),
              domRequest,
              codStreamingResponse,
              (req, ctx) => impl.doSomethingStream(req, ctx)
            )
            .apply(incoming)
        }
      )
      .build()
  }
}

// Placeholders for the generated descriptors: every member is `???`, so initialising this
// object throws `scala.NotImplementedError` until real descriptors are filled in.
object SomeService {
  val METHOD_DO_SOMETHING: io.grpc.MethodDescriptor[Request, Response] = ???
  val METHOD_DO_SOMETHING_STREAM: io.grpc.MethodDescriptor[Request, StreamingResponse] = ???
  val SERVICE: io.grpc.ServiceDescriptor = ???
}
// NOTE(review): `IO`, `Dispatcher` and `ServerOptions` are not imported in the visible snippet;
// presumably cats.effect.IO, cats.effect.std.Dispatcher and fs2.grpc.server.ServerOptions.

// simple use case: the implementation receives the raw call metadata as its context
val mySimpleImpl: SomeServiceFs2Grpc[IO, io.grpc.Metadata] = ???
val disp: Dispatcher[IO] = ???

SomeServiceFs2Grpc.serviceBinding(
  disp,
  mySimpleImpl,
  ServerOptions.default,
  ServiceAspect.default[IO, Trivial, Trivial]
)

// more complex usecase
type Auth = String

val myAuthedImpl: SomeServiceFs2Grpc[IO, Auth] = ???

def extractAuthFromMetadata(m: io.grpc.Metadata): IO[Auth] = ???

// `modify` turns the metadata context into an `Auth` before the implementation is called.
SomeServiceFs2Grpc.serviceBinding(
  disp,
  myAuthedImpl,
  ServerOptions.default,
  ServiceAspect.default[IO, Trivial, Trivial].modify(extractAuthFromMetadata)
)

import natchez._

/** Request-side evidence that supplies one key/value pair to record on the span. */
trait TracingAspect[A] {
  def key: String
  def value: String
}

/** Wraps `underlying` so that unary-to-unary calls run inside a natchez span.
  *
  * The span is named after the full gRPC method name, and the `TracingAspect` key/value of the
  * request type is attached to it before the underlying aspect's function is run.
  *
  * The other three call shapes are left as `???` in this sketch and throw
  * `scala.NotImplementedError` when invoked.
  *
  * @param underlying aspect that performs the actual call handling
  * @return an aspect adding tracing around `underlying`
  */
def traceAspect[F[_]: Applicative: Trace, Cod[_], A](
    underlying: ServiceAspect[F, TracingAspect, Cod, A]
): ServiceAspect[F, TracingAspect, Cod, A] =
  new ServiceAspect[F, TracingAspect, Cod, A] {

    override def visitUnaryToUnary[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: TracingAspect[Req],
        cod: Cod[Res],
        request: (Req, A) => F[Res]
    ): Req => F[Res] =
      incoming =>
        Trace[F].span(callCtx.methodDescriptor.getFullMethodName()) {
          Trace[F].put(dom.key -> dom.value) *>
            underlying.visitUnaryToUnary[Req, Res](callCtx, dom, cod, request)(incoming)
        }

    override def visitUnaryToStreaming[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: TracingAspect[Req],
        cod: Cod[Res],
        request: (Req, A) => fs2.Stream[F, Res]
    ): Req => fs2.Stream[F, Res] = ???

    override def visitStreamingToUnary[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: TracingAspect[Req],
        cod: Cod[Res],
        request: (fs2.Stream[F, Req], A) => F[Res]
    ): fs2.Stream[F, Req] => F[Res] = ???

    override def visitStreamingToStreaming[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: TracingAspect[Req],
        cod: Cod[Res],
        request: (fs2.Stream[F, Req], A) => fs2.Stream[F, Res]
    ): fs2.Stream[F, Req] => fs2.Stream[F, Res] = ???
  }
I think this looks promising. You are welcome to put together a PR and we can take it from there :)
gRPC supports interceptors on both the client and the server. Having equivalent tools in this library would make common cross-cutting concerns (for instance tracing and authorization) much easier to implement.
The ideas proposed here would also cover other similar issues (https://github.com/typelevel/fs2-grpc/issues/627, https://github.com/typelevel/fs2-grpc/issues/6, https://github.com/typelevel/fs2-grpc/pull/567).
I have taken inspiration from cats-tagless for the `Dom` and `Cod` typeclasses: https://github.com/typelevel/cats-tagless/blob/master/core/src/main/scala/cats/tagless/aop/Aspect.scala
The following example covers the server-side of an implementation, but a client aspect is very similar.
If you are interested, I could put together a PR for this.
runtime module
codegen
userland