rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

CancellationException: Disposed with Spring Boot 2.5.1 #1020

Closed (ghost closed this issue 3 years ago)

ghost commented 3 years ago

Hello, I created a Hello World app to test a bidirectional channel. It works fine with Spring Boot 2.5.0, but after upgrading to 2.5.1 today I get CancellationException: Disposed. Downgrading to 2.5.0 fixes the problem.

Expected Behavior

2021-06-11 19:48:32.576 DEBUG 18936 --- [ctor-http-nio-7] reactor.netty.ReactorNetty : [id:1468d010-1, L:/127.0.0.1:7000 ! R:/127.0.0.1:59525] Non Removed handler: PongHandler, context: ChannelHandlerContext(PongHandler, [id: 0x1468d010, L:/127.0.0.1:7000 ! R:/127.0.0.1:59525]), pipeline: DefaultChannelPipeline{(wsencoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder), (wsdecoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-06-11 19:48:32.576 DEBUG 18936 --- [ctor-http-nio-7] r.n.http.server.HttpServerOperations : [id:1468d010-1, L:/127.0.0.1:7000 ! R:/127.0.0.1:59525] Cancelling Websocket inbound. Closing Websocket

Actual Behavior

2021-06-11 19:55:48.803 INFO 13400 --- [actor-tcp-nio-1] FIND_PERSON_WITH_PREFIX : | onNext(SearchResultDTO[firstname=Maxine, lastname=Klein, email=Maxine.Klein@test.at, displayedRole=News Editor, createdAt=null, createdBy=testdata])
2021-06-11 19:55:48.807 INFO 13400 --- [actor-tcp-nio-1] FIND_PERSON_WITH_PREFIX : | onComplete()
2021-06-11 19:55:48.840 DEBUG 13400 --- [ctor-http-nio-6] reactor.netty.ReactorNetty : [id:ce3548c0-1, L:/127.0.0.1:7000 ! R:/127.0.0.1:62335] Non Removed handler: PongHandler, context: ChannelHandlerContext(PongHandler, [id: 0xce3548c0, L:/127.0.0.1:7000 ! R:/127.0.0.1:62335]), pipeline: DefaultChannelPipeline{(wsencoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder), (wsdecoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-06-11 19:55:48.848 ERROR 13400 --- [ctor-http-nio-6] reactor.core.publisher.Operators : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
    at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.1.jar:na]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoIgnoreThen] :
    reactor.core.publisher.Mono.thenEmpty
    reactor.netty.ReactorNetty$OutboundThen.<init>(ReactorNetty.java:693)
Error has been observed at the following site(s):
    | Mono.thenEmpty ⇢ at reactor.netty.ReactorNetty$OutboundThen.<init>(ReactorNetty.java:693)
    | ⇢ at reactor.netty.ReactorNetty$OutboundThen.then(ReactorNetty.java:738)
Stack trace:
    at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.1.jar:na]
    at io.rsocket.transport.netty.WebsocketDuplexConnection.doOnClose(WebsocketDuplexConnection.java:72) ~[rsocket-transport-netty-1.1.1.jar:na]
    at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.1.jar:na]
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.6.jar:3.4.6]
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.6.jar:3.4.6]
    at reactor.core.publisher.SinkEmptyMulticast$VoidInner.onComplete(SinkEmptyMulticast.java:227) ~[reactor-core-3.4.6.jar:3.4.6]
    at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.6.jar:3.4.6]
    at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:43) ~[reactor-core-3.4.6.jar:3.4.6]
    at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.1.jar:na]
    at io.rsocket.transport.netty.WebsocketDuplexConnection.lambda$new$0(WebsocketDuplexConnection.java:54) ~[rsocket-transport-netty-1.1.1.jar:na]
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1186) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:773) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:749) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:130) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:177) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
    at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]

2021-06-11 19:55:48.854 DEBUG 13400 --- [ctor-http-nio-6] r.n.http.server.HttpServerOperations : [id:ce3548c0-1, L:/127.0.0.1:7000 ! R:/127.0.0.1:62335] Cancelling Websocket inbound. Closing Websocket

Steps to Reproduce

  1. Set up a controller, e.g.

    @MessageMapping("find.person")
    public Flux<SearchResultDTO> findPersonByNameStartingWith(Flux<String> searchTermPublisher) {
      return searchTermPublisher.switchMap(searchService::search);
    }

  2. Subscribe to some data publisher, e.g.

    private final R2dbcEntityTemplate template;

    public Flux<SearchResultDTO> search(final String searchTerm) {
      return template
          .select(SearchResultDTO.class)
          .from("person")
          .matching(startsWith(searchTerm))
          .all();
    }

  3. Call for the data, e.g. rsc ws://localhost:7000 -ab "pastebearertokenhere" --route find.person --log --channel --debug -d 'Max'

  4. See the result:

    {"firstname":"Maxine","lastname":"Klein","email":"Maxine.Klein@test.at","displayedRole":"News Editor","createdAt":null,"createdBy":"testdata"}
    2021-06-11 19:55:48.834 DEBUG 8960 --- [actor-tcp-nio-2] io.rsocket.FrameLogger                   : receiving ->
    Frame => Stream ID: 1 Type: COMPLETE Flags: 0b1000000 Length: 6
    Data:

    2021-06-11 19:55:48.834 INFO 8960 --- [actor-tcp-nio-2] rsc : onComplete()

Your Environment

I use Java 16.

plugins {
    id 'org.springframework.boot' version '2.5.1'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'at.apa.orf.plan'
version = '0.1.0-SNAPSHOT'
sourceCompatibility = '16'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {

    implementation 'io.projectreactor:reactor-tools'
    implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
    implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
    implementation 'org.springframework.boot:spring-boot-starter-rsocket'
    implementation 'org.springframework.boot:spring-boot-starter-security'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.security:spring-security-messaging'
    implementation 'org.springframework.security:spring-security-rsocket'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server'

    runtimeOnly 'io.r2dbc:r2dbc-postgresql'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
    testImplementation 'org.springframework.security:spring-security-test'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

OlegDokuka commented 3 years ago

Hi @MichalVavrikMuJ!

What you see in the logs is

2021-06-11 19:55:48.848 ERROR 13400 --- [ctor-http-nio-6] reactor.core.publisher.Operators : Operator called default onErrorDropped

which clearly indicates a dropped error. As far as I can tell from the logs, it does not affect any functionality, and the message can be easily suppressed by adding Hooks.onErrorDropped(e -> {}) to your app.
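
Swallowing every dropped error can hide real problems, so a narrower variant may be preferable. The sketch below only ignores errors whose cause chain contains a CancellationException with the message "Disposed" (the stack trace above shows it wrapped in ErrorCallbackNotImplemented) and forwards everything else to a fallback handler. The class name DroppedErrorFilter and its methods are hypothetical and not part of Reactor or RSocket. The code uses only the JDK, and the wiring into Reactor's Hooks.onErrorDropped, which accepts a Consumer of Throwable, appears only as a comment.

```java
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;

// Hypothetical helper, not part of Reactor or RSocket.
final class DroppedErrorFilter {

  private DroppedErrorFilter() {}

  // True if t, or any of its causes, is a CancellationException("Disposed").
  static boolean isDisposedCancellation(Throwable t) {
    int depth = 0; // bounded walk, guards against cyclic cause chains
    for (Throwable cur = t; cur != null && depth < 32; cur = cur.getCause(), depth++) {
      if (cur instanceof CancellationException && "Disposed".equals(cur.getMessage())) {
        return true;
      }
    }
    return false;
  }

  // Wraps a fallback handler so that only other dropped errors reach it.
  static Consumer<Throwable> ignoringDisposed(Consumer<Throwable> fallback) {
    return t -> {
      if (!isDisposedCancellation(t)) {
        fallback.accept(t);
      }
    };
  }

  // Assumed wiring at application startup (requires reactor-core, not compiled here):
  //   Hooks.onErrorDropped(DroppedErrorFilter.ignoringDisposed(e -> log.error("Dropped error", e)));
}
```

Matching on the message text is brittle and is only an assumption based on the trace in this issue; if the message changes in a later rsocket-java release, the filter simply stops matching and the error is logged again.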

Closing this since it is a duplicate of #1018.

ghost commented 3 years ago

Thank you @OlegDokuka.