libp2p / jvm-libp2p

a libp2p implementation for the JVM, written in Kotlin 🔥
https://libp2p.io
Apache License 2.0

Don't throw when stream id doesn't exist on disconnect #341

Closed StefanBratanov closed 10 months ago

StefanBratanov commented 10 months ago

When running Teku against the jvm-libp2p develop build with yamux enabled, I am getting the following exception relatively often. The stream appears to already be closed by the time LibP2PRpcStream closes it, so I think we shouldn't throw in this case (a rough sketch of what that could look like follows the trace below).

2023-10-26 16:24:57.518 | ERROR | teku-status-log                | PLEASE FIX OR REPORT | Unexpected exception thrown for nioEventLoopGroup-3-23
io.libp2p.mux.UnknownStreamIdMuxerException: Stream with id 84c5a6fffe1df5ab-00005bbc-00000108-2816c1bcd7f6b2a3-f7101b9b/2 not found
    at io.libp2p.mux.yamux.YamuxHandler.getStreamHandlerOrReleaseAndThrow(YamuxHandler.kt:187) ~[jvm-libp2p-develop.jar:?]
    at io.libp2p.mux.yamux.YamuxHandler.getStreamHandlerOrThrow(YamuxHandler.kt:180) ~[jvm-libp2p-develop.jar:?]
    at io.libp2p.mux.yamux.YamuxHandler.onLocalDisconnect(YamuxHandler.kt:263) ~[jvm-libp2p-develop.jar:?]
    at io.libp2p.etc.util.netty.mux.AbstractMuxHandler.localDisconnect(AbstractMuxHandler.kt:114) ~[jvm-libp2p-develop.jar:?]
    at io.libp2p.etc.util.netty.mux.MuxChannel.doDisconnect(MuxChannel.kt:56) ~[jvm-libp2p-develop.jar:?]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.disconnect(AbstractChannel.java:591) ~[netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.disconnect(DefaultChannelPipeline.java:1347) ~[netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeDisconnect(AbstractChannelHandlerContext.java:703) ~[netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.AbstractChannelHandlerContext.disconnect(AbstractChannelHandlerContext.java:682) ~[netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.AbstractChannelHandlerContext.disconnect(AbstractChannelHandlerContext.java:555) ~[netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline.disconnect(DefaultChannelPipeline.java:952) ~[netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:239) ~[netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.LibP2PRpcStream.closeWriteStream(LibP2PRpcStream.java:62) ~[classes/:?]
    at tech.pegasys.teku.networking.eth2.rpc.core.RpcResponseCallback.completeWithErrorResponse(RpcResponseCallback.java:73) ~[classes/:?]
    at tech.pegasys.teku.networking.eth2.rpc.core.Eth2IncomingRequestHandler.readComplete(Eth2IncomingRequestHandler.java:93) ~[classes/:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.lambda$onRemoteWriteClosed$3(RpcHandler.java:251) ~[classes/:?]
    at java.util.Optional.ifPresentOrElse(Optional.java:196) [?:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.runHandler(RpcHandler.java:265) [classes/:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.onRemoteWriteClosed(RpcHandler.java:251) [classes/:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.onChannelClosed(RpcHandler.java:257) [classes/:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.handlerRemoved(RpcHandler.java:238) [classes/:?]
    at io.netty.channel.AbstractChannelHandlerContext.callHandlerRemoved(AbstractChannelHandlerContext.java:1122) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:637) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:876) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline.destroyUp(DefaultChannelPipeline.java:844) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline.destroy(DefaultChannelPipeline.java:836) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline.access$700(DefaultChannelPipeline.java:46) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1392) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:215) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:821) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) [netty-common-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) [netty-common-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) [netty-common-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) [netty-transport-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.99.Final.jar:4.1.99.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.99.Final.jar:4.1.99.Final]
    at java.lang.Thread.run(Thread.java:833) [?:?]
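
As a rough illustration of the proposed behaviour, here is a minimal sketch, in Kotlin, of treating a missing stream id on local disconnect as a no-op instead of throwing. This is hypothetical code, not the actual YamuxHandler implementation; the class name, the map field and the handler type are invented for the example, and the method name is only modelled on the onLocalDisconnect frame in the trace above.

    // Hypothetical sketch: a yamux-style handler that treats a missing stream id on local
    // disconnect as "already closed" instead of throwing UnknownStreamIdMuxerException.
    // The real YamuxHandler also manages flow-control windows and Netty child channels.
    class TolerantYamuxHandlerSketch {

        // stream id -> action that sends FIN / releases resources for that stream
        private val streamHandlers = mutableMapOf<String, () -> Unit>()

        fun onLocalDisconnect(streamId: String) {
            val handler = streamHandlers.remove(streamId)
            if (handler == null) {
                // Stream was already closed or reset (e.g. by the remote peer); nothing to do.
                return
            }
            handler() // close the write side of the still-open stream
        }
    }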
Nashatyrev commented 10 months ago

Strictly speaking, this is actually an exceptional situation, the same as trying to write to a closed (RST) stream. When the local party disconnects (or closes the stream for writing, i.e. sends FIN), it doesn't necessarily expect the stream to be closed already; it may, for example, still expect data from the remote party.

If we want to be more lenient in that case, it probably makes sense to catch that specific exception on the client side (Teku), as is already done in some places (see the sketch after the quoted trace below).

It also confuses me a bit that Teku tries to close the stream for writing when the stream has already been closed (presumably by the remote side):

    at tech.pegasys.teku.networking.p2p.libp2p.rpc.LibP2PRpcStream.closeWriteStream(LibP2PRpcStream.java:62) ~[classes/:?]
    at tech.pegasys.teku.networking.eth2.rpc.core.RpcResponseCallback.completeWithErrorResponse(RpcResponseCallback.java:73) ~[classes/:?]
    at tech.pegasys.teku.networking.eth2.rpc.core.Eth2IncomingRequestHandler.readComplete(Eth2IncomingRequestHandler.java:93) ~[classes/:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.lambda$onRemoteWriteClosed$3(RpcHandler.java:251) ~[classes/:?]
    at java.util.Optional.ifPresentOrElse(Optional.java:196) [?:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.runHandler(RpcHandler.java:265) [classes/:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.onRemoteWriteClosed(RpcHandler.java:251) [classes/:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.onChannelClosed(RpcHandler.java:257) [classes/:?]
    at tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler$Controller.handlerRemoved(RpcHandler.java:238) [classes/:?]
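
For the client-side option, a minimal sketch of catching the exception around the close call. This assumes the exception propagates synchronously to the caller, as it does in the trace above; the helper function and the lambda parameter are invented for illustration, and the real Teku code (LibP2PRpcStream.closeWriteStream) is Java and closes a Netty channel.

    import io.libp2p.mux.UnknownStreamIdMuxerException

    // Hypothetical helper illustrating the "catch it on the client side" option: treat
    // UnknownStreamIdMuxerException from a local close/disconnect as "stream already gone".
    fun closeWriteStreamQuietly(closeWriteStream: () -> Unit) {
        try {
            closeWriteStream()
        } catch (e: UnknownStreamIdMuxerException) {
            // The yamux stream no longer exists (already closed or reset); nothing to close.
        }
    }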
StefanBratanov commented 10 months ago


Thank you, that makes sense. I will dig into it and fix it on the Teku side.