rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

BaseDuplexConnection does not handle exceptions in the doFinally #1023

Closed a701440 closed 2 years ago

a701440 commented 3 years ago

RSocket version 1.1.1

    public BaseDuplexConnection() {
      onClose().doFinally(s -> doOnClose()).subscribe();
    }

When an exception is thrown in doOnClose, it propagates to the Reactor unhandled-exceptions hook. The doFinally lambda should handle all exceptions, or subscribe() should be given an error handler.

An example of such an exception is below:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
    at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:550)
    at io.rsocket.transport.netty.TcpDuplexConnection.doOnClose(TcpDuplexConnection.java:67)
    at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146)
    at reactor.core.publisher.SinkEmptyMulticast$VoidInner.onComplete(SinkEmptyMulticast.java:227)
    at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70)
    at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:43)
    at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)
    at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.dispose(ClientServerInputMultiplexer.java:286)
    at io.rsocket.core.RSocketRequester.terminate(RSocketRequester.java:337)
    at io.rsocket.core.RSocketRequester.tryShutdown(RSocketRequester.java:328)
    at io.rsocket.core.RSocketRequester.dispose(RSocketRequester.java:191)

In this case the remote JVM was killed and the connection was broken. The client code detected that the machine was no longer there and called

if (!rsocket.isDisposed()) { rsocket.dispose(); }

and still ended up with this exception.

One issue is that the broken TCP socket did not mark the rsocket as disposed, which would have let the isDisposed() check work. The second issue is that the exception propagates to the Reactor unhandled-exceptions hook.
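
The suggestion above, that the doFinally lambda handle all exceptions itself, can be sketched with a small wrapper around the cleanup action. This is only an illustration using the JDK: `GuardedCleanup` and `safely` are hypothetical names, not part of rsocket-java or Reactor, and the Reactor wiring appears only as a comment. It is an assumption about how a fix could look, not the change made in the library.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class GuardedCleanup {

    // Wraps a cleanup action so that any RuntimeException it throws is
    // handed to onFailure instead of escaping to the caller.
    // Errors (e.g. OutOfMemoryError) are deliberately left to propagate.
    public static Runnable safely(Runnable cleanup, Consumer<Throwable> onFailure) {
        return () -> {
            try {
                cleanup.run();
            } catch (RuntimeException e) {
                onFailure.accept(e);
            }
        };
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> seen = new AtomicReference<>();

        // Stand-in for a doOnClose() that fails with the exception type
        // reported in the trace above.
        Runnable failingClose = () -> {
            throw new CancellationException("Disposed");
        };

        // Hypothetical wiring inside the constructor (not verified against
        // the rsocket-java sources):
        //   onClose().doFinally(s -> safely(this::doOnClose, t -> { /* log */ }).run())
        //            .subscribe();
        safely(failingClose, seen::set).run();

        System.out.println("captured: " + seen.get());
    }
}
```

With a wrapper like this, a failure during cleanup reaches the supplied handler, where it can be logged at whatever level the application chooses, rather than being thrown out of the doFinally callback.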

tomasz-galuszka commented 2 years ago

I get the same exception when no keep-alive is sent from the client side within the given lifetime. Even though I have doOnError and doFinally attached to the connection on the server side, the error is not propagated to them and is logged separately, as mentioned above. As a result, I have two errors in the logs for the same scenario:

1st one

2021-10-25 09:15:46.055 ERROR 13249 --- [actor-tcp-nio-2] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
//...

2nd one

2021-10-25 09:15:46.058  WARN 13249 --- [     parallel-2] c.y.b.s.e.i.c.s.CreateConnectionService  : No ack for keep-alive: 1635146145.763207000

io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 100 ms
//...

Simple code to reproduce from the client/test side:

    Builder socketBuilder = RSocketRequester.builder()
        .dataMimeType(MediaType.APPLICATION_JSON)
        .rsocketStrategies(configurer -> configurer
            .encoder(new Jackson2JsonEncoder())
            .decoder(new Jackson2JsonDecoder())
        )
        .rsocketConnector(connector -> connector.keepAlive(ofMillis(300), ofMillis(100))); // this triggers disconnect 

    RSocketClient client = RSocketClientProvider.rSocketClient(port, socketBuilder);
    RSocket socket = client.source().block();

    StepVerifier.create(socket.onClose())
      .consumeErrorWith(error -> assertThat(error).isInstanceOf(ConnectionErrorException.class))
      .verify(Duration.ofSeconds(2));

    assertThat(socket.isDisposed()).isTrue();

Server side:

// ...
  @ConnectMapping
  public void init(RSocketRequester requester) {
    requester.rsocket()
        .onClose()
        .doFirst(connect(requester))
        .doOnError(errorConnection())
        .doFinally(disconnect())
        .subscribe();
  }
// ...

OlegDokuka commented 2 years ago

Closing since it is a duplicate of #1018.