grpc / grpc-kotlin

Kotlin gRPC implementation. HTTP/2 based RPC
https://grpc.io/docs/languages/kotlin
Apache License 2.0

Messy requestFlow race when ClientCall.start() triggers error #378

Open bubenheimer opened 1 year ago

bubenheimer commented 1 year ago

grpc-kotlin 1.3.0

In a bidi streaming or client streaming scenario there is a race between ClientCall.start() and request flow initialization. It causes messy request flow subscription and cancellation behavior when the server closes the RPC right after ClientCall.start(). This is not an unusual scenario: for example, the server might close the connection with an error due to invalid RPC header parameters, like an expired session ID. In this case, request flow collection is quickly cancelled, often while the flow is only partially initialized, i.e. right in the middle of initial collection. The result is a mess in the state of my client app. I have reworked my client code to guard against this scenario, but robustly handling partial initialization requires very careful consideration of early failure scenarios.

Basic sample bidi usage on client:

val responseFlow = myrpc(requestsFlow)

I don't think grpc-kotlin should behave in this unpredictable and error-prone manner. The simple mental model that I (and likely many other grpc-kotlin users) generally follow is: (for bidi streaming)

  1. app code collects response flow
  2. requestFlow collection started by grpc-kotlin
  3. RPC connection is established and requests sent to server
  4. response flow collection receives data

Instead what grpc-kotlin does is:

  1. app code collects response flow
  2. RPC connection starts being established
  3. Right afterwards, and concurrently, requestFlow collection is started
  4. Responses are passed to collector

grpc-kotlin's behavior is an unexpected implementation detail. I generally think of request flow subscription as the logical trigger for starting the RPC, but the current implementation does something else. The documentation does not provide a clear contract. I suppose it is arguable whether, upon the client collecting the response flow, grpc-kotlin should be allowed to start setting up the channel before subscribing to the request flow. Regardless of the answer to this question, the current behavior causes a potentially messy race.

I don't really see an automatic, reliable way to guarantee full requestFlow initialization from grpc-kotlin code prior to RPC connection setup. Perhaps using ReceiveChannels instead of flows could be part of a solution: client app code could fully control initialization of the stream of requests before initiating the RPC. Of course, this would be a very major API and behavior change.
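
To make the channel idea concrete, here is a minimal sketch of what "app code fully controls initialization before initiating the RPC" could look like with the existing Flow-based API. It is only an illustration: `fakeRpc` is a hypothetical stand-in for a generated bidi stub method, not grpc-kotlin code, and the "initialization" is reduced to a log entry.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a generated bidi stub method (assumption, not grpc-kotlin).
fun fakeRpc(requests: Flow<String>): Flow<String> = requests.map { "echo:$it" }

fun runPreInitialized(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // 1. App-controlled initialization finishes before any RPC exists,
    //    so an early RPC failure cannot cancel it halfway through.
    log.add("init done")
    val requests = Channel<String>(Channel.UNLIMITED)
    requests.trySend("a")
    requests.trySend("b")
    requests.close()

    // 2. Only now is the RPC started; the channel is exposed as a single-use flow.
    log.addAll(fakeRpc(requests.consumeAsFlow()).toList())
    log
}

fun main() {
    println(runPreInitialized()) // [init done, echo:a, echo:b]
}
```

This moves the slow setup out of the flow's onStart blocks. The cost is that the app has to manage the channel's lifecycle itself, which is part of what makes a real API change along these lines a major one.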

Again, I think that this is important to address because of the current racy, unpredictable, and error-prone behavior.

bubenheimer commented 1 year ago

To give an idea what my request flow code looks like:

repeatingTimerFlow
    .onCompletion { println("I rarely see this: $it") }
    .map { /* something */ }
    .onStart { withContext(Dispatchers.IO) { /* slowish, transactional DB operation */ } }
    .onEach { /* something else */ }
    .onCompletion { println("I sometimes see this: $it") }
    .onStart { withContext(Dispatchers.IO) { /* some other slowish, transactional DB operation */ } }
    .map { /* more stuff */ }
    .onCompletion { println("Do I ever not see this? $it") }

When the session ID is expired on my local test server, it typically does one of these:

  1. DB op in bottom onStart block runs and fails upon switching back from IO context
  2. DB op in bottom onStart block runs to completion, then top onStart block runs and fails upon switching back from IO context
  3. All onStart blocks run to completion
  4. Cancellation in some other suspending call in the onStart blocks that suspends for a shorter time (not shown for clarity)

onCompletion() calls are run with similar unpredictability. That one really got me.

So really very unpredictable behavior, and it was very difficult to understand what was happening here in the first place.

My expectation was that at least all the onStart blocks would run to completion, and that I'd definitely see the output from all the onCompletion() calls before things are torn down.
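
The pattern can be reproduced without gRPC. Operators lower in the chain wrap the ones above them, so collection enters the chain from the bottom: the bottom onStart runs first, and an onCompletion block only fires if collection actually got as far as that operator. The sketch below is a simplified model of my flow, not grpc-kotlin code: awaitCancellation() stands in for the slow DB operation, and cancelling the collecting job stands in for the server closing the RPC early.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun runPartialInit(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val bottomEntered = CompletableDeferred<Unit>()

    val requests = flowOf(1)
        .onCompletion { events += "top onCompletion" }
        .onStart { events += "top onStart" }
        .onCompletion { events += "middle onCompletion" }
        .onStart {
            events += "bottom onStart entered"
            bottomEntered.complete(Unit)
            awaitCancellation() // models a slow operation that is cancelled midway
        }
        .onCompletion { cause ->
            events += "bottom onCompletion, cancelled=${cause is CancellationException}"
        }

    // Collect concurrently, then cancel while the bottom onStart is still suspended.
    val job = launch { requests.collect { events += "sent $it" } }
    bottomEntered.await()
    job.cancelAndJoin()
    events
}

fun main() {
    println(runPartialInit())
    // [bottom onStart entered, bottom onCompletion, cancelled=true]
}
```

Only the outermost onCompletion runs here. The top onStart and the two upper onCompletion blocks are never reached, which matches the "I rarely see this" and "I sometimes see this" output above: which blocks run depends entirely on how far collection got before the cancellation arrived.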

bubenheimer commented 1 year ago

Can a server delay ClientCall.Listener.onReady() to influence this behavior? Is onReady() only triggered in response to manual or automatic flow control on the server requesting data from the client? In this case a server could choose to request only after validating RPC headers, and the real problem is that the server requests data too soon.

Headers seem to be sent with ClientCall.start(), so I imagine what's needed is a way to delay request flow collection until the server fully signs off on the RPC.

My server also uses grpc-kotlin, and I seem to remember that grpc-kotlin takes over flow control. I even created an issue for it a couple of years ago (#231). So it seems like I can't really try this right now, or can I?

bubenheimer commented 1 year ago

After playing around with it a bit it seems to me that a request() call on the server is not what initially enables onReady()/isReady on the client.

bubenheimer commented 1 year ago

I went to implement my own stream-based protocol to await server RPC initialization on the client, in the case of bidirectional streaming. This requires coupling request flow and response flow, as the response flow must communicate knowledge of the server's RPC initialization success to its associated request flow, which can then initiate normal stream operations (stateful in my case).

grpc-kotlin uses an abstraction of a request flow collected independently (by the internal implementation) of the response flow (collected by application code). This abstraction breaks down here. Are there thoughts about offering a different abstraction for bidi streaming? The current API is not quite right for that case. It's weird having to tie the 2 flows back up. @lowasser?

bubenheimer commented 1 year ago

I suppose I can tie up the 2 flows via the response flow's CoroutineContext, as an alternative to my current singleton-like approach. Cumbersome, but it seems the cleanest approach, for a purist. Might be something for the docs and an official example.
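
A rough sketch of that idea, under stated assumptions: `RpcHandshake`, `fakeBidiRpc`, and the "ACK" first response are all hypothetical names made up for illustration. `fakeBidiRpc` is a stand-in for a generated stub method that collects the request flow concurrently, inside the response collector's context. Whether grpc-kotlin propagates the collector's context to request flow collection in exactly this way is an assumption that would need to be verified.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical context element carrying the "server accepted the RPC" signal.
class RpcHandshake : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<RpcHandshake>
    val accepted = CompletableDeferred<Unit>()
}

// Stand-in for a bidi stub method (assumption, not grpc-kotlin): requests are
// collected concurrently with response delivery, in the collector's context.
fun fakeBidiRpc(requests: Flow<String>): Flow<String> = channelFlow {
    launch { requests.collect { send("echo:$it") } }
    send("ACK") // hypothetical first response meaning "headers validated"
}

// The request flow finds the handshake in its context and waits for it
// before doing any stateful initialization.
fun gatedRequests(): Flow<String> = flowOf("a", "b").onStart {
    val handshake = currentCoroutineContext()[RpcHandshake]
        ?: error("RpcHandshake missing from coroutine context")
    handshake.accepted.await()
}

fun runGated(): List<String> = runBlocking {
    val handshake = RpcHandshake()
    val received = mutableListOf<String>()
    withContext(handshake) {
        fakeBidiRpc(gatedRequests()).collect { response ->
            if (response == "ACK") {
                handshake.accepted.complete(Unit) // unblock the request flow
            } else {
                received.add(response)
            }
        }
    }
    received
}

fun main() {
    println(runGated()) // [echo:a, echo:b]
}
```

If the server rejects the RPC instead of acknowledging it, the response flow fails while the request flow is still suspended in await(), so cancellation reaches it before any real initialization has started.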