cometbft / cometbft

CometBFT (fork of Tendermint Core): A distributed, Byzantine fault-tolerant, deterministic state machine replication engine
https://docs.cometbft.com
Apache License 2.0

Make sure that reactors' implementations of the `Receive()` method are non-blocking #2685

Open cason opened 3 months ago

cason commented 3 months ago

Motivated by #2533.

The Reactor interface defines the methods that a component must implement in order to use the communication services provided by the p2p layer. A core method that a reactor has to implement is Receive(), invoked by the p2p layer to deliver a message to the reactor. The implementation of this method should be non-blocking (see details here).
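For reference, this is roughly the shape of the p2p Reactor interface (an abridged sketch; the authoritative definition in p2p/base_reactor.go includes further lifecycle methods such as InitPeer() and SetSwitch()):

```go
// Abridged sketch of the p2p Reactor interface; see p2p/base_reactor.go for
// the full, authoritative definition.
type Reactor interface {
	service.Service

	// GetChannels returns the channel descriptors handled by this reactor.
	GetChannels() []*conn.ChannelDescriptor

	// AddPeer and RemovePeer notify the reactor of peer lifecycle events.
	AddPeer(peer Peer)
	RemovePeer(peer Peer, reason interface{})

	// Receive delivers a message to the reactor. It is called by the p2p
	// layer on the connection's receive routine and should not block.
	Receive(e Envelope)
}
```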

If the implementation of Receive() in some reactor blocks when processing a message received from a given peer, the receive routine (recvRoutine()) of the p2p connection with that peer also blocks until the Receive() call returns:

https://github.com/cometbft/cometbft/blob/0147e634d51f5b27c60f914b02dad348e1ad96d9/p2p/conn/connection.go#L654-L655
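To illustrate the general shape of the problem with a toy example (this is not the CometBFT code at the permalink above, just a minimal read loop with the same structure):

```go
package example

import "net"

// Toy illustration only: a read loop that dispatches messages synchronously
// stalls completely whenever the handler blocks, because the next read from
// the connection only happens after handle() returns.
func readLoop(conn net.Conn, handle func(msg []byte)) error {
	buf := make([]byte, 4096)
	for {
		n, err := conn.Read(buf)
		if err != nil {
			return err
		}
		// While handle() blocks, nothing else (including keepalive traffic)
		// is read from this connection.
		handle(buf[:n])
	}
}
```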

This scenario has some consequences:

  1. Subsequent messages received from the same peer are no longer handled by the p2p connection. This means that other reactors are also affected, since they will not receive new messages from that peer until the blocking Receive() call returns;
  2. As described in #2533, ping/pong messages, which play the role of protocol-level keepalive messages, are not handled either. As a result, the p2p connection's pong timeout expires, the connection produces an error and quits, and the switch disconnects from that peer.

Possible solutions

This situation could be avoided if the Receive() call were not executed by the receive routine of the p2p connection. This change, however, is far from trivial, given the design of the p2p multiplex connections (MConnection). Moreover, even if the Receive() call is executed in a different routine, this other routine will eventually block if the Receive() method of a reactor blocks for a long period of time.

The other approach, suggested in this issue, is to review the implementation of each reactor to make sure that its Receive() method does not block. This approach is also not trivial, as ultimately the way to prevent blocking is to buffer received messages that cannot be immediately processed. Since buffers are always finite in size, this would eventually lead to dropping messages.
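As a rough illustration of that trade-off, a reactor can keep Receive() non-blocking by pushing envelopes into a bounded channel drained by a worker goroutine, dropping messages once the buffer is full. This is only a minimal sketch under the Envelope-based Receive() signature; the buffer size, the names, and the handle() helper are made up for the example:

```go
package example

import "github.com/cometbft/cometbft/p2p"

// Sketch of a reactor whose Receive() never blocks: envelopes go into a
// bounded channel and are processed by a separate goroutine; when the buffer
// is full, the message is dropped. The buffer size is arbitrary.
type bufferedReactor struct {
	p2p.BaseReactor
	incoming chan p2p.Envelope
}

func newBufferedReactor() *bufferedReactor {
	r := &bufferedReactor{incoming: make(chan p2p.Envelope, 1024)}
	// In a real reactor this goroutine would be started/stopped in
	// OnStart()/OnStop() rather than in the constructor.
	go r.processLoop()
	return r
}

// Receive enqueues the envelope or drops it; it never blocks the p2p
// connection's receive routine.
func (r *bufferedReactor) Receive(e p2p.Envelope) {
	select {
	case r.incoming <- e:
	default:
		// Buffer full: dropping the message is the price of not blocking.
	}
}

func (r *bufferedReactor) processLoop() {
	for e := range r.incoming {
		r.handle(e) // potentially slow, reactor-specific processing
	}
}

func (r *bufferedReactor) handle(e p2p.Envelope) { _ = e /* reactor-specific logic */ }
```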

evan-forbes commented 3 weeks ago

one simple solution that we played around with was

https://github.com/celestiaorg/celestia-core/blob/146746b114624029432720cfa74e0a4a55a87b9b/p2p/base_reactor.go#L98-L105

The envelope buffers have to be quite large in order not to block for the consensus reactor, at least when blocks are large. As one might expect, most of the messages that end up blocking are small messages, not larger messages such as block parts. The most common blocking channels are consensus state and votes, then block parts, at least in some recent experiments.

We'll be collecting some TCP packet traces soon, but from traces on a local machine we can already see that the reason we're not able to utilize all of the bandwidth is simply that we are unable to empty the TCP buffers fast enough. This also explains why other optimizations (like not using a mempool) or changes to prioritization don't make any meaningful difference to throughput, at least for large blocks.

One reason, but not the only reason, we block is that all incoming messages from all peers are effectively processed synchronously in the consensus reactor. This also explains the age-old issue where connecting more peers reduces throughput. While not frequent, this can result in processing a vote, block part, or state message taking up to 700ms (!). Below is a graph of a particularly egregious example where we see max waits of 2.5 seconds (y axis: time taken to process a msg after receiving it, in ms; x axis: channel; showing max, average with stdev bars, and the number of msgs for that channel that took over 100ms).

[Graph: slow processing, per-channel message processing times]
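For what it's worth, per-message processing time like the one plotted above can be measured with a thin wrapper around Receive(); the sketch below is illustrative instrumentation only, not the code that produced the graph:

```go
package example

import (
	"log"
	"time"

	"github.com/cometbft/cometbft/p2p"
)

// timedReactor wraps another reactor and logs Receive() calls that take
// longer than a threshold, keyed by channel ID. Illustration only.
type timedReactor struct {
	p2p.Reactor
}

func (r *timedReactor) Receive(e p2p.Envelope) {
	start := time.Now()
	r.Reactor.Receive(e)
	if elapsed := time.Since(start); elapsed > 100*time.Millisecond {
		log.Printf("slow Receive: channel=%d elapsed=%s", e.ChannelID, elapsed)
	}
}
```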

Another reason is that we are not buffering the TCP connection properly. For example, when we increase this constant, we see a meaningful but modest increase in throughput. I'm still working through the roughly five buffered and unbuffered io.Reader/io.Writer wrappers around the TCP connections; with that many layers it's difficult to grok which ones need buffers and which actually degrade performance when we increase them.
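For context, "buffering the TCP connection" here means the bufio wrapping of the underlying net.Conn; a generic sketch (the sizes are arbitrary examples, not the constant referenced above):

```go
package example

import (
	"bufio"
	"net"
)

// wrapConn illustrates buffering a TCP connection with bufio. The sizes are
// arbitrary examples, not the constants used by the p2p layer.
func wrapConn(conn net.Conn) (*bufio.Reader, *bufio.Writer) {
	r := bufio.NewReaderSize(conn, 64*1024) // fewer, larger reads from the socket
	w := bufio.NewWriterSize(conn, 64*1024) // batch small writes until Flush
	return r, w
}
```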