getkyo / kyo

Toolkit for Scala Development
https://getkyo.io
Apache License 2.0

gRPC Support #390

Open ghostdogpr opened 3 months ago

ghostdogpr commented 3 months ago

Support for protobuf and gRPC would drive adoption (at least for me).

It's definitely not easy, as it involves code generation, but maybe https://scalapb.github.io/docs/writing-plugins could be used, as it is for zio-grpc and fs2-grpc.

There is currently the option of using zio-grpc with the ZIO interop, but its performance is not great to start with (https://github.com/LesnyRumcajs/grpc_bench/discussions/441), so there might be an opportunity to shine.

He-Pin commented 3 months ago

Yes, looking forward to it. grpc_bench is a great place to show it off.

fwbrasil commented 3 months ago

/bounty $200

algora-pbc[bot] commented 3 months ago

💎 $200 bounty • Kyo

Steps to solve:

  1. Start working: Comment /attempt #390 with your implementation plan
  2. Submit work: Create a pull request including /claim #390 in the PR body to claim the bounty
  3. Receive payment: 100% of the bounty is received 2-5 days post-reward. Make sure you are eligible for payouts

Thank you for contributing to getkyo/kyo!


| Attempt       | Started (GMT+0)           | Solution |
| 🟢 @steinybot | May 30, 2024, 12:31:58 PM | WIP      |

steinybot commented 3 months ago

/attempt #390

I've had a little look at this.

The code generation will take a little while but there is enough documentation and prior art that it should be easy enough once we know what it should look like.
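
One part of "what it should look like" is already fixed by grpc-java: each RPC has a `MethodDescriptor.MethodType`, and `io.grpc.stub.ServerCalls` has one factory per type. A minimal sketch of that mapping, as a generator might use it, follows. The helper name `serverCallsFactory` is hypothetical, and how the Kyo side of each handler should look is left open.

```scala
import io.grpc.MethodDescriptor.MethodType

// Hypothetical helper for a code generator: maps the gRPC method type to the
// name of the io.grpc.stub.ServerCalls factory that wraps a handler of that shape.
def serverCallsFactory(methodType: MethodType): String =
  methodType match
    case MethodType.UNARY            => "asyncUnaryCall"
    case MethodType.SERVER_STREAMING => "asyncServerStreamingCall"
    case MethodType.CLIENT_STREAMING => "asyncClientStreamingCall"
    case MethodType.BIDI_STREAMING   => "asyncBidiStreamingCall"
    case MethodType.UNKNOWN =>
      throw new IllegalArgumentException("Cannot generate a handler for an UNKNOWN method type")
```

The method type comes from whether the request and response are declared as `stream` in the .proto file, so a unary RPC such as a simple request/reply always maps to `asyncUnaryCall`.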

I tried a naive implementation:

package com.example.helloworld

import io.grpc.*
import io.grpc.examples.helloworld.helloworld.*
import io.grpc.stub.{ServerCalls, StreamObserver}
import kyo.*

import java.util.concurrent.Executors
import scala.util.Try
import scala.util.chaining.*

type Grpcs >: Grpcs.Effects <: Grpcs.Effects

object Grpcs:
  type Effects = Fibers & Aborts[StatusException]

  def init[T: Flat](v: => T < Grpcs): Fiber[T] < IOs =
    def pendingFibers: Try[T] < Fibers = Aborts.run[StatusException].apply[StatusException, T, Fibers, StatusException, Any](v).map(_.toTry)

    val pendingIOs: Fiber[Try[T]] < IOs = Fibers.init(pendingFibers)
    pendingIOs.map(_.transform(_.fold(Fiber.fail, Fiber.value)))

  def run[T: Flat](v: Fiber[T] < IOs): Fiber[T] =
    IOs.run(v)

trait KyoGreeter:
  def sayHello(request: HelloRequest): HelloReply < Grpcs

object GreeterService extends KyoGreeter:
  override def sayHello(request: HelloRequest): HelloReply < Grpcs =
    for {
      _ <- Consoles.run(Consoles.println(s"Got request: $request"))
    } yield HelloReply(s"Hello, ${request.name}")

object HelloWorldServer extends KyoApp:

  private def buildGreeterService(serviceImpl: KyoGreeter): ServerServiceDefinition =
    ServerServiceDefinition.builder(SERVICE)
      .addMethod(
        METHOD_SAY_HELLO,
        // TODO: When to use a different type of call?
        // TODO: Is there any kind of backpressure or do these closures keep building up?
        ServerCalls.asyncUnaryCall((request: HelloRequest, observer: StreamObserver[HelloReply]) => {
          val fiber = Grpcs.run(Grpcs.init(serviceImpl.sayHello(request)))
          IOs.run(fiber.onComplete { reply =>
            IOs.attempt(reply).map(scalapb.grpc.Grpc.completeObserver(observer))
          })
        }))
      .build()

  private def buildServer(port: Int, services: Seq[ServerServiceDefinition]): Server =
    val builder = services.foldLeft(ServerBuilder.forPort(port))(_.addService(_))

    /**
     * Allow customization of the Executor with two environment variables:
     *
     * <p>
     * <ul>
     * <li>JVM_EXECUTOR_TYPE: direct, workStealing, single, fixed, cached</li>
     * <li>GRPC_SERVER_CPUS: integer value.</li>
     * </ul>
     * </p>
     *
     * The number of Executor Threads will default to the number of
     * availableProcessors(). Only the workStealing and fixed executors will use
     * this value.
     */
    // Use GRPC_SERVER_CPUS when it is set and non-empty; otherwise use all available processors.
    val threads = sys.env.get("GRPC_SERVER_CPUS").filter(_.nonEmpty).map(_.toInt)
      .getOrElse(Runtime.getRuntime.availableProcessors)
    val builderWithExecutor = sys.env.getOrElse("JVM_EXECUTOR_TYPE", "workStealing") match {
      case "direct" => builder.directExecutor
      case "single" => builder.executor(Executors.newSingleThreadExecutor)
      case "fixed" => builder.executor(Executors.newFixedThreadPool(threads))
      case "workStealing" => builder.executor(Executors.newWorkStealingPool(threads))
      case "cached" => builder.executor(Executors.newCachedThreadPool)
      // Fail with a clear message instead of a MatchError on an unrecognized value.
      case other => throw new IllegalArgumentException(s"Unknown JVM_EXECUTOR_TYPE: $other")
    }

    builderWithExecutor.build()

  private val port: Int = 50051

  private val services = Seq(
    buildGreeterService(GreeterService)
  )

  run {
    for {
      // TODO: Get the shutdown working properly.
      _ <- Consoles.println(s"Server is running on port $port. Press Ctrl-C to stop.")
      server <- Resources.acquireRelease(IOs(buildServer(port, services).start())) { (server: Server) =>
        IOs.run(Consoles.run(Consoles.print("Shutting down...")))
        // TODO: Add a timeout.
        server.shutdown().awaitTermination()
        IOs.run(Consoles.run(Consoles.println("Done.")))
      }
      _ <- Fibers.sleep(Duration.Infinity)
    } yield {
      "Goodbye!"
    }
  }

end HelloWorldServer
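
For the "Add a timeout" TODO above, a minimal sketch using only the grpc-java `Server` API (`shutdown`, `awaitTermination(long, TimeUnit)`, `shutdownNow`). The helper name `shutdownWithTimeout` and the single timeout parameter are assumptions for illustration, not part of the code above, and it is deliberately kept outside of any Kyo effect.

```scala
import io.grpc.Server
import java.util.concurrent.TimeUnit

// Hypothetical helper (not part of the code above): stop accepting new calls,
// wait up to `timeoutSeconds` for in-flight calls, then cancel whatever is left.
// Returns true if the server terminated within one of the two waits.
def shutdownWithTimeout(server: Server, timeoutSeconds: Long): Boolean =
  server.shutdown()
  if server.awaitTermination(timeoutSeconds, TimeUnit.SECONDS) then true
  else
    server.shutdownNow()
    server.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)
```

In the release function, `server.shutdown().awaitTermination()` could then become something like `shutdownWithTimeout(server, 30)`, where 30 seconds is an arbitrary choice.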

It didn't perform too well, which is not that surprising:

-----------------------------------------------------------------------------------------------------------------------------------------
| name                        |   req/s |   avg. latency |        90 % in |        95 % in |        99 % in | avg. cpu |   avg. memory |
-----------------------------------------------------------------------------------------------------------------------------------------
| scala_akka                  |   23998 |       31.39 ms |       81.50 ms |       88.65 ms |      102.31 ms |   52.49% |    216.35 MiB |
| scala_pekko                 |   23810 |       32.40 ms |       82.83 ms |       90.08 ms |      102.21 ms |   44.11% |    190.85 MiB |
| scala_fs2                   |   23270 |       34.14 ms |       79.12 ms |       86.97 ms |      105.47 ms |   81.39% |    241.85 MiB |
| scala_kyo                   |   20744 |       40.89 ms |       85.64 ms |       98.11 ms |      178.73 ms |   81.58% |     244.2 MiB |
| scala_zio                   |   19573 |       43.24 ms |       89.10 ms |      103.96 ms |      200.26 ms |  101.32% |    249.95 MiB |
-----------------------------------------------------------------------------------------------------------------------------------------
Benchmark Execution Parameters:
160be4a Thu, 25 Apr 2024 16:18:16 +0200 GitHub Update sbt-assembly to 2.2.0 (#446)
- GRPC_BENCHMARK_DURATION=20s
- GRPC_BENCHMARK_WARMUP=5s
- GRPC_SERVER_CPUS=1
- GRPC_SERVER_RAM=512m
- GRPC_CLIENT_CONNECTIONS=50
- GRPC_CLIENT_CONCURRENCY=1000
- GRPC_CLIENT_QPS=0
- GRPC_CLIENT_CPUS=1
- GRPC_REQUEST_SCENARIO=complex_proto
- GRPC_GHZ_TAG=0.114.0
-----------------------------------------------------------------------------------------------------------------------------------------
| name                        |   req/s |   avg. latency |        90 % in |        95 % in |        99 % in | avg. cpu |   avg. memory |
-----------------------------------------------------------------------------------------------------------------------------------------
| scala_akka                  |   55639 |       14.39 ms |       50.75 ms |       62.50 ms |       92.22 ms |   96.89% |    241.54 MiB |
| scala_pekko                 |   55609 |       14.59 ms |       52.69 ms |       62.66 ms |       84.92 ms |   96.83% |    258.92 MiB |
| scala_fs2                   |   44931 |       19.97 ms |       58.33 ms |       67.13 ms |       91.93 ms |   99.48% |    261.63 MiB |
| scala_zio                   |   34658 |       26.98 ms |       67.05 ms |       77.79 ms |      164.29 ms |   98.73% |    336.51 MiB |
| scala_kyo                   |   33819 |       27.80 ms |       67.40 ms |       77.69 ms |      149.42 ms |   99.02% |    308.01 MiB |
-----------------------------------------------------------------------------------------------------------------------------------------
Benchmark Execution Parameters:
160be4a Thu, 25 Apr 2024 16:18:16 +0200 GitHub Update sbt-assembly to 2.2.0 (#446)
- GRPC_BENCHMARK_DURATION=120s
- GRPC_BENCHMARK_WARMUP=30s
- GRPC_SERVER_CPUS=1
- GRPC_SERVER_RAM=512m
- GRPC_CLIENT_CONNECTIONS=50
- GRPC_CLIENT_CONCURRENCY=1000
- GRPC_CLIENT_QPS=0
- GRPC_CLIENT_CPUS=9
- GRPC_REQUEST_SCENARIO=complex_proto
- GRPC_GHZ_TAG=0.114.0

Anything obviously wrong with this?

steinybot commented 2 months ago

This is still a WIP. I made some good progress on the code gen side of things. I haven't had much time in the last 2 weeks. I hope to get back to this in a week or so.

steinybot commented 1 month ago

Not sure what I changed, but after implementing it as a library it is now significantly slower:

-----------------------------------------------------------------------------------------------------------------------------------------
| name                        |   req/s |   avg. latency |        90 % in |        95 % in |        99 % in | avg. cpu |   avg. memory |
-----------------------------------------------------------------------------------------------------------------------------------------
| scala_zio                   |   31949 |       29.93 ms |       66.57 ms |       76.40 ms |      132.59 ms |   100.5% |    315.89 MiB |
| scala_kyo                   |    7056 |      140.20 ms |      214.61 ms |      502.63 ms |         1.39 s |  101.77% |    333.34 MiB |
-----------------------------------------------------------------------------------------------------------------------------------------

DamianReeves commented 1 day ago

Any updates on this front?

steinybot commented 1 day ago

I haven't managed to dig into the performance issues. I created some benchmarks but that is as far as I got. I'm in the middle of merging the new core changes in.

Unfortunately I'm unexpectedly out of a job so I actually have less free time while I apply for jobs and do interviews etc. Happy to share what I have done and split the bounty if you want to help out.