Closed betamos closed 1 year ago
Hey @betamos
Thanks for this report and sorry for the long delay for an answer. Would you like to propose this change into the it-pushable
module?
Hey, yeah kinda got stuck last time due to tests in it-pushable failing after my refactor (still works fine for me in libp2p) but it still seemed risky to proceed in case I break some use case I wasn't covering in my application. It needs a little more work.
The API of it-pushable seems more extensive than what's used in mplex (e.g. return
, throw
, returning "self" from some methods, and multi-consumer support). I'm not sure if they're used or their exact semantics, it'd affect the implementation choices.
Happy to take a second stab, but it'd help if I could get some clarification:
libp2p-mplex
, it-handshake
, js-libp2p
that need to be considered (and tested) for a change to land? For instance, I'm seeing libp2p-deamon
using it, but the exact usage pattern is a little hard to decipher on a quick glance..+CC @alanshaw
I'd welcome a PR to it-pushable
to add some sort of push buffer that can be filled and the ability to await on push. Note the work in https://github.com/alanshaw/it-pushable/issues/1 would give you the latter.
Are there other modules aside from libp2p-mplex, it-handshake, js-libp2p that need to be considered (and tested) for a change to land? For instance, I'm seeing libp2p-deamon using it, but the exact usage pattern is a little hard to decipher on a quick glance.
Testing the update against the libp2p CI test suite should be sufficient depending on the changes. If we want to do additional verification we'll probably run the update against the js-ipfs test suite, but I don't anticipate that being needed for this change
it-pushable
now has primitives to do this sort of thing - pushable.readableLength
will tell you how big the queue is and pushable.onEmpty
will let you wait for the pushable to drain.
TBH making every .push
async will add a lot of latency due to the promises that need to be awaited so it's better to use synchronous methods where possible.
That said, everyone should use Yamux instead since it has back pressure built in and Mplex doesn't.
Mplex uses
it-pushable
internally to route messages to the right stream. Sinceit-pushable
always succeeds thepush
call, it results in unbounded memory consumption in cases were the source outpaces the network on the sender side. I suspect that the same issue occurs on the receiver side when the network outpaces the sink but I have not yet confirmed it. Since everything else uses streaming iterables, I argue that basic flow control is the expected behavior.I solved this by making
push
async (promise resolves once the message has been consumed). Then I modified mplex to use async/await in some locations (note: not complete).To reproduce, simply create a large stream on the sender side:
On the receiver, simply consume and drain the stream. The receiver (in my case) eventually receives a corrupted message: