tikv / raft-rs

Raft distributed consensus algorithm implemented in Rust.
Apache License 2.0

Are there any guidelines on calling step, ready and advance? #506

Closed tisonkun closed 1 year ago

tisonkun commented 1 year ago
  1. I don't know whether I must call has_ready and ready after each step, or whether I can call several steps in a batch and then call ready once (see the sketch after this list).
  2. I don't know the difference between the three or four variants of advance. They have docs, but it's hard to tell from them what a typical call round should look like beyond the example - is that the only viable approach?
  3. Given that the main process is synchronous, I'm unsure how to trigger tick in a thread-safe manner. Moreover, the last operation can take a long time to complete. Will it advance the inner timer? If not, ticks queued behind other operations may fire later than they are supposed to.
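
For concreteness, here is a minimal sketch of the cycle I currently assume is legal; rx and node are placeholders, and this is only my guess:

    loop {
        // Feed every input message that is already pending...
        while let Ok(msg) = rx.try_recv() {
            node.step(msg)?;
        }
        // ...then extract and handle the accumulated output once.
        if node.has_ready() {
            let ready = node.ready();
            // persist entries / hard state and send messages here
            node.advance(ready);
        }
    }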

cc @BusyJay

BTW, shall we discuss this kind of question on Discussions, or just reuse issues?

BusyJay commented 1 year ago

There are also other examples that should answer your questions: https://github.com/tikv/raft-rs/tree/master/examples.

I'm unsure how to trigger tick in a thread-safe manner.

You can use locks or a message queue; it's OK as long as the Rust compiler doesn't complain.

Will it advance the inner timer

There is no inner timer; you have to call tick to let RawNode know time has passed. If IO has to happen in the thread that drives Raft, I'm afraid it will delay the timer and cause unexpected elections.
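
For example, you can drive tick from a timer channel in the same loop that receives messages. A minimal sketch, assuming crossbeam_channel (the channel names are placeholders):

    use std::time::Duration;
    use crossbeam_channel::{select, tick};

    let timer = tick(Duration::from_millis(100));
    loop {
        select! {
            // RawNode only learns that time has passed via this call.
            recv(timer) -> _ => {
                raw_node.tick();
            }
            recv(rx_message) -> msg => {
                if let Ok(m) = msg {
                    raw_node.step(m)?;
                }
            }
        }
    }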

BTW, shall we discuss this kind of question on Discussions, or just reuse issues?

Both are OK. This is a small project that doesn't need complicated rules. :)

tisonkun commented 1 year ago

Thank you!

Both are OK

I'd prefer Discussions, so I'm checking whether that works for you :D

There is no inner timer

Yeah. I see there is only a tick counter, but I haven't checked whether other methods increase those counters internally. As you said, if only tick advances the counter (the effective timer), and heavy IO delays that call, the raft state can become unexpectedly unstable.
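
The workaround I currently have in mind is to keep the driving thread free of blocking IO, e.g. by handing outgoing messages to other threads; just a sketch of my plan:

    // In the driving loop: never block on the network, so the next
    // tick is not delayed by a slow peer.
    for msg in ready.take_messages() {
        let peer = peers.get_peer(msg.to).unwrap();
        std::thread::spawn(move || peer.tell(msg));
    }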

May I ask how TiKV handles or works around this risk?

locks or a message queue

You're right that tick takes a &mut reference rather than relying on interior mutability, so the Rust compiler should help here.
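
Since the receiver is &mut self, sharing the node across threads won't even compile without an explicit lock. A tiny sketch to illustrate the point (raw_node as constructed elsewhere):

    use std::sync::{Arc, Mutex};

    // &mut self forces exclusive access: either a single owning thread
    // fed by channels, or a lock like this.
    let node = Arc::new(Mutex::new(raw_node));
    let handle = {
        let node = Arc::clone(&node);
        std::thread::spawn(move || {
            node.lock().unwrap().tick();
        })
    };
    handle.join().unwrap();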

other examples

I checked them roughly, but those examples seem quite handmade in that they preconfigure roles and state a bit. My test case runs a raft cluster across different processes/hosts, and right now I'm trying to make a three-node campaign work stably.

I'll bring back more concrete questions if any come up. For now, the main loop looks like this:

    pub fn start(mut self) -> anyhow::Result<()> {
        // simulate self.node.campaign
        self.tx_message.send(RaftMessage {
            msg_type: MessageType::MsgHup as i32,
            ..Default::default()
        })?;

        loop {
            self.select_any();

            match self.shutdown.try_recv() {
                Err(TryRecvError::Empty) => {
                    // no shutdown signal so far
                }
                Ok(()) | Err(TryRecvError::Disconnected) => {
                    info!(self.logger, "shutting down because server stopped");
                    return Ok(());
                }
            }

            for raft_msg in self.rx_message.try_iter() {
                if raw_node::is_local_msg(raft_msg.msg_type()) {
                    // RawNode::step rejects local messages, so step
                    // them on the inner Raft state machine directly.
                    self.node.raft.step(raft_msg)?;
                } else {
                    self.node.step(raft_msg)?;
                }
            }

            self.on_ready();
        }
    }

I'm now working out how to handle and advance the raft core state, and which state I should hold in the server layer.

tisonkun commented 1 year ago

FYI, I can run a three-node cluster with a stable leader now. Below is the main loop I'd like to share. Perhaps I will publish the source once an MVP is built.

For questions above:

call several steps in a batch and then call ready

It seems correct, though I don't know whether some ready state must be handled to update the raft store before stepping more messages. That ordering should be considered and coordinated in the server.
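
For my own notes, piecing together the raft-rs docs and the five_mem_node example, the per-Ready handling order seems to be roughly the following; handle and apply are placeholders for sending messages and applying entries, and I haven't verified anything beyond my reading:

    use raft::{eraftpb::Snapshot, storage::MemStorage, RawNode};

    fn on_ready(node: &mut RawNode<MemStorage>) {
        if !node.has_ready() {
            return;
        }
        let mut ready = node.ready();
        // 1. Messages in `messages` may be sent before persisting.
        handle(ready.take_messages());
        // 2. Apply a pending snapshot to storage, if any.
        if *ready.snapshot() != Snapshot::default() {
            let snap = ready.snapshot().clone();
            node.mut_store().wl().apply_snapshot(snap).unwrap();
        }
        // 3. Apply committed entries to the state machine.
        apply(ready.take_committed_entries());
        // 4. Persist the new log entries.
        node.mut_store().wl().append(ready.entries()).unwrap();
        // 5. Persist the hard state, if it changed.
        if let Some(hs) = ready.hs() {
            node.mut_store().wl().set_hardstate(hs.clone());
        }
        // 6. Only now send the messages that required persistence.
        handle(ready.take_persisted_messages());
        // 7. Advance; the returned LightReady may carry more output.
        let mut light_rd = node.advance(ready);
        if let Some(commit) = light_rd.commit_index() {
            node.mut_store().wl().mut_hard_state().set_commit(commit);
        }
        handle(light_rd.take_messages());
        apply(light_rd.take_committed_entries());
        node.advance_apply();
    }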

In the main loop below, each iteration proceeds as follows:

  1. Tick (though there is an open issue https://github.com/crossbeam-rs/crossbeam/issues/989 about the timer semantics).
  2. Process local messages ("local" should be well-defined later; I'm unsure whether it differs from is_local_msg in raft-rs. So far, it means messages sent from the same server).
  3. Process remote messages.
  4. Handle possible Ready.

difference between the three or four variants of advance

It seems advance is enough for now. When I work on the store part, perhaps the other variants will be needed.
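
For the record, my current reading of the other variants, as a sketch from the docs:

    // advance() = advance_append() plus advance_apply(), roughly:
    let mut light_rd = node.advance_append(ready); // advance without applying
    handle(light_rd.take_messages());
    apply(light_rd.take_committed_entries());
    node.advance_apply(); // mark everything committed so far as applied

    // advance_append_async(ready) additionally allows persisting the
    // ready asynchronously; advance_apply_to(applied) reports an
    // explicit applied index instead of "everything committed so far".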

pub struct RaftNode {
    node: RawNode<MemStorage>,
    state: NodeState,

    shutdown: Receiver<()>,
    tx_message: Sender<RaftMessage>,
    rx_message: Receiver<RaftMessage>,
    #[allow(dead_code)] // prevent unnecessary recv error
    tx_api: Sender<ServiceMessage>,
    rx_api: Receiver<ServiceMessage>,

    tick: Receiver<Instant>,

    peers: PeerProxyMap,

    logger: Logger,
}

pub struct NodeInitContext {
    pub id: u64,
    pub peers: Vec<Peer>,
    pub shutdown: Receiver<()>,
    pub tx_message: Sender<RaftMessage>,
    pub rx_message: Receiver<RaftMessage>,
    pub tx_api: Sender<ServiceMessage>,
    pub rx_api: Receiver<ServiceMessage>,
    pub logger: Logger,
}

#[derive(Debug)]
struct NodeState {
    role: StateRole,
}

impl RaftNode {
    pub fn new(context: NodeInitContext) -> RaftNode {
        let NodeInitContext {
            id,
            peers,
            shutdown,
            tx_message,
            rx_message,
            tx_api,
            rx_api,
            logger,
        } = context;

        let config = {
            let mut config = Config::new(id);
            config.priority = (1 << id) as i64;
            config.election_tick = 10;
            config.heartbeat_tick = 1;
            config.max_size_per_msg = 1024 * 1024 * 1024;
            config.validate().unwrap();
            config
        };

        let voters = peers.iter().map(|p| p.id).collect::<Vec<_>>();
        let storage = MemStorage::new_with_conf_state((voters, vec![]));
        let logger = logger.new(o!("tag" => format!("peer_{id}")));
        let node = RawNode::new(&config, storage, &logger).unwrap();

        let peers = PeerProxyMap::new(peers, logger.clone());
        let state = NodeState {
            role: StateRole::Follower,
        };

        let tick = crossbeam_channel::tick(Duration::from_millis(100));

        RaftNode {
            node,
            state,
            peers,
            tx_message,
            rx_message,
            tx_api,
            rx_api,
            tick,
            shutdown,
            logger,
        }
    }

    pub fn do_main(self) {
        let logger = self.logger.clone();
        if let Err(err) = self.start() {
            error!(logger, "raft node shutdown improperly"; "err" => ?err);
        }
    }

    fn start(mut self) -> anyhow::Result<()> {
        // simulate self.node.campaign
        self.tx_message.send(RaftMessage::campaign())?;

        loop {
            self.select_any();

            match self.shutdown.try_recv() {
                Err(TryRecvError::Empty) => {
                    // no shutdown signal so far
                }
                Ok(()) | Err(TryRecvError::Disconnected) => {
                    info!(self.logger, "shutting down because server stopped");
                    return Ok(());
                }
            }

            if self.tick.try_recv().is_ok() {
                self.node.tick();
            }

            for msg in self.rx_message.try_iter() {
                let msg = msg.0;
                if suraft::raw_node::is_local_msg(msg.msg_type()) {
                    // Local messages are rejected by RawNode::step,
                    // so step them on the inner Raft directly.
                    self.node.raft.step(msg)?;
                } else {
                    self.node.step(msg)?;
                }
            }

            for msg in self.rx_api.try_iter() {
                self.node.step(msg.0)?;
            }

            self.on_ready();
        }
    }

    fn on_ready(&mut self) {
        if !self.node.has_ready() {
            return;
        }
        trace!(self.logger, "on_ready");

        let mut ready = self.node.ready();
        // trace!(self.logger, "{:#?}", ready);

        if let Some(ss) = ready.ss() {
            if ss.raft_state != self.state.role {
                info!(
                    self.logger,
                    "changing raft node role from {:?} to {:?}", self.state.role, ss.raft_state
                );
                self.state.role = ss.raft_state;
            }
        }

        // Messages in take_messages can be sent without waiting for
        // this Ready to be persisted.
        for msg in ready.take_messages() {
            match msg.msg_type() {
                eraftpb::MessageType::MsgAppend => {
                    self.tx_message.send(RaftMessage(msg)).unwrap();
                }
                eraftpb::MessageType::MsgHeartbeat => {
                    let peer = self.peers.get_peer(msg.to).unwrap();
                    std::thread::spawn(move || peer.tell(msg));
                }
                msg => {
                    error!(self.logger, "unimplemented {:?}", msg);
                    unimplemented!("{:?}", msg)
                }
            }
        }

        // Messages in take_persisted_messages must only be sent after
        // the Ready's entries and hard state have been persisted.
        for msg in ready.take_persisted_messages() {
            match msg.msg_type() {
                eraftpb::MessageType::MsgRequestVote
                | eraftpb::MessageType::MsgRequestVoteResponse
                | eraftpb::MessageType::MsgHeartbeatResponse => {
                    let peer = self.peers.get_peer(msg.to).unwrap();
                    std::thread::spawn(move || peer.tell(msg));
                }
                msg => {
                    error!(self.logger, "unimplemented {:?}", msg);
                    unimplemented!("{:?}", msg)
                }
            }
        }

        self.node.advance(ready);
    }

    fn select_any(&self) {
        // Block until at least one input channel has an event; the
        // main loop then drains every channel non-blockingly.
        let mut select = Select::new();
        select.recv(&self.shutdown);
        select.recv(&self.rx_message);
        select.recv(&self.rx_api);
        select.recv(&self.tick);
        select.ready();
    }
}