neo4j / neo4j-java-driver

Neo4j Bolt driver for Java
Apache License 2.0

Deadlock in BasicPullResponseHandler from neo4j-java-driver when used in a reactive way while cancelling unfinished Reactor subscription #1230

Closed stepanv closed 2 years ago

stepanv commented 2 years ago

We detected a deadlock in our production setup between two threads, each acquiring the same two locks but in the opposite order.

See the two call stacks from a thread dump we collected:

"default-nioEventLoopGroup-1-2" #46 prio=5 os_prio=0 cpu=665086.46ms elapsed=268644.92s tid=0x00007f35e897f550 nid=0x31 waiting for monitor entry  [0x00007f35c1122000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.cancel(BasicPullResponseHandler.java)
    - waiting to lock <0x000000072488f200> (a org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler)
    at org.neo4j.driver.internal.cursor.RxResultCursorImpl.cancel(RxResultCursorImpl.java:103)
    at org.neo4j.driver.internal.reactive.InternalRxResult$$Lambda$1634/0x0000000801da7090.dispose(Unknown Source)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$SinkDisposable.cancel(FluxCreate.java:1039)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$BaseSink.disposeResource(FluxCreate.java:473)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$BaseSink.cancel(FluxCreate.java:462)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
    at reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.cancel(FluxUsingWhen.java:326)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$DeferredSubscription.cancel(Operators.java:1633)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.cancel(FluxUsingWhen.java:248)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:163)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
    at reactor.core.publisher.FluxUsing$UsingSubscriber.cancel(FluxUsing.java:176)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.dispose(FluxPublish.java:301)
    at reactor.core.publisher.FluxRefCount.cancel(FluxRefCount.java:106)
    at reactor.core.publisher.FluxRefCount$RefCountMonitor.innerCancelled(FluxRefCount.java:153)
    at reactor.core.publisher.FluxRefCount$RefCountInner.cancel(FluxRefCount.java:231)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:174)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at reactor.core.publisher.FluxZip$ZipInner.cancel(FluxZip.java:954)
    at reactor.core.publisher.FluxZip$ZipCoordinator.cancelAll(FluxZip.java:652)
    at reactor.core.publisher.FluxZip$ZipCoordinator.cancel(FluxZip.java:616)
    at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.cancel(MonoCollectList.java:144)
    - locked <0x000000072488fa10> (a reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:130)
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.cancel(FluxConcatArray.java:286)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
    at reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
    at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:151)
    at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
    at reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.cancel(FluxTimeout.java:251)
    at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.cancel(FluxSubscribeOn.java:203)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.cancel(FluxPeek.java:153)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:151)
    at reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1022)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:340)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:219)
    at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1083)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:360)
    at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancelMain(FluxTakeUntilOther.java:182)
    at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancel(FluxTakeUntilOther.java:199)
    at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2252)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2220)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2032)
    at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:167)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.cancel(FluxHide.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159)
    at reactor.core.publisher.Operators.terminate(Operators.java:1240)
    at reactor.core.publisher.StrictSubscriber.cancel(StrictSubscriber.java:155)
    at io.micronaut.http.netty.reactive.HandlerSubscriber.cancel(HandlerSubscriber.java:239)
    at io.micronaut.http.netty.reactive.HandlerSubscriber.channelInactive(HandlerSubscriber.java:141)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.codec.http.HttpContentEncoder.channelInactive(HttpContentEncoder.java:313)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.handler.flow.FlowControlHandler.channelInactive(FlowControlHandler.java:134)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(java.base@17/Thread.java:833)

"Neo4jDriverIO-2-3" #62 daemon prio=10 os_prio=0 cpu=459229.04ms elapsed=268640.21s tid=0x00007f35beb0dfd0 nid=0x3f waiting for monitor entry  [0x00007f35bcefc000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onNext(MonoCollectList.java:90)
    - waiting to lock <0x000000072488fa10> (a reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:756)
    at reactor.core.publisher.FluxZip$ZipInner.onNext(FluxZip.java:915)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxUsing$UsingSubscriber.onNext(FluxUsing.java:202)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
    at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618)
    at org.neo4j.driver.internal.shaded.reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
    at org.neo4j.driver.internal.reactive.InternalRxResult.lambda$createRecordConsumer$3(InternalRxResult.java:95)
    at org.neo4j.driver.internal.reactive.InternalRxResult$$Lambda$1633/0x0000000801da6e58.accept(Unknown Source)
    at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.handleRecord(BasicPullResponseHandler.java:134)
    at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler$State$2.onRecord(BasicPullResponseHandler.java:308)
    at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.onRecord(BasicPullResponseHandler.java:89)
    - locked <0x000000072488f200> (a org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler)
    at org.neo4j.driver.internal.handlers.RoutingResponseHandler.onRecord(RoutingResponseHandler.java:69)
    at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleRecordMessage(InboundMessageDispatcher.java:114)
    at org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackRecordMessage(CommonMessageReader.java:94)
    at org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:65)
    at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:83)
    at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:35)
    at org.neo4j.driver.internal.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
    at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:47)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
    at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
    at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.neo4j.driver.internal.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(java.base@17/Thread.java:833)

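Here you can see that the thread "Neo4jDriverIO-2-3" holds the BasicPullResponseHandler monitor <0x000000072488f200> (acquired in BasicPullResponseHandler.onRecord()) and waits for the MonoCollectListSubscriber monitor <0x000000072488fa10> in MonoCollectListSubscriber.onNext(). At the same time, "default-nioEventLoopGroup-1-2" holds <0x000000072488fa10> (acquired in MonoCollectListSubscriber.cancel()) and waits for <0x000000072488f200> in BasicPullResponseHandler.cancel().

Each thread waits for the lock the other one holds, so the threads end up in a deadlock.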
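The lock-order inversion above can be reproduced without Neo4j or Reactor. The following is a minimal plain-Java sketch, assuming two ordinary monitors (hypothetical `collectListLock` and `pullHandlerLock`) stand in for the MonoCollectListSubscriber and BasicPullResponseHandler monitors. A latch forces the same interleaving as in the thread dump, and the JVM's `ThreadMXBean.findMonitorDeadlockedThreads()` is used to confirm the cycle. It models only the locking, not the driver's code.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

class LockOrderInversionDemo {
    /** Takes `first`, waits until the other thread holds its first lock too, then takes `second`. */
    static void lockInOrder(Object first, Object second, CountDownLatch bothHoldFirstLock) {
        synchronized (first) {
            bothHoldFirstLock.countDown();
            try {
                bothHoldFirstLock.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (second) {
                // never reached: the other thread holds `second` and waits for `first`
            }
        }
    }

    /** Forces the interleaving from the thread dump; returns true if the JVM reports both threads deadlocked. */
    static boolean provokeDeadlock() {
        Object collectListLock = new Object(); // hypothetical stand-in for MonoCollectListSubscriber's monitor
        Object pullHandlerLock = new Object(); // hypothetical stand-in for BasicPullResponseHandler's monitor
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Like the Netty event loop: cancel() holds collectList's lock, then needs the handler's lock.
        Thread canceller = new Thread(() -> lockInOrder(collectListLock, pullHandlerLock, bothHoldFirstLock), "canceller");
        // Like the driver IO thread: onRecord() holds the handler's lock, then onNext() needs collectList's lock.
        Thread emitter = new Thread(() -> lockInOrder(pullHandlerLock, collectListLock, bothHoldFirstLock), "emitter");
        canceller.setDaemon(true); // the stuck threads must not keep the JVM alive
        emitter.setDaemon(true);
        canceller.start();
        emitter.start();

        Set<Long> ours = Set.of(canceller.getId(), emitter.getId());
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 50; attempt++) { // poll for up to ~5 seconds
            long[] deadlocked = threads.findMonitorDeadlockedThreads(); // null when there is no cycle
            if (deadlocked != null
                    && Arrays.stream(deadlocked).boxed().collect(Collectors.toSet()).containsAll(ours)) {
                return true;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + provokeDeadlock());
    }
}
```

Once both threads hold their first monitor, neither can ever proceed, which is exactly the state shown by the two "waiting to lock" / "locked" pairs in the dump.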

Steps to reproduce

  1. Start Neo4j in Docker
  2. Prepare the following test
    
    import org.junit.jupiter.api.Test;
    import org.neo4j.configuration.GraphDatabaseSettings;
    import org.neo4j.driver.AuthTokens;
    import org.neo4j.driver.Config;
    import org.neo4j.driver.GraphDatabase;
    import org.neo4j.driver.Logging;
    import org.neo4j.driver.SessionConfig;
    import org.neo4j.driver.reactive.RxSession;
    import reactor.core.publisher.Flux;

    import java.time.Duration;

    /**
     * 1. Run the test in debug mode.
     * 2. When you hit that breakpoint, wait at least 6 seconds (until the main method tries to cancel the subscription via block(Duration.ofSeconds(5))).
     * 3. Resume.
     * 4. The test should never complete, because the main thread and the Neo4jDriver thread get into a deadlock.
     * 5. Collect the thread dump.
     */

This reproducer only demonstrates how the problem can be reproduced completely synthetically: the breakpoint forces the threads to interleave in exactly the wrong order, which ends in a deadlock.

We hit this deadlock while running standard business logic in our production environment on AWS (no breakpoints needed :) ).

Expected behavior

The neo4j BasicPullResponseHandler should not get into a deadlock with Reactor primitives.

stepanv commented 2 years ago

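To work around this problem, one can instruct the reactive stream to propagate the cancellation on a different thread. The thread switch, however, must happen after the original thread has run MonoCollectList.cancel() (i.e., upstream of it), so that this thread only hands the cancellation off and promptly releases the MonoCollectListSubscriber lock. That means declaring cancelOn() before the .collectList() operator: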

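var publish = Flux.using(
                () -> driver.rxSession(SessionConfig.forDatabase(GraphDatabaseSettings.DEFAULT_DATABASE_NAME)),
                session -> session.readTransaction(tx -> Flux.from(tx.run("call db.ping()").records())),
                // RxSession.close() returns a lazy Publisher, so it has to be subscribed to actually close the session
                rxSession -> Mono.from(rxSession.close()).subscribe()
        )
        .cancelOn(Schedulers.parallel()) // propagate the cancel upstream on a parallel worker, not on the cancelling thread
        .collectList();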

Note that the following would not work, because the parallel thread would then end up in the deadlock instead:

      .collectList()
      .cancelOn(Schedulers.parallel());

Also note that this workaround only works because the other implementations of the Subscription interface in the chain do not use the same lock for onNext() and cancel().
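Why declaring cancelOn() before collectList() helps can be modelled in plain Java. This is a sketch under stated assumptions, not Reactor's cancelOn implementation: two hypothetical monitors stand in for the MonoCollectListSubscriber and BasicPullResponseHandler monitors, and a single-thread executor stands in for Schedulers.parallel(). The cancelling thread only schedules the inner cancel while it holds the first lock, then releases that lock, so the cycle from the thread dump cannot form even under the same worst-case interleaving.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancelHandOffDemo {
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Same worst-case interleaving as the deadlock, but the inner cancel is handed to another thread. */
    static boolean runWithHandOff() {
        Object collectListLock = new Object(); // hypothetical stand-in for MonoCollectListSubscriber's monitor
        Object pullHandlerLock = new Object(); // hypothetical stand-in for BasicPullResponseHandler's monitor
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        CompletableFuture<Void> upstreamCancelled = new CompletableFuture<>();
        // Stands in for Schedulers.parallel(); daemon so a failure cannot keep the JVM alive.
        ExecutorService cancelScheduler = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "cancel-worker");
            t.setDaemon(true);
            return t;
        });
        Thread canceller = new Thread(() -> {
            synchronized (collectListLock) {            // collectList's cancel() holds its lock...
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                cancelScheduler.execute(() -> {         // ...but only schedules the upstream cancel
                    synchronized (pullHandlerLock) {    // the handler's cancel() runs on the worker later
                        upstreamCancelled.complete(null);
                    }
                });
            }                                           // collectListLock is released right away
        }, "canceller");
        Thread emitter = new Thread(() -> {
            synchronized (pullHandlerLock) {            // the handler's onRecord() holds its lock
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (collectListLock) {        // collectList's onNext() can now get its lock
                    // record delivered
                }
            }
        }, "emitter");
        canceller.setDaemon(true);
        emitter.setDaemon(true);
        canceller.start();
        emitter.start();
        try {
            canceller.join(5_000);
            emitter.join(5_000);
            upstreamCancelled.get(5, TimeUnit.SECONDS); // the handed-off cancel completes as well
            return !canceller.isAlive() && !emitter.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            cancelScheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed without deadlock: " + runWithHandOff());
    }
}
```

If the worker instead ran the whole outer block (the `.collectList().cancelOn(...)` order), it would take both monitors in the original order and the cycle would simply move to the worker thread.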

injectives commented 2 years ago

Thank you for the detailed explanation of the problem. We will see if we can improve it.

In the meantime, have you had a chance to try the following update? https://github.com/reactor/reactor-core/pull/3053

stepanv commented 2 years ago

@injectives, yep, Simon's improvement prevents the deadlock I reported. However, as he points out, you guys should be ready to accept concurrent cancel() and onNext() calls. That is currently not possible, because these methods in BasicPullResponseHandler.java are synchronized and therefore use the same monitor (this). Furthermore, you hold the lock while calling into the reactive stream, both upstream (in cancel()) and downstream (in onRecord()).
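The general remedy described here, never holding a lock while calling into the upstream or downstream stream, can be sketched as below. `PullHandlerSketch` and its `Consumer`/`Runnable` callbacks are hypothetical; this is not the driver's BasicPullResponseHandler and not necessarily how the linked fix works. The lock only guards the handler's own `cancelled` flag, and the callbacks run after it is released, so a subscriber that takes its own lock (like MonoCollectListSubscriber) can never end up waiting on the handler's lock while the handler waits on it.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch, not the driver's BasicPullResponseHandler.
class PullHandlerSketch<R> {
    private final ReentrantLock lock = new ReentrantLock(); // guards only the field below
    private final Consumer<R> downstream;  // assumption: e.g. a FluxSink's next()
    private final Runnable upstreamCancel; // assumption: e.g. asking the server to discard remaining records
    private boolean cancelled;

    PullHandlerSketch(Consumer<R> downstream, Runnable upstreamCancel) {
        this.downstream = downstream;
        this.upstreamCancel = upstreamCancel;
    }

    void onRecord(R record) {
        boolean deliver;
        lock.lock();
        try {
            deliver = !cancelled;          // decide under the lock...
        } finally {
            lock.unlock();
        }
        if (deliver) {
            downstream.accept(record);     // ...but call downstream without holding it
        }
    }

    void cancel() {
        boolean firstCancel;
        lock.lock();
        try {
            firstCancel = !cancelled;
            cancelled = true;
        } finally {
            lock.unlock();
        }
        if (firstCancel) {
            upstreamCancel.run();          // called at most once, without holding the lock
        }
    }

    public static void main(String[] args) {
        PullHandlerSketch<String> handler = new PullHandlerSketch<>(
                r -> System.out.println("record " + r),
                () -> System.out.println("cancelled upstream"));
        handler.onRecord("a");  // delivered
        handler.cancel();       // runs upstreamCancel
        handler.cancel();       // no-op
        handler.onRecord("b");  // dropped after cancel
    }
}
```

The trade-off is that a record may still be delivered while a concurrent cancel() is in flight. The Reactive Streams specification (rule 2.8) already requires subscribers to tolerate onNext signals after calling cancel(), so this is acceptable.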

injectives commented 2 years ago

@stepanv, would you mind giving the following update a go? https://github.com/neo4j/neo4j-java-driver/pull/1233 I have checked it locally and it seems to prevent this issue from happening.

stepanv commented 2 years ago

Hi, I'm sorry for the late response. With the 4.4.6 version, I'm no longer hitting the deadlock. Thanks!