rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

RSocket error "java.lang.IllegalStateException: Source has to be ASYNC fuseable" with Spring Boot 3.2.x & JDK 17 #1100

Open abdulrahimseera opened 8 months ago

abdulrahimseera commented 8 months ago

Expected Behavior

The RSocket server should respond to the client with: Hello from RSocket consumer

Actual Behavior

The server throws reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable

Error stack trace

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable
Caused by: java.lang.IllegalStateException: Source has to be ASYNC fuseable
    at io.rsocket.resume.InMemoryResumableFramesStore$FramesSubscriber.onSubscribe(InMemoryResumableFramesStore.java:528) ~[rsocket-core-1.1.3.jar:na]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.rsocket.internal.UnboundedProcessor.subscribe(UnboundedProcessor.java:414) ~[rsocket-core-1.1.3.jar:na]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4512) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4339) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.rsocket.resume.ResumableDuplexConnection.<init>(ResumableDuplexConnection.java:83) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.ServerSetup$ResumableServerSetup.acceptRSocketSetup(ServerSetup.java:124) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.RSocketServer.acceptSetup(RSocketServer.java:418) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.RSocketServer.accept(RSocketServer.java:386) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.RSocketServer.lambda$acceptor$0(RSocketServer.java:370) ~[rsocket-core-1.1.3.jar:na]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:332) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:181) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Steps to Reproduce

Sample project to reproduce the issue: https://github.com/abdulrahimseera/rsocket-spring-boot-3.2.x

Run both services listed below, then call "http://localhost:3333/hello/rsocket".

  1. rsocket-client
  2. rsocket-server

Possible Solution

Your Environment

abdulrahimseera commented 6 months ago

Oleh Dokuka

vasyl-pryimak commented 4 months ago

@abdulrahimseera Hey! I had the same problem. The fix for me was to downgrade the reactor-core library to 3.5.16:

implementation("io.projectreactor:reactor-core:3.5.16")

The problem is in the Mono subscribe method. In 3.6.x, the line publisher.subscribe(subscriber); was changed to:

CoreSubscriber subscriber = Operators.restoreContextOnSubscriberIfPublisherNonInternal(publisher, (CoreSubscriber)subscriber);
publisher.subscribe(subscriber);

Operators.restoreContextOnSubscriberIfPublisherNonInternal returns a FuseableContextWriteRestoringThreadLocalsSubscriber. In that class, the requestFusion method returns 0 (Fuseable.NONE).

That is why rsocket throws the exception from InMemoryResumableFramesStore. Its onSubscribe method calls qs.requestFusion(ANY), and that call gets 0 back from FuseableContextWriteRestoringThreadLocalsSubscriber instead of the ASYNC mode it requires.
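To make the negotiation concrete, here is a minimal, dependency-free sketch of the mechanism described above. It is a simplified model, not the actual Reactor or RSocket code: the class names FusionSketch, AsyncSource, and NonFusingWrapper are hypothetical, and the strict check in negotiate is an assumption based on the exception message in the stack trace. Only the mode values (NONE = 0, ASYNC = 2, ANY = 3) mirror the constants in reactor.core.Fuseable.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class FusionSketch {
    // Mode values mirror the constants in reactor.core.Fuseable.
    static final int NONE = 0;
    static final int ASYNC = 2;
    static final int ANY = 3;

    /** Simplified stand-in for Reactor's Fuseable.QueueSubscription. */
    interface QueueSubscription<T> {
        int requestFusion(int requestedMode);
        T poll();
    }

    /** Hypothetical queue-backed source that grants ASYNC fusion when asked. */
    static final class AsyncSource implements QueueSubscription<String> {
        private final Queue<String> queue = new ArrayDeque<>();

        AsyncSource(String... items) {
            for (String item : items) {
                queue.add(item);
            }
        }

        @Override
        public int requestFusion(int requestedMode) {
            return (requestedMode & ASYNC) != 0 ? ASYNC : NONE;
        }

        @Override
        public String poll() {
            return queue.poll();
        }
    }

    /** Hypothetical wrapper that always declines fusion, like the one described above. */
    static final class NonFusingWrapper<T> implements QueueSubscription<T> {
        private final QueueSubscription<T> delegate;

        NonFusingWrapper(QueueSubscription<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public int requestFusion(int requestedMode) {
            return NONE; // the upstream's ASYNC capability is hidden
        }

        @Override
        public T poll() {
            return delegate.poll();
        }
    }

    /** Consumer side: asks for ANY and fails unless ASYNC is granted (assumed check). */
    static int negotiate(QueueSubscription<?> qs) {
        int mode = qs.requestFusion(ANY);
        if (mode != ASYNC) {
            throw new IllegalStateException("Source has to be ASYNC fuseable");
        }
        return mode;
    }

    public static void main(String[] args) {
        AsyncSource source = new AsyncSource("frame");
        System.out.println("direct: " + negotiate(source)); // prints "direct: 2"
        try {
            negotiate(new NonFusingWrapper<>(source));
        } catch (IllegalStateException e) {
            System.out.println("wrapped: " + e.getMessage());
        }
    }
}
```

The same source succeeds when subscribed directly and fails once a non-fusing wrapper sits in between, which matches the behavior difference reported between reactor-core 3.5.x and 3.6.x.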

Didn't dig further, but hope this helps.

@OlegDokuka fyi

vasyl-pryimak commented 3 weeks ago

@OlegDokuka any updates on this? It is still relevant.

abdulrahimseera commented 3 weeks ago

@vasyl-pryimak, it appears that no one is actively maintaining the repository. :(