spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0
55.71k stars 37.79k forks source link

Only one connection receive subscriber allowed #26699

Open marcingrzejszczak opened 3 years ago

marcingrzejszczak commented 3 years ago

originally opened https://github.com/spring-cloud/spring-cloud-sleuth/issues/1854 however it can reproduced without Spring Cloud Sleuth. Below you can find the original description by @violetagg

Describe the bug The exception below is observed when using spring-cloud-sleuth

2021-02-22 15:12:25.427 ERROR [,77249f740f448dff,77249f740f448dff] 18936 --- [ctor-http-nio-2] reactor.core.publisher.Operators         : Operator called default onErrorDropped

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:180) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:392) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:144) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.Operators.error(Operators.java:196) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:164) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:168) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.web.client.TraceExchangeFilterFunction$TraceWebClientSubscriber.onNext(TraceWebClientBeanPostProcessor.java:220) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at org.springframework.cloud.sleuth.instrument.web.client.TraceExchangeFilterFunction$TraceWebClientSubscriber.onNext(TraceWebClientBeanPostProcessor.java:183) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2359) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]

the double subscription happens with onError:

    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:180) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:392) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]

and onNext

    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:168) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]

The issue Only one connection receive subscriber allowed. is reported as part of the memory leak issue https://github.com/reactor/reactor-netty/issues/1513. However this exception is not related to the memory leak issue.

Sample In the example https://github.com/shj95/webflux-leak-test that is provided with https://github.com/reactor/reactor-netty/issues/1513, add a dependency implementation 'org.springframework.cloud:spring-cloud-starter-sleuth:3.0.1', then use the provided jmeter script to reproduce it.

rstoyanchev commented 3 years ago

I believe this is a result of an attempt to read the body while wrapping an exception concurrently with a cancellation from a closed server response. I'll have look to see if we can do something better and avoid this.

mhmdsalem1993 commented 2 years ago

Any updates on this issue, if its a spring issue or a usage issue? I've the same exception https://github.com/reactor/reactor-netty/issues/2115 also

lonre commented 2 years ago

Hi, any update on this? @rstoyanchev

SimonBerry555 commented 2 years ago

I have this issue too - any updates @rstoyanchev ? Would be good to know if a) it can be safely ignored or b) a workaround. Thanks

lonre commented 2 years ago

Hi, stack trace here @rstoyanchev

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182)
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1863)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:144)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
    at reactor.core.publisher.Operators.error(Operators.java:198)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:165)
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:167)
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
    at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onNext(FluxDoOnEach.java:173)
    at org.springframework.cloud.sleuth.instrument.web.client.TraceExchangeFilterFunction$TraceWebClientSubscriber.onNext(TraceExchangeFilterFunction.java:197)
    at org.springframework.cloud.sleuth.instrument.web.client.TraceExchangeFilterFunction$TraceWebClientSubscriber.onNext(TraceExchangeFilterFunction.java:153)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:189)
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172)
    at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:431)
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
    at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:187)
    at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:444)
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:638)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
cunhap commented 1 year ago

Hey guys, having the same problem here using spring data elasticsearch. Any news? @rstoyanchev

skurzbac commented 1 year ago

We are also experiencing this issue. Is there a workaround?

patpatpat123 commented 10 months ago

I am encountering this issue as well. Any update please? Good day!

lonre commented 10 months ago

Hi, guys, don't forget to vote with a thumbs up if you still have this issue

patpatpat123 commented 10 months ago

Hello team, I am encountering this even 2 years after the original post.

I am using the latest 3.2.0-M2 and able to reproduce it consistently.

2023-09-08 06:51:10 1 ERROR --- [reactor-http-epoll-29] [,,] reactor.core.publisher.Operators : Operator called default onErrorDropped
java.lang.IllegalStateException: Rejecting additional inbound receiver. State=[terminated=false, cancelled=true, pending=0, error=false]
        at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:187)
        at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:145)
        at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
        at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2210)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
        at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
        at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
        at reactor.core.publisher.Operators.error(Operators.java:198)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:165)
        at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
        at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:170)
        at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:145)
        at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
        at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
        at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
        at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8773)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
        at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172)
        at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:435)
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:710)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:195)
        at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:456)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:647)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at reactor.netty.http.client.AbstractHttpClientMetricsHandler.channelRead(AbstractHttpClientMetricsHandler.java:142)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at reactor.netty.channel.AbstractChannelMetricsHandler.channelRead(AbstractChannelMetricsHandler.java:126)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1466)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1329)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1378)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Unknown Source)

I have some more debug logs enabled, but do not know what can be usefull.

May I ask what is the root cause of this technical issue, and how to solve this please?

Thank you

blue-int commented 9 months ago

Any update on this? I am experiencing this issue in spring boot 2.5.8, spring 5.3.14

Anamura commented 5 months ago

Any updates in 2024? thank you

maxmateos commented 2 months ago

Hello, any updates on this issue? I also seem to have this error with spring data elasticsearch and it results in a netty memory leak detection. If anyone has been able to find a solution or workaround, I would appreciate it.

Abdulkhakimov commented 1 month ago

Hi guys the same issue was occured. Any updates? Thanks