marcoferrer / kroto-plus

gRPC Kotlin Coroutines, Protobuf DSL, Scripting for Protoc
Apache License 2.0

gRPC Client and Service code gen #16

Closed marcoferrer closed 5 years ago

marcoferrer commented 5 years ago

This PR introduces client stub and service impl code generation for gRPC. It contains an entire overhaul of the coroutines gRPC API. It's open to any feedback.

Please reference the included client / server example. It contains a sample of the expected output for discussion.

You can try out the changes by running the following commands to get started with a preconfigured template project (kotlin-coroutines-gRPC-template):

git clone https://github.com/marcoferrer/kotlin-coroutines-gRPC-template && \
cd kotlin-coroutines-gRPC-template && \
./gradlew run 

Or use version 0.2.2-SNAPSHOT by adding the following repository to your build file:

repositories {
    maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local' }
}

And the following Kroto+ config:

grpc_coroutines {}

Some of the key points are:

  1. All-around better integration with Structured Concurrency
  2. Backpressure support has been implemented.
  3. Generation of client stubs that implement the CoroutineScope interface
    • Allows client stubs to work well with Structured Concurrency
    • Cancellations can now be propagated across usages of a specific stub instance.
  4. Generation of abstract service base impls
    • Allows services to support common grpc-java patterns while fully embracing coroutine idioms and features (a rough server-side sketch follows the client examples below).
  5. Convenience extensions for sending requests and responses in both client and server code.
    • Full support for the familiar Kroto-plus convenience lambda extensions SendChannel.send { } and CompletableDeferred.complete { }
  6. Support for annotation processor assistance via the RpcMethod annotation from grpc-java (RpcMethod.java)
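Example client-side usage of the generated stubs: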
suspend fun performUnaryCall(stub: GreeterCoroutineGrpc.GreeterCoroutineStub){

    // Unary RPC
    val unaryResponse = stub.sayHello { name = "John" }

    println("Unary Response: ${unaryResponse.message}")
}

suspend fun performServerStreamingCall(stub: GreeterCoroutineGrpc.GreeterCoroutineStub){

    // Server Streaming RPC
    val responseChannel = stub.sayHelloServerStreaming { name = "John" }

    responseChannel.consumeEach {
        println("Server Streaming Response: ${it.message}")
    }
}

suspend fun CoroutineScope.performClientStreamingCall(stub: GreeterCoroutineGrpc.GreeterCoroutineStub){

    // Client Streaming RPC
    val (requestChannel, response) = stub.sayHelloClientStreaming()

    launch {
        repeat(5){
            requestChannel.send { name = "person #$it" }
        }
        requestChannel.close()
    }

    println("Client Streaming Response: ${response.await().toString().trim()}")
}

suspend fun CoroutineScope.performBidiCall(stub: GreeterCoroutineGrpc.GreeterCoroutineStub){

    // Bidirectional Streaming RPC
    val (requestChannel, responseChannel) = stub.sayHelloStreaming()

    launch {
        repeat(5){
            requestChannel.send { name = "person #$it" }
        }
        requestChannel.close()
    }

    launch {
        responseChannel.consumeEach {
            println("Bidi Response: ${it.message}")
        }
    }
}
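
For the server side (key point 4), here is a rough sketch of what a service built on the generated abstract base impl could look like. This is only a sketch: the class name GreeterCoroutineGrpc.GreeterImplBase, the GreeterService name, and the method signatures are assumptions chosen to mirror the client examples above, and the actual generated output may differ.

import kotlinx.coroutines.channels.SendChannel

class GreeterService : GreeterCoroutineGrpc.GreeterImplBase() {

    // Unary RPC: suspend until the response is ready, then return it directly
    override suspend fun sayHello(request: HelloRequest): HelloReply =
        HelloReply.newBuilder()
            .setMessage("Hello ${request.name}")
            .build()

    // Server streaming RPC: messages are pushed into the response channel,
    // and send { } suspends when backpressure is applied
    override suspend fun sayHelloServerStreaming(
        request: HelloRequest,
        responseChannel: SendChannel<HelloReply>
    ) {
        repeat(5) {
            responseChannel.send { message = "Hello ${request.name} #$it" }
        }
    }
}

Unary methods suspend and return their response directly, while streaming methods receive a SendChannel and can use the same send { } lambda ext mentioned in key point 5.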


marcoferrer commented 5 years ago

Referencing original discussion. https://github.com/Kotlin/kotlinx.coroutines/issues/360

marcoferrer commented 5 years ago

Working on finishing up the unit tests / docs. Hoping to release soon after addressing any pending user feedback.

marcoferrer commented 5 years ago

Benchmark application added, based on the grpc-benchmarks API.

These are initial results. The test parameters will need further refinement.

QPS Benchmarks

Unary Test Args:

./qps_client --address=127.0.0.1:8000 --channels=10 --server_payload=1 --client_payload=1

Server Executor: ForkJoinPool(parallelism = 4)

Channels:                       10
Outstanding RPCs per Channel:   10
Server Payload Size:            1
Client Payload Size:            1
50%ile Latency (in micros):     459
90%ile Latency (in micros):     1151
95%ile Latency (in micros):     2191
99%ile Latency (in micros):     11775
99.9%ile Latency (in micros):   124415
Maximum Latency (in micros):    260095
QPS:                            87355

Server Executor: directExecutor()

Channels:                       10
Outstanding RPCs per Channel:   10
Server Payload Size:            1
Client Payload Size:            1
50%ile Latency (in micros):     431
90%ile Latency (in micros):     1003
95%ile Latency (in micros):     1975
99%ile Latency (in micros):     8383
99.9%ile Latency (in micros):   39679
Maximum Latency (in micros):    148479
QPS:                            122236

Client Executor: directExecutor() & Server Executor: directExecutor()

Channels:                       10
Outstanding RPCs per Channel:   10
Server Payload Size:            1
Client Payload Size:            1
50%ile Latency (in micros):     397
90%ile Latency (in micros):     855
95%ile Latency (in micros):     1215
99%ile Latency (in micros):     2767
99.9%ile Latency (in micros):   7679
Maximum Latency (in micros):    89087
QPS:                            178784

Streaming Test Args:

./qps_client --address=127.0.0.1:8000 --channels=10 --server_payload=1 --client_payload=1 --streaming_rpcs=true

Server Executor: ForkJoinPool(parallelism = 4)

Channels:                       10
Outstanding RPCs per Channel:   10
Server Payload Size:            1
Client Payload Size:            1
50%ile Latency (in micros):     415
90%ile Latency (in micros):     883
95%ile Latency (in micros):     1359
99%ile Latency (in micros):     3295
99.9%ile Latency (in micros):   10751
Maximum Latency (in micros):    245759
QPS:                            174428

Server Executor: directExecutor()

Channels:                       10
Outstanding RPCs per Channel:   10
Server Payload Size:            1
Client Payload Size:            1
50%ile Latency (in micros):     321
90%ile Latency (in micros):     559
95%ile Latency (in micros):     779
99%ile Latency (in micros):     1831
99.9%ile Latency (in micros):   5567
Maximum Latency (in micros):    119295
QPS:                            250821

Client Executor: directExecutor() & Server Executor: directExecutor()

Channels:                       10
Outstanding RPCs per Channel:   10
Server Payload Size:            1
Client Payload Size:            1
50%ile Latency (in micros):     249
90%ile Latency (in micros):     373
95%ile Latency (in micros):     481
99%ile Latency (in micros):     1023
99.9%ile Latency (in micros):   2655
Maximum Latency (in micros):    125439
QPS:                            348325