akiradeveloper / lolraft

A Multi-Raft implementation in Rust language.
MIT License

Multiplexed heartbeat #357

Closed: akiradeveloper closed this issue 3 months ago

akiradeveloper commented 4 months ago

As of 0.10, heartbeats are not optimized. If two leader processes on one node send heartbeats to followers that live on the same destination node, those heartbeats could ideally be batched into a single RPC call.

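Let's estimate the effect of this optimization. Assume N nodes, L shards, and K replicas per shard. Then

(1) the total number of heartbeats sent per heartbeat interval is about LK (strictly L(K-1), because each leader sends heartbeats only to its K-1 followers), and
(2) the number of directed node-to-node paths is N(N-1).

If (1) is evenly distributed over (2), each path in (2) carries LK/(N(N-1)) heartbeats, and this is the expected number of heartbeats in a batch.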

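Take L=1000, N=8, K=5, for example. The number is 5000/56 ≈ 89, i.e. on the order of 100, which means the number of heartbeat RPCs drops by roughly 99%. This does not mean that network traffic due to heartbeats drops by 99%, because every batched request still carries one entry per heartbeat and only the per-RPC overhead is saved, but the effect seems promising.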
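
The estimate above can be checked with a few lines of Rust. This is only a sketch of the arithmetic: `expected_batch_size` uses the LK numerator from the issue, while `expected_batch_size_strict` is an added variant (not part of the original estimate) that counts only the K-1 followers of each leader. `rpc_reduction` is the fraction of RPCs saved when a batch of that size shares one call. For L=1000, N=8, K=5 it should print about 89.3 (98.9% fewer RPCs) and 71.4 (98.6%).

```rust
/// Expected heartbeats per batch, as in the estimate above:
/// about L*K heartbeats spread evenly over N*(N-1) directed node pairs.
fn expected_batch_size(l: u64, n: u64, k: u64) -> f64 {
    assert!(n >= 2, "need at least two nodes to have node-to-node paths");
    (l * k) as f64 / (n * (n - 1)) as f64
}

/// Added variant: count only the K-1 followers of each leader.
fn expected_batch_size_strict(l: u64, n: u64, k: u64) -> f64 {
    assert!(n >= 2 && k >= 1);
    (l * (k - 1)) as f64 / (n * (n - 1)) as f64
}

/// Fraction of heartbeat RPCs saved when `batch` heartbeats share one RPC.
fn rpc_reduction(batch: f64) -> f64 {
    1.0 - 1.0 / batch
}

fn main() {
    let (l, n, k) = (1000, 8, 5);
    let b = expected_batch_size(l, n, k);
    let s = expected_batch_size_strict(l, n, k);
    println!("LK/(N(N-1))     = {:.1}, RPCs saved = {:.1}%", b, rpc_reduction(b) * 100.0);
    println!("L(K-1)/(N(N-1)) = {:.1}, RPCs saved = {:.1}%", s, rpc_reduction(s) * 100.0);
}
```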

ref. https://github.com/akiradeveloper/lolraft/discussions/340

akiradeveloper commented 3 months ago

The Heartbeat request message will have to change:

before:

message Heartbeat {
  uint32 lane_id = 1;
  uint64 leader_term = 2;
  string leader_id = 3;
  uint64 leader_commit_index = 4;
}

after:

message LeaderCommitState {
  uint64 leader_term = 1;
  uint64 leader_commit_index = 2;
}
message Heartbeat {
  string leader_id = 1;
  map<uint32, LeaderCommitState> leader_commit_state = 2;
}
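
To make the before/after messages concrete, here is a sketch in plain Rust of how several per-lane heartbeats could be folded into the new map-based message and split apart again on the receiving side. The structs are hypothetical hand-written mirrors of the protobuf messages above (the real code would use the generated types), and `batch`/`unbatch` are illustrative names, not lolraft functions. Grouping by `leader_id` assumes, as the new message layout suggests, that `leader_id` identifies the sending node, so all lanes led by one node share it.

```rust
use std::collections::HashMap;

// Hypothetical plain-Rust mirrors of the protobuf messages above; the real
// code would use the generated types instead.
#[derive(Debug, Clone, PartialEq)]
struct OldHeartbeat {
    lane_id: u32,
    leader_term: u64,
    leader_id: String,
    leader_commit_index: u64,
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct LeaderCommitState {
    leader_term: u64,
    leader_commit_index: u64,
}

#[derive(Debug, Clone, PartialEq)]
struct Heartbeat {
    leader_id: String,
    leader_commit_state: HashMap<u32, LeaderCommitState>,
}

/// Merges per-lane heartbeats into one message per leader_id (the sending node).
/// If the same lane appears twice, the entry seen last wins.
fn batch(reqs: Vec<OldHeartbeat>) -> Vec<Heartbeat> {
    let mut by_leader: HashMap<String, HashMap<u32, LeaderCommitState>> = HashMap::new();
    for r in reqs {
        let state = LeaderCommitState {
            leader_term: r.leader_term,
            leader_commit_index: r.leader_commit_index,
        };
        by_leader.entry(r.leader_id).or_default().insert(r.lane_id, state);
    }
    by_leader
        .into_iter()
        .map(|(leader_id, leader_commit_state)| Heartbeat { leader_id, leader_commit_state })
        .collect()
}

/// Receiver side: splits a batched heartbeat back into per-lane heartbeats,
/// sorted by lane_id so the output order is deterministic.
fn unbatch(hb: &Heartbeat) -> Vec<OldHeartbeat> {
    let mut out: Vec<OldHeartbeat> = hb
        .leader_commit_state
        .iter()
        .map(|(&lane_id, s)| OldHeartbeat {
            lane_id,
            leader_term: s.leader_term,
            leader_id: hb.leader_id.clone(),
            leader_commit_index: s.leader_commit_index,
        })
        .collect();
    out.sort_by_key(|h| h.lane_id);
    out
}

fn main() {
    // Three lanes whose leaders all live on node-a.
    let reqs: Vec<OldHeartbeat> = (1..=3)
        .map(|lane_id| OldHeartbeat {
            lane_id,
            leader_term: 7,
            leader_id: "node-a".to_string(),
            leader_commit_index: 100 + lane_id as u64,
        })
        .collect();
    let batched = batch(reqs.clone());
    println!("{} message(s), {} lane(s)", batched.len(), batched[0].leader_commit_state.len());
    assert_eq!(unbatch(&batched[0]), reqs);
}
```

Note that the term moves from the message level into the per-lane `LeaderCommitState`, since different lanes can have leaders at different terms.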

When a process calls send_heartbeat, the request is buffered and sent over the network after some period. I think this scheme consequently synchronizes the processes, because all of them receive the response of the batched RPC at the same time.

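use std::time::Duration;

// Assumed imports: `ensure!` and `Result` from the anyhow crate. NodeId, Ref,
// Voter, the `voter` module and ThreadHandle are lolraft-internal types.
use anyhow::{ensure, Result};

#[derive(Clone)]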
pub struct Thread {
    follower_id: NodeId,
    voter: Ref<Voter>,
}
impl Thread {
    async fn run_once(&self) -> Result<()> {
        let election_state = self.voter.read_election_state();
        ensure!(std::matches!(election_state, voter::ElectionState::Leader));

        self.voter.send_heartbeat(self.follower_id.clone()).await
    }

    fn do_loop(self) -> ThreadHandle {
        let hdl = tokio::spawn(async move {
            loop {
                // Every iteration takes
                // T = 100ms sleep + RPC round-trip time,
                // so the follower observes a heartbeat every T.
                // We can't use tokio::time::interval instead, because then the follower
                // would receive a heartbeat every 100ms regardless of the RPC round-trip time,
                // and the failure detector on the follower side would not work correctly.
                tokio::time::sleep(Duration::from_millis(100)).await;
                self.run_once().await.ok();
                // Processes will be synced here.
            }
        })
        .abort_handle();

        ThreadHandle(hdl)
    }
}
akiradeveloper commented 3 months ago

I think this optimization won't affect any code in the process layer.

This code in the communicator should be modified:

    // Current (unbatched) version: every call issues its own RPC for a single lane.
    pub async fn send_heartbeat(&self, req: request::Heartbeat) -> Result<()> {
        let req = raft::Heartbeat {
            lane_id: self.lane_id,
            leader_id: req.leader_id.to_string(),
            leader_term: req.leader_term,
            leader_commit_index: req.leader_commit_index,
        };
        self.cli.clone().send_heartbeat(req).await?;
        Ok(())
    }
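
One possible shape for the buffering described earlier (send_heartbeat enqueues, and the request goes out after some period) is sketched below with only the standard library. `HeartbeatBuffer`, `push`, `flush` and the simplified `Communicator` are hypothetical names, not lolraft's API; a real version would be async, call the generated gRPC client, and be driven by a periodic timer task that is not shown. The idea is that each lane's `send_heartbeat` only records its latest commit state for a destination node, and one flush turns everything pending for that destination into a single batched `Heartbeat`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq)]
struct LeaderCommitState {
    leader_term: u64,
    leader_commit_index: u64,
}

#[derive(Debug, Clone, PartialEq)]
struct Heartbeat {
    leader_id: String,
    leader_commit_state: HashMap<u32, LeaderCommitState>,
}

/// Hypothetical per-node buffer shared by all lanes on this node.
/// Pending heartbeats are keyed by destination node, then by lane id.
#[derive(Default)]
struct HeartbeatBuffer {
    pending: Mutex<HashMap<String, HashMap<u32, LeaderCommitState>>>,
}

impl HeartbeatBuffer {
    /// Called by each lane instead of issuing its own RPC.
    /// A later push for the same (dest, lane) overwrites the earlier one.
    fn push(&self, dest: &str, lane_id: u32, state: LeaderCommitState) {
        let mut pending = self.pending.lock().unwrap();
        pending.entry(dest.to_string()).or_default().insert(lane_id, state);
    }

    /// Meant to be called periodically: drains the buffer into one batched
    /// Heartbeat per destination node, sorted by destination.
    fn flush(&self, self_id: &str) -> Vec<(String, Heartbeat)> {
        let drained = std::mem::take(&mut *self.pending.lock().unwrap());
        let mut batches: Vec<(String, Heartbeat)> = drained
            .into_iter()
            .map(|(dest, states)| {
                let hb = Heartbeat { leader_id: self_id.to_string(), leader_commit_state: states };
                (dest, hb)
            })
            .collect();
        batches.sort_by(|a, b| a.0.cmp(&b.0));
        batches
    }
}

/// Hypothetical per-lane communicator: same method name as above, but it only enqueues.
struct Communicator {
    lane_id: u32,
    dest: String,
    buffer: Arc<HeartbeatBuffer>,
}

impl Communicator {
    fn send_heartbeat(&self, state: LeaderCommitState) {
        self.buffer.push(&self.dest, self.lane_id, state);
    }
}

fn main() {
    let buffer = Arc::new(HeartbeatBuffer::default());
    // Lanes 1 and 2 are led by node-a; both replicate to node-b, lane 2 also to node-c.
    let comms = vec![
        Communicator { lane_id: 1, dest: "node-b".into(), buffer: buffer.clone() },
        Communicator { lane_id: 2, dest: "node-b".into(), buffer: buffer.clone() },
        Communicator { lane_id: 2, dest: "node-c".into(), buffer: buffer.clone() },
    ];
    for c in &comms {
        c.send_heartbeat(LeaderCommitState { leader_term: 3, leader_commit_index: 10 });
    }
    for (dest, hb) in buffer.flush("node-a") {
        println!("{} <- {} lane(s) from {}", dest, hb.leader_commit_state.len(), hb.leader_id);
    }
}
```

Because a flush swaps the pending map for an empty one, a lane that calls `send_heartbeat` twice before a flush sends only its latest state. That should be sufficient here, since the heartbeat only conveys the leader's current term and commit index.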
akiradeveloper commented 3 months ago

Related topic: https://github.com/sofastack/sofa-jraft/issues/135