salesforce / reactive-grpc

Reactive stubs for gRPC
BSD 3-Clause "New" or "Revised" License

Dropped Error on Bidi Stream After Broken TCP Connection #302

Closed by jGleitz 1 year ago

jGleitz commented 1 year ago

When a TCP connection between the client and the server breaks, we get the following dropped error on the Reactor stream for a bidi stream:

Dropped error stack trace

```
2-12-12 13:39:55.802 [ault-executor-0] reactor.core.publisher.Operators ERROR : Operator called default onErrorDropped
io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at io.grpc.Status.asRuntimeException(Status.java:530)
    at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at io.micrometer.core.instrument.binder.grpc.MetricCollectingServerCallListener.onCancel(MetricCollectingServerCallListener.java:75)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at org.lognet.springboot.grpc.security.SecurityInterceptor$2.lambda$onCancel$2(SecurityInterceptor.java:208)
    at org.lognet.springboot.grpc.security.SecurityInterceptor$2.propagateAuthentication(SecurityInterceptor.java:225)
    at org.lognet.springboot.grpc.security.SecurityInterceptor$2.onCancel(SecurityInterceptor.java:208)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at com.logmein.uccbackend.rtcresourcemanager.grpc.logging.LogInterceptor$MdcProvidingListener.onCancel(LogInterceptor.java:123)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
```

As far as I understand, AbstractServerStreamObserverAndPublisher already tries to address this in line 53, which checks for the message "cancelled before receiving half close". However, the error in the stack trace above carries the description "client cancelled", so that check does not match. The logic in io.grpc.stub.ServerCalls.StreamingServerCallHandler.StreamingServerCallListener.onCancel() is as follows:

public void onCancel() {
  …
  if (!halfClosed) {
    requestObserver.onError(
        Status.CANCELLED
            .withDescription("client cancelled")
            .asRuntimeException());
  }
}

Proposed Solution

Change AbstractServerStreamObserverAndPublisher to check for either the old message, or the message CANCELLED: client cancelled.
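To make the proposal concrete, here is a rough sketch of what such a check could look like. It is not the actual reactive-grpc code: `CancellationMessages` and `isClientCancellation` are hypothetical names, and the sketch takes the status code name and description as plain strings so it compiles without gRPC on the classpath. Substring matching is also an assumption on my part.

```java
import java.util.List;

// Hypothetical helper for illustration only; not part of reactive-grpc or grpc-java.
final class CancellationMessages {

    // The two descriptions mentioned in this issue: the one the existing
    // check looks for, and the one ServerCalls' onCancel() produces.
    private static final List<String> CLIENT_CANCEL_DESCRIPTIONS = List.of(
            "cancelled before receiving half close",
            "client cancelled");

    private CancellationMessages() {}

    // Returns true if the status looks like a client-side cancellation that
    // can be ignored instead of being forwarded as an error.
    // codeName is the status code name (e.g. "CANCELLED"); description may be null.
    static boolean isClientCancellation(String codeName, String description) {
        if (!"CANCELLED".equals(codeName) || description == null) {
            return false;
        }
        for (String known : CLIENT_CANCEL_DESCRIPTIONS) {
            // Assumption: substring match, to tolerate prefixes such as "CANCELLED: ".
            if (description.contains(known)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isClientCancellation("CANCELLED", "client cancelled"));
        System.out.println(isClientCancellation("UNAVAILABLE", "client cancelled"));
    }
}
```

In the real class, the two arguments would presumably come from the throwable, for example via `Status.fromThrowable(t)` with `getCode().name()` and `getDescription()`.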

I’m happy to create a PR.

rmichela commented 1 year ago

A repro test case and a PR would be fantastic! Thank you.

jGleitz commented 1 year ago

@rmichela I have provided both in #303, in separate commits. Could you have a look?