marcoferrer / kroto-plus

gRPC Kotlin Coroutines, Protobuf DSL, Scripting for Protoc
Apache License 2.0

Client cancellations aren't propagated to the server #93

Closed jebbench closed 4 years ago

jebbench commented 4 years ago

When I close a streaming client request with an exception, I would expect the corresponding server receive channel to throw an exception when calling receive (or iterating the channel), but it doesn't seem to do so. Am I doing something wrong?

Client:

call.requestChannel.close(Exception("Testing Exception"))

Server:

try {
    for (request in requestChannel) {
        logger.info("Iterating")
    }
    logger.info("Closed normally")
} catch (e: Exception){
    logger.error("Caught exception", e)
}

marcoferrer commented 4 years ago

What you’re describing is the expected behavior. If you’re experiencing something that differs then it might be a bug. Could you provide a little more detail for your example? This way I can try to reproduce it.
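
For reference, here is a minimal sketch of the semantics in question, using only a plain in-memory kotlinx.coroutines `Channel` as a stand-in for the gRPC request stream (that substitution is an assumption; no Kroto+ or gRPC code is involved, and `drain` is a hypothetical helper name). It shows that closing a channel with a cause makes the consumer's `for` loop throw once the buffered elements have been received, which is the behavior the server-side handler above is expected to see.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: drains the channel the same way the server handler
// in the report does and describes how the iteration ended.
suspend fun drain(requests: Channel<String>): String =
    try {
        for (request in requests) {
            println("Iterating: $request")
        }
        "Closed normally"
    } catch (e: Exception) {
        "Caught exception: ${e.message}"
    }

fun main() = runBlocking {
    // Plain in-memory channel standing in for the request stream (assumption).
    val requests = Channel<String>(Channel.UNLIMITED)
    requests.send("First")

    // Closing with a cause: elements already sent are still delivered,
    // then the iteration fails with that cause instead of ending normally.
    requests.close(Exception("Testing Exception"))

    println(drain(requests))
}
```

If the same `drain` call is made on a channel closed without a cause, the loop simply ends and the "Closed normally" branch is taken.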

What Kroto+ version are you using? What type of method does this occur with? Client Streaming or BiDirectional?

Thanks again for submitting an issue!

jebbench commented 4 years ago

Hi Marco - thanks for the response (and the library).

I've replicated the issue I'm seeing in this repo: https://github.com/jebbench/kotlin-coroutines-gRPC-template

If you need anything else let me know.

marcoferrer commented 4 years ago

I can confirm this is a bug. I think it was introduced when outbound flow control was reworked. It is actually addressed in the streaming machinery rewrite that is currently in progress. Since that rewrite is still in its alpha stages, I'll get a fix out for this bug in the very next release, which is expected this upcoming Monday. I'll get a PR opened tonight and tag you for visibility.

jebbench commented 4 years ago

Brilliant thanks!

jebbench commented 4 years ago

I've retested this using 0.6.0-SNAPSHOT and unfortunately I'm getting the same problem.

Received name: First
Client Streaming Response: message: "Completed without receiving an Exception"

Process finished with exit code 0

marcoferrer commented 4 years ago

I looked into this and it turns out that the snapshot wasn't updated with the most recent changes from PR #95.

If you're still having issues please let me know.

jebbench commented 4 years ago

I've just tried again. I tried forcing Gradle to re-download dependencies, both with the --refresh-dependencies flag and by deleting the cache directory. Neither seems to have worked and I'm still getting the same error.

marcoferrer commented 4 years ago

So I tested your branch and you're correct. It is still not propagating cancellation, but this time it seems to be due to a race condition. With the following example, the expected behavior is observed.

suspend fun performClientStreamingCall(stub: GreeterCoroutineGrpc.GreeterCoroutineStub) = coroutineScope {

    // Client Streaming RPC
    val (requestChannel, response) = stub.sayHelloClientStreaming()

    launch {
        println("Client Streaming Response: ${response.await().toString().trim()}")
    }
    requestChannel.send { name = "First" }
    requestChannel.close(Exception("This is an exception!"))

}

I'm currently working on tracking down the source of the race condition in order to resolve this bug.

marcoferrer commented 4 years ago

Tested the latest SNAPSHOT against your example project and everything seems to behave as expected now.

jebbench commented 4 years ago

Version 0.6.0 resolves this for me.

Many thanks for your help!