marcoferrer / kroto-plus

gRPC Kotlin Coroutines, Protobuf DSL, Scripting for Protoc
Apache License 2.0

Bugfix for race condition in outbound flow control #59

Closed. blachris closed this 5 years ago

blachris commented 5 years ago

A bidi stream call with a high throughput of messages can hang.

This occurred reliably in a throughput test where 100,000 messages of 1 kB each were transmitted over an in-process channel to a server and echoed back to the caller. With kroto-plus, the message transfer simply gets stuck and never completes.

The root cause is a race condition in applyOutboundFlowControl() in FlowControl.kt: reading streamObserver.isReady and setting isOutboundJobRunning are not atomic, so the outbound job can terminate without being relaunched by an onReady event.

Here I propose a minimally invasive fix, though I also considered refactoring the coroutine to be long-running: suspend while not ready and resume on ready. Please consider addressing this, because the hang is a deal breaker for my use case and most likely for others as well.
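
To make the race and the shape of the fix concrete, here is a minimal, self-contained sketch of a synchronized check-and-set. The names (OutboundWorker, sendOne, sourceIsClosed) are illustrative stand-ins rather than the actual FlowControl.kt symbols, and this is not the PR's exact diff; it only shows the idea of putting the worker's exit decision and the onReady relaunch decision under one lock.

import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch

class OutboundWorker(
    private val scope: CoroutineScope,
    private val isReady: () -> Boolean,          // e.g. streamObserver::isReady
    private val sendOne: suspend () -> Boolean,  // suspends for the next message and sends it; false once the source is closed
    private val sourceIsClosed: () -> Boolean
) {
    private val isOutboundJobRunning = AtomicBoolean(false)
    private val lock = Any()

    // Installed as the CallStreamObserver onReady handler.
    fun onReady() {
        synchronized(lock) {
            if (isReady() && isOutboundJobRunning.compareAndSet(false, true)) {
                scope.launch { run() }
            }
        }
    }

    private suspend fun run() {
        while (true) {
            // Drain while the transport reports capacity. No lock is held here,
            // so suspending inside sendOne() is fine.
            while (isReady() && sendOne()) { }
            synchronized(lock) {
                if (!isReady() || sourceIsClosed()) {
                    // Cleared under the same lock the onReady handler takes, so a
                    // concurrent onReady either sees the flag already cleared (and
                    // relaunches) or we see the renewed readiness below and keep going.
                    isOutboundJobRunning.set(false)
                    return
                }
                // Readiness returned before we could exit: keep draining instead of
                // relying on an onReady event that may already have been swallowed.
            }
        }
    }
}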

marcoferrer commented 5 years ago

Thanks for the PR. I'm actually interested in your second approach as well. I like the idea of not having to relaunch the observer consumer.

marcoferrer commented 5 years ago

Sorry about that. Didn’t mean to close this out

blachris commented 5 years ago

Thanks for your interest. I added and activated the second approach in the pull request, but I am struggling a bit to get the corner cases right and to pass the tests.

marcoferrer commented 5 years ago

Do you have a sample project I can use to reproduce the deadlock? I wanted to see what it would take to add it as part of the integration tests. I also wanted to test an approach that uses a mutex during stream consumption to resolve the issue.
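
For what it's worth, here is a guess at what a mutex-guarded consumption loop could look like; this is not code from the repository, and the function name is hypothetical. completeSafely is the same internal kroto-plus helper that appears in the snippet further down. Every ready signal schedules a drain pass, and the Mutex serializes the passes, so a signal that arrives while another pass is winding down still produces a fresh pass instead of being lost.

import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal fun <T> CoroutineScope.applyOutboundFlowControlWithMutex(
    streamObserver: CallStreamObserver<T>,
    targetChannel: Channel<T>
) {
    val mutex = Mutex()
    val iterator = targetChannel.iterator()

    streamObserver.setOnReadyHandler {
        launch {
            mutex.withLock {
                // hasNext() suspends until a message is available or the channel closes,
                // and only one drain pass can be inside this block at a time.
                while (streamObserver.isReady && !targetChannel.isClosedForReceive && iterator.hasNext()) {
                    streamObserver.onNext(iterator.next())
                }
                if (targetChannel.isClosedForReceive) {
                    streamObserver.completeSafely()
                }
            }
        }
    }
}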

blachris commented 5 years ago

Unfortunately I haven't yet succeeded in creating a minimal test. I encounter the problem in a complex setup where 3 gRPC calls are chained together in-process. My theory is that the problem occurs when this flow-control coroutine is frequently exiting and restarting. With a little delay in there, my setup hangs almost immediately:

} finally { // FlowControl.kt line 75
  // Put a little delay/sleep/println here. 
  // If the onReady event fires between here and the next line, 
  // the coroutine exits and no new coroutine is launched.
  isOutboundJobRunning.set(false)
}
marcoferrer commented 5 years ago

Although I can't reproduce the issue by adding a delay there, I can see that it is definitely possible for a race condition to manifest there.

I've had some mild success with a solution similar to the following:

import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.sendBlocking

private typealias MessageHandler = suspend CoroutineScope.() -> Unit

internal fun <T> CoroutineScope.applyOutboundFlowControl(
    streamObserver: CallStreamObserver<T>,
    targetChannel: Channel<T>
){

    val channelIterator = targetChannel.iterator()
    // Drains the target channel into the stream observer for as long as the
    // transport reports capacity and the channel still has messages.
    val messageHandlerBlock: MessageHandler = {
        while(
            streamObserver.isReady &&
            !targetChannel.isClosedForReceive &&
            channelIterator.hasNext()
        ){
            val value = channelIterator.next()
            streamObserver.onNext(value)
        }
        if(targetChannel.isClosedForReceive){
            streamObserver.onCompleted()
        }
    }

    // A conflated, lazily started actor: redundant ready signals collapse into a
    // single pending drain, and nothing runs before the first onReady.
    // completeSafely is a kroto-plus internal extension on StreamObserver.
    val messageHandlerActor = actor<MessageHandler>(
        start = CoroutineStart.LAZY,
        capacity = Channel.CONFLATED,
        context = Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
            streamObserver.completeSafely(e)
            targetChannel.close(e)
        }
    ) {
        consumeEach {
            if(streamObserver.isReady && !targetChannel.isClosedForReceive){
                it.invoke(this)
            }else{
                streamObserver.completeSafely()
            }
        }
    }

    streamObserver.setOnReadyHandler {
        if(targetChannel.isClosedForReceive){
            streamObserver.completeSafely()
        }else if(
            streamObserver.isReady &&
            !targetChannel.isClosedForReceive
        ){
            // Using sendBlocking here is safe since we're using a conflated channel
            messageHandlerActor.sendBlocking(messageHandlerBlock)
        }
    }
}

With this approach there are still tests that fail, but it looks like those failures are due to the actor inheriting the event loop dispatcher from the runBlocking coroutine in the test. I'm trying out different solutions at the moment.

blachris commented 5 years ago

Thank you for working on this issue. Since my fix attempts just seemed to make things worse, I tried to reproduce the problem, and I may have succeeded. In the pull request right now I have added a rough unit test that sends 100k messages to a simple server that echoes the messages back. When I run :kroto-plus-coroutines:test on my machine with the 0.4.0 flow control, the test hangs hard. With the flow control plus minimally added synchronization blocks, the new test completes in under 20s, but I get occasional failures in ClientStreamingBackPressureTests > Client send suspends until server invokes receive. What do you think about the new test?
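
For readers following along, this is roughly the shape of the test being described, sketched here against plain grpc-java async stubs. EchoGrpc and EchoMsg are hypothetical generated classes for a simple echo service with one bidi method; the actual test in the pull request drives the kroto-plus coroutine bindings, which is where the flow-control code under test lives.

import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.stub.StreamObserver
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

fun bidiEchoThroughput() {
    val name = InProcessServerBuilder.generateName()
    val server = InProcessServerBuilder.forName(name)
        .addService(object : EchoGrpc.EchoImplBase() {
            override fun echo(responseObserver: StreamObserver<EchoMsg>): StreamObserver<EchoMsg> =
                object : StreamObserver<EchoMsg> {
                    // Echo every message straight back to the caller.
                    override fun onNext(value: EchoMsg) = responseObserver.onNext(value)
                    override fun onError(t: Throwable) = responseObserver.onError(t)
                    override fun onCompleted() = responseObserver.onCompleted()
                }
        })
        .build()
        .start()
    val channel = InProcessChannelBuilder.forName(name).build()

    val messageCount = 100_000
    val received = CountDownLatch(messageCount)
    val requests = EchoGrpc.newStub(channel).echo(object : StreamObserver<EchoMsg> {
        override fun onNext(value: EchoMsg) = received.countDown()
        override fun onError(t: Throwable) = Unit
        override fun onCompleted() = Unit
    })

    // ~1 kB payload per message, matching the original report.
    val payload = EchoMsg.newBuilder().setBody("x".repeat(1024)).build()
    repeat(messageCount) { requests.onNext(payload) }
    requests.onCompleted()

    // Fail if all echoes do not come back within a minute.
    check(received.await(60, TimeUnit.SECONDS)) { "echo round trip did not complete" }

    channel.shutdownNow()
    server.shutdownNow()
}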

blachris commented 5 years ago

This pull request is not necessary anymore because the issue is being addressed in #61