etcd-io / raft

Raft library for maintaining a replicated state machine
Apache License 2.0

Flow control for MsgApp messages #130

Open pav-kv opened 7 months ago

pav-kv commented 7 months ago

This issue describes examples of the limited control over data flows in the raft package, and what this can lead to, motivating improvements in this area. The umbrella issue for this is #64.

Consider a general class of systems[^1] in which a single server can host multiple/many instances of raft Node / RawNode. Say, in a cluster of O(N) servers, each server hosts O(K) instances of raft.RawNode. K can be in the [tens of] thousands. The RawNodes of a single raft group (typically 3x or 5x replicas per group) are distributed across the cluster arbitrarily, e.g. one group can be on servers {0, 1, 2}, another group on servers {0, 10, 57}, etc.

In the picture below, we first consider the MsgApp data flows from raft leaders on server 0 to followers on the other N-1 servers.

[figure "fan-out": MsgApp flows fanning out from leaders on server 0 to followers on the other N-1 servers]

In the current design of raft, each RawNode acts independently of the others and makes local decisions on when to send a MsgApp to a follower, based merely on the fact that the follower is behind on the log. The RawNode does this as soon as possible (at any Step/Ready iteration), and the application/server layer has little control over this behaviour. The RawNode drives the process, and the application has to adapt.
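For context, a typical driving loop looks roughly like the sketch below: the RawNode decides which messages to build, and the application only forwards them. This is a minimal sketch; `appendToStorage` and `send` stand in for application-layer components (log storage and transport) and are assumptions, not part of raft. HardState/snapshot handling and commit application are elided.

```go
package sketch

import (
	"go.etcd.io/raft/v3"
	pb "go.etcd.io/raft/v3/raftpb"
)

// runLoop is a simplified sketch of the usual RawNode driving loop.
func runLoop(rn *raft.RawNode, appendToStorage func([]pb.Entry) error, send func(pb.Message)) error {
	for {
		if !rn.HasReady() {
			continue // in practice: block on ticks, proposals, or incoming messages
		}
		rd := rn.Ready()
		// Persist new entries before sending messages, per the Ready contract.
		// (HardState and snapshot handling elided.)
		if err := appendToStorage(rd.Entries); err != nil {
			return err
		}
		// The MsgApp messages here were already fetched from storage and
		// constructed by the RawNode; the application can only forward or
		// drop them at this point.
		for _, m := range rd.Messages {
			send(m)
		}
		rn.Advance(rd)
	}
}
```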

Such a design is prone to a number of overload scenarios.

Scenario 1: Leader-side overload

In a loaded system, followers are always behind by a little, and sometimes by a lot (e.g. after a server crash/restart). As a result, many `RawNode`s independently decide to send a `MsgApp` to their followers. Each `RawNode` fetches a few entries, constructs a `MsgApp`, and sends it to the application layer via the `Ready` [struct](https://github.com/etcd-io/raft/blob/e22adc041041d1980ddefd974c4ea414edd90cd4/node.go#L49-L115). Server 0 has limited capacity: if there are too many messages, it can run out of memory and crash. To prevent this, the application layer has a few options, all of which are best categorised as workarounds:

1. Pace sending new entries into the leader `RawNode`s. This indirectly causes the `RawNode`s to send less `MsgApp` traffic to the followers [but only if most followers are up-to-date; otherwise, a `RawNode` will still push messages to catch up slow followers].
2. Pace `Ready` calls on the `RawNode`s. However, `RawNode` controls other critical parts of the raft protocol, such as heartbeats, so calling `Ready` regularly is a necessary part of the API.
3. Drop messages that `Ready` tries to push when the overall volume approaches the limits. This causes the `RawNode` to retry sending these messages later. However, we still pay the cost of fetching the unnecessary entries from storage and constructing the messages; the most unfortunate cases can still OOM when many of these unnecessary message constructions happen simultaneously.
4. Use raft's built-in pacing [mechanisms](https://github.com/etcd-io/raft/blob/e22adc041041d1980ddefd974c4ea414edd90cd4/raft.go#L187-L220), such as max in-flight bytes (see the config sketch after this list). These only work well when there is a single `RawNode` per server; with tens of thousands, the overall volume of messages can still be excessive. Setting these static limits to low values, like O(memory/K), artificially reduces cluster throughput. Ideally, the flow needs to be controlled dynamically.
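For reference, a minimal sketch of workaround (4): setting the static per-peer limits in `raft.Config` low enough that K `RawNode`s fit into a per-server budget. The budget and K below are made-up numbers for illustration only.

```go
package sketch

import "go.etcd.io/raft/v3"

// newConfig sketches workaround (4): statically splitting an assumed
// per-server memory budget across K RawNodes hosted on the same server.
func newConfig(id uint64, storage raft.Storage) raft.Config {
	const (
		serverBudgetBytes = 4 << 30 // assumed budget for in-flight MsgApp payloads
		numRawNodes       = 10_000  // K RawNodes hosted on this server
	)
	return raft.Config{
		ID:            id,
		ElectionTick:  10,
		HeartbeatTick: 1,
		Storage:       storage,
		MaxSizePerMsg: 1 << 20, // cap on a single MsgApp payload
		// Static per-peer limits: with K groups per server these must be set
		// to roughly budget/K, which throttles every group even when the
		// server as a whole has spare capacity.
		MaxInflightMsgs:  16,
		MaxInflightBytes: serverBudgetBytes / numRawNodes,
	}
}
```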

Scenario 2: Follower-side overload

Consider the symmetrical picture from the followers' point of view. A single server hosts up to K `RawNode`s. The leaders for these nodes are distributed across the cluster, and can send `MsgApp` flows to the followers hosted on server 0 independently.

![fan-in](https://github.com/etcd-io/raft/assets/3757441/fd8e2a73-e811-4f42-a49c-dfedac070e0d)

Server 0 has limited capacity. Similarly, it is prone to overload if many leader `RawNode`s independently decide to send messages to it (an example of such high fan-in is when the server has been down for a non-trivial amount of time). To protect itself, server 0 has fewer options:

1. Drop the `MsgApp` messages so that they don't consume resources on the receiving server (see the sketch after this list). This is similar to option (3) for the leader nodes, except the cost is higher: the message has already been fetched from the leader's storage and has travelled across the network. Dropping it results in unnecessary retries and more fetches from storage on the leader.
2. In addition to (1), send signals to the leader nodes asking them to slow down. The leader then employs one of its workarounds (1)-(4).
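A sketch of what follower-side option (1) could look like at the receiving transport, assuming a hypothetical per-server budget tracker; raft offers no hook here, so the drop has to happen before `Step`:

```go
package sketch

import (
	"go.etcd.io/raft/v3"
	pb "go.etcd.io/raft/v3/raftpb"
)

// receive sketches follower-side load shedding before the message reaches the
// RawNode. budgetLeft is a hypothetical tracker of the server-wide memory
// still available for inbound entries; it is not part of raft.
func receive(rn *raft.RawNode, m pb.Message, budgetLeft func() int64) error {
	if m.Type == pb.MsgApp && int64(m.Size()) > budgetLeft() {
		// Drop the append. The leader already paid for fetching the entries
		// and shipping them over the network, and will have to retry later.
		return nil
	}
	return rn.Step(m)
}
```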

Design Considerations

To improve on the scenarios above, there needs to be a more general/flexible mechanism for flow control / back-pressure in raft. It should be possible to slow down MsgApp flows from RawNode to the hosting server/application, and from an individual leader RawNode to an individual follower RawNode.

There are multiple ways to achieve this, on a spectrum between:

  1. Enrich the flow control capabilities in raft. During the Ready exchange, raft would get some signals from the application, and pace the MsgApp flows accordingly.
  2. Invert the control. The application/server layer drives the flow and decides when to fetch entries from storage and construct the outgoing messages; raft assists in doing this correctly.

There might be some quick wins in approach (1), but in the long term approach (2) seems more appropriate. Approach (2) also makes the raft package simpler and more maintainable: currently, raft is a mixture of the core protocol and control mechanisms, and the latter can be factored out.

To implement approach (2), raft needs to export/delegate the notion of "leader->follower flow" in such a way that the application layer can drive it. It would be similar to the Progress tracker that raft already uses internally, and exposes for informational purposes. The application would use the "progress" information to decide when and how much to send to a follower, and notify raft to update the progress accordingly. This would be a shared data structure.
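One possible shape for such a shared "flow" handle is sketched below. None of these types exist in raft today; this only illustrates the kind of contract approach (2) would need: the application reads the follower's progress, decides how much to send, constructs the MsgApp itself, and acknowledges the send back to raft.

```go
package sketch

import pb "go.etcd.io/raft/v3/raftpb"

// Flow is a hypothetical, application-visible view of one leader->follower
// stream, mirroring a subset of the information in tracker.Progress.
type Flow interface {
	Match() uint64 // highest log index known to be replicated to the follower
	Next() uint64  // next log index to send
	// Admit tells raft the application has sent entries up to index next-1,
	// so raft can advance its view of the in-flight window.
	Admit(next uint64)
}

// maybeSend is driven by the application, not by the RawNode: it runs only
// when a server-level scheduler has budget for this follower. readEntries and
// send are hypothetical application-layer helpers.
func maybeSend(f Flow, readEntries func(lo, hi, maxBytes uint64) []pb.Entry,
	send func(pb.Message), budgetBytes uint64) {
	lo, hi := f.Next(), f.Next()+128 // window size picked arbitrarily
	ents := readEntries(lo, hi, budgetBytes)
	if len(ents) == 0 {
		return
	}
	// To/Term/LogTerm are omitted in this sketch.
	send(pb.Message{Type: pb.MsgApp, Index: lo - 1, Entries: ents})
	f.Admit(ents[len(ents)-1].Index + 1)
}
```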

Currently, a recent portion of the raft log is stored in the unstable in-memory structure, and the rest of the log resides in the application-specific Storage implementation. Raft abstracts the whole log behind its internal raftLog wrapper, which fetches from memory or storage as needed. Some of the capabilities of this wrapper would need to be accessible to the application layer in order to construct messages.
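The reading side could look close to the existing `Storage.Entries` signature, except covering both the unstable tail and stable storage. A hypothetical sketch of such an exposed view (again, not an existing raft type):

```go
package sketch

import pb "go.etcd.io/raft/v3/raftpb"

// LogReader is a hypothetical read-only view over the whole raft log
// (the unstable in-memory tail plus the application's Storage), roughly the
// subset of raft's internal raftLog that message construction needs.
type LogReader interface {
	// Entries returns log entries in [lo, hi), limited to maxSize bytes,
	// transparently reading from the unstable structure or from Storage.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of the entry at the given index, needed to fill
	// in MsgApp.LogTerm for the entry preceding the batch.
	Term(index uint64) (uint64, error)
	// LastIndex is the index of the last entry currently in the log.
	LastIndex() uint64
}
```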

The use of such new capabilities would not be limited to leader->follower MsgApp flows. The asynchronous log writes API [introduced here] was also partly necessitated by the limitations of the current raft design. The design improvements considered here would make raft asynchronous out of the box, and alleviate the need to complicate its API to support multiple modes of operation. I.e., instead of implementing workarounds for each new use-case, it is "cheaper" to expose a flexible API.

[^1]: such as CockroachDB; or etcd if there is any "multi-tenancy" in the way it can be hosted - e.g. many etcd processes per VM.

pav-kv commented 7 months ago

A simpler example of why something like this is needed:

When one of the follower nodes is down, and the application layer (specifically, the Transport component in the pictures above) knows it, it is suboptimal to let raft continue building and sending MsgApp messages (and most other types of messages, really) to this follower. To prevent this, there must be a mechanism for the application layer to opt out of, or reduce the limits on, specific message flows.

As of today, raft will only stop sending MsgApp messages to a follower after maxing out the static in-flight limits.
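Today the closest knobs are ReportUnreachable (which moves the follower's Progress into probing, so at most one in-flight MsgApp) plus the static in-flight limits shown earlier. A sketch of how a transport might use it when it knows the peer is down; `peerUp` is a hypothetical liveness check owned by the application. This is a mitigation, not the opt-out described above.

```go
package sketch

import (
	"go.etcd.io/raft/v3"
	pb "go.etcd.io/raft/v3/raftpb"
)

// sendOrReport drops messages to peers the transport believes are down and
// tells raft, which switches the follower's Progress to probing.
func sendOrReport(rn *raft.RawNode, m pb.Message, peerUp func(id uint64) bool, send func(pb.Message)) {
	if !peerUp(m.To) {
		rn.ReportUnreachable(m.To)
		return
	}
	send(m)
}
```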

ahrtr commented 7 months ago

> To implement approach (2), raft needs to export/delegate the notion of "leader->follower flow" in such a way that the application layer can drive it. It would be similar to the Progress tracker that raft already uses internally, and exposes for informational purposes. The application would use the "progress" information to decide when and how much to send to a follower, and notify raft to update the progress accordingly. This would be a shared data structure.

High level looks good to me. A couple of comments: