libp2p / jvm-libp2p

a libp2p implementation for the JVM, written in Kotlin 🔥
https://libp2p.io
Apache License 2.0

Complete stream write `ChannelFuture` on the actual write of a Muxer frame #316

Status: Open. Nashatyrev opened this issue 1 year ago.

Nashatyrev commented 1 year ago

This issue is closely related to #282.

The problem:

Every write to a stream (child) Channel has an associated ChannelFuture, either passed to or returned from the corresponding write() method. That ChannelFuture should be completed only when the associated message buffer has been flushed to the wire. In other words, it should complete when the write operation of the corresponding muxer frame(s) has completed, or, equivalently, when all buffers associated with the message have been released.

This is what allows client-side backpressure mechanisms to work correctly.

Currently a stream write ChannelFuture is completed too early, here: https://github.com/libp2p/jvm-libp2p/blob/0981ec69761a469052aea388b452cb22bf65b9cb/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt#L47

In effect, it is completed right upon the stream Channel.flush() call. Thus, even when the client code respects backpressure, i.e. writes the next chunk of data (e.g. the next Ethereum block in a batch) only after the previous write has completed, it can still overflow the write buffers.

For example, with the mplex muxer the internal Netty write buffers fill up if the remote reader is slower than the local writer. With the yamux muxer, its internal stream write buffer fills up instead.
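To illustrate the effect, here is a toy, single-threaded model of the problem. It is a sketch under stated assumptions, not jvm-libp2p or Netty code: `CompletableFuture` stands in for Netty's `ChannelFuture`, and `SimulatedMuxer`, `writeAndFlush`, `drainOneFrame` and `runBatch` are hypothetical names. The client respects backpressure by writing the next chunk only after the previous future completes, while the "wire" drains one frame every `ticksPerFrame` ticks (a remote reader slower than the local writer). The only difference between the two modes is whether the future completes on flush (the current behaviour described above) or when the frame is actually written.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical model: a muxer that queues frames until a simulated wire drains them.
class SimulatedMuxer(private val completeOnFlush: Boolean) {
    private class Frame(val size: Int, val promise: CompletableFuture<Unit>)

    private val queue = ArrayDeque<Frame>()

    var pendingBytes = 0
        private set
    var maxPendingBytes = 0
        private set

    // A stream-level write + flush of one message; returns the write's future.
    fun writeAndFlush(size: Int): CompletableFuture<Unit> {
        val promise = CompletableFuture<Unit>()
        queue.addLast(Frame(size, promise))
        pendingBytes += size
        maxPendingBytes = maxOf(maxPendingBytes, pendingBytes)
        // Current behaviour: the future completes as soon as the stream is flushed.
        if (completeOnFlush) promise.complete(Unit)
        return promise
    }

    // One frame actually reaches the wire and its buffer is released.
    fun drainOneFrame() {
        val frame = queue.removeFirstOrNull() ?: return
        pendingBytes -= frame.size
        // Desired behaviour: complete only now (a no-op if already completed).
        frame.promise.complete(Unit)
    }
}

// A backpressure-respecting client: writes the next chunk only after the previous
// write's future is done. Returns the peak number of bytes buffered in the muxer.
fun runBatch(completeOnFlush: Boolean, chunks: Int, chunkSize: Int, ticksPerFrame: Int): Int {
    val muxer = SimulatedMuxer(completeOnFlush)
    var written = 0
    var last: CompletableFuture<Unit>? = null
    var tick = 0
    while (written < chunks || muxer.pendingBytes > 0) {
        if (written < chunks && (last == null || last.isDone)) {
            last = muxer.writeAndFlush(chunkSize)
            written++
        }
        tick++
        // The slow wire drains one frame every `ticksPerFrame` ticks.
        if (tick % ticksPerFrame == 0) muxer.drainOneFrame()
    }
    return muxer.maxPendingBytes
}
```

For example, `runBatch(true, 10, 100, 4)` returns 800, while `runBatch(false, 10, 100, 4)` returns 100: with early completion the "well-behaved" client keeps piling up frames, whereas completing on the actual frame write bounds the buffered data to a single chunk.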

Nashatyrev commented 1 year ago

The fix is complicated by the following:

https://github.com/libp2p/jvm-libp2p/blob/0981ec69761a469052aea388b452cb22bf65b9cb/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt#L35-L52

doWrite() is the only entry point in AbstractChannel intended to handle the flush operation. It receives a ChannelOutboundBuffer, and the only way to get the next buffer from it is to call remove(), which in turn completes the ChannelFuture of the corresponding write operation.
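The constraint above, and one conceivable way around it, can be sketched with toy stand-ins. Everything here is hypothetical (`ToyOutboundBuffer`, `DeferredWrite`, `ToyMuxer`, `streamWrite` are not Netty or jvm-libp2p types, and this is not necessarily what the draft commit does): the buffer completes whatever promise it holds on `remove()`, so the user's promise is instead carried inside the message and the buffer only ever sees a throwaway promise. The muxer then completes the user's promise once the frame has actually been written.

```kotlin
import java.util.concurrent.CompletableFuture

// Toy stand-in for ChannelOutboundBuffer: the only way to advance is remove(),
// and remove() completes the promise attached to the removed message.
class ToyOutboundBuffer<M> {
    private val entries = ArrayDeque<Pair<M, CompletableFuture<Unit>>>()

    fun addMessage(msg: M, promise: CompletableFuture<Unit>) {
        entries.addLast(msg to promise)
    }

    fun current(): M? = entries.firstOrNull()?.first

    fun remove(): Boolean {
        val entry = entries.removeFirstOrNull() ?: return false
        entry.second.complete(Unit)
        return true
    }
}

// Hypothetical message wrapper: the user's promise travels with the payload.
class DeferredWrite(val payload: ByteArray, val userPromise: CompletableFuture<Unit>)

// Toy muxer: keeps frames in flight until the parent channel reports them written.
class ToyMuxer {
    private val inFlight = ArrayDeque<DeferredWrite>()

    fun writeFrame(w: DeferredWrite) {
        inFlight.addLast(w)
    }

    // Called when the oldest in-flight frame has actually reached the wire.
    fun onFrameWritten() {
        inFlight.removeFirstOrNull()?.userPromise?.complete(Unit)
    }
}

// Stream write: the buffer gets a throwaway promise; the caller gets the real one.
fun streamWrite(buf: ToyOutboundBuffer<DeferredWrite>, payload: ByteArray): CompletableFuture<Unit> {
    val userPromise = CompletableFuture<Unit>()
    buf.addMessage(DeferredWrite(payload, userPromise), CompletableFuture())
    return userPromise
}

// Stand-in for doWrite(): drains the buffer, calling remove() to advance.
fun doWrite(buf: ToyOutboundBuffer<DeferredWrite>, muxer: ToyMuxer) {
    while (true) {
        val msg = buf.current() ?: break
        muxer.writeFrame(msg)
        buf.remove() // completes only the throwaway promise
    }
}
```

In real Netty the promise is a ChannelPromise and messages enter the outbound buffer through the pipeline, so such a substitution would have to happen before the message reaches the buffer; buffer release and failure propagation (failing the user's promise if the frame write fails) would also need handling. Whether this fits the AbstractChannel design is the open question raised here.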

I'm not sure the AbstractChannel design is suitable for child-channel use cases at all.

Nashatyrev commented 1 year ago

Another potential problem: Channel.Unsafe.outboundBuffer() returns a ChannelOutboundBuffer, which is a final class, so its promise-completion behaviour cannot be changed by subclassing.

Nashatyrev commented 1 year ago

A test and a draft attempt at addressing the issue: https://github.com/libp2p/jvm-libp2p/commit/ad6c2d3830e0f80cf3dd9a9f66240c596709bf49