datafuselabs / openraft

rust raft with improvements
Apache License 2.0

Non-blocking `RaftStateMachine::apply()` #1156

Open drmingdrmer opened 1 month ago

drmingdrmer commented 1 month ago

Similarly, any I/O triggered by the state machine currently must complete inline in apply(). Here, it might also be helpful to have some kind of callback to notify the caller about applied entries asynchronously.

But, do we need to report applied log ID to the caller at all? Especially for a storage which works with snapshots, the applied log ID is actually a lie - it is applied to the in-memory state, but not to the persistent state. Only a snapshot will really apply it. We just need to account for applied log ID somewhere in the sense that the same entry is not applied twice. With that, there is no need for I/O accounting of individual apply calls and the I/O can continue fully overlapped in the background even past the execution of apply(). Solely log I/O must be awaited for correctness.
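A minimal sketch of the "same entry is not applied twice" accounting mentioned above. `Entry`, `StateMachine` and the plain `u64` index are hypothetical stand-ins for illustration, not openraft types:

```rust
/// Hypothetical, simplified log entry: `index` stands in for openraft's log ID.
#[derive(Debug, Clone)]
pub struct Entry {
    pub index: u64,
    pub payload: String,
}

/// Hypothetical in-memory state machine that remembers the last applied index.
#[derive(Debug, Default)]
pub struct StateMachine {
    pub last_applied: Option<u64>,
    pub data: Vec<String>,
}

impl StateMachine {
    /// Applies entries in order and skips any entry at or below `last_applied`,
    /// so replaying a batch does not apply an entry twice.
    /// Returns the number of entries that were actually applied.
    pub fn apply(&mut self, entries: impl IntoIterator<Item = Entry>) -> usize {
        let mut applied = 0;
        for entry in entries {
            if self.last_applied.map_or(false, |last| entry.index <= last) {
                continue; // already applied, skip the duplicate
            }
            self.data.push(entry.payload);
            self.last_applied = Some(entry.index);
            applied += 1;
        }
        applied
    }
}
```

In this sketch the guard lives entirely inside the state machine, so no per-call I/O accounting is needed on the caller's side.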

In fact, in our implementation, the client response is a ZST, since we already send the responses to the client during apply via a channel directly to the client connection (reducing latency and allowing the client connection to await the I/O, if that is necessary for some reason). This could be generalized, since we have a response receiver for the client abstracted out anyway: instead of collecting results, sending them to another task and processing them there, we can process them directly here (and send results to the respective oneshot channels in the default impl). This would also simplify away applying_entries and our special implementation for replies.

Originally posted by @schreter in https://github.com/datafuselabs/openraft/pull/1154#pullrequestreview-2161901546

Allow apply() to return before the I/O completes. When the I/O completes, the state machine calls Callback.complete() to inform RaftCore about the result. Callback.complete() is also responsible for sending the result back to the client.

/// `apply()` uses this `Callback` to inform the client and `RaftCore`.
struct Callback {
    /// Sends the result back to the client.
    to_client: Option<Responder>,
    /// Optional because RaftCore does not need to be informed for every log.
    to_raft_core: Option<mpsc::Sender>,
}

impl Callback {
    /// Called once the I/O for the entry has completed.
    fn complete(self) {
        if let Some(tx) = self.to_client {
            tx.send(...);
        }
        if let Some(tx) = self.to_raft_core {
            tx.send(...);
        }
    }
}

pub trait RaftStateMachineV2<C>
{
    async fn apply<I>(&mut self, entries: I) -> Result<Vec<C::R>, StorageError<C>>
    where
        I: IntoIterator<Item = (C::Entry, Callback)> + OptionalSend,
        I::IntoIter: OptionalSend;
}
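
To make the idea concrete, here is a runnable sketch using only std channels and a thread. The concrete channel types, the `(index, payload, Callback)` tuple and the free function `apply()` are assumptions for illustration; they are not the openraft API:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for openraft's `Responder`: a plain channel sender.
pub type Responder = mpsc::Sender<String>;

/// Mirrors the proposed `Callback`, with concrete (assumed) channel types.
pub struct Callback {
    pub to_client: Option<Responder>,
    /// Carries the applied log index; optional, as in the proposal.
    pub to_raft_core: Option<mpsc::Sender<u64>>,
}

impl Callback {
    /// Reports completion to whoever is listening. Send errors are ignored
    /// because a receiver that has gone away no longer needs the result.
    pub fn complete(self, index: u64, reply: String) {
        if let Some(tx) = self.to_client {
            let _ = tx.send(reply);
        }
        if let Some(tx) = self.to_raft_core {
            let _ = tx.send(index);
        }
    }
}

/// Starts the (simulated) I/O for each entry on a background thread and
/// returns immediately; the callbacks fire when that work finishes.
pub fn apply(entries: Vec<(u64, String, Callback)>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for (index, payload, callback) in entries {
            // Real code would perform the state machine I/O here.
            let reply = format!("applied {payload}");
            callback.complete(index, reply);
        }
    })
}
```

The caller gets control back as soon as the thread is spawned, and the client and RaftCore learn about each entry only through `complete()`.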

schreter commented 1 month ago

I think that we actually don't need the callback.

Rather, we should pass the receiver of the client reply to the caller of apply() and as soon as apply() returns a result for an entry, directly push this result to Responder::send() (which is a oneshot-channel in the default impl).

We have two issues to solve:

The first one can be solved by a watch-like channel where the newest log ID accepted by quorum is published, so the SM task can loop on it.
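
A rough sketch of such a watch-like channel, built on `Mutex` and `Condvar` from std rather than an async runtime. `CommittedWatch`, `sm_loop` and the convention that index 0 means "nothing applied yet" are assumptions made for this example:

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical watch-like cell: it keeps only the newest committed index.
#[derive(Default)]
pub struct CommittedWatch {
    latest: Mutex<u64>,
    changed: Condvar,
}

impl CommittedWatch {
    /// Publishes a newer committed index; older or equal values are ignored.
    pub fn publish(&self, index: u64) {
        let mut latest = self.latest.lock().unwrap();
        if index > *latest {
            *latest = index;
            self.changed.notify_all();
        }
    }

    /// Blocks until the committed index exceeds `seen`, then returns it.
    pub fn wait_newer(&self, seen: u64) -> u64 {
        let guard = self.latest.lock().unwrap();
        let guard = self
            .changed
            .wait_while(guard, |latest| *latest <= seen)
            .unwrap();
        *guard
    }
}

/// Schematic SM loop: applies everything up to the newest committed index,
/// then waits for the next update. Stops once `stop_at` has been applied.
/// Returns the inclusive index ranges it would have applied.
pub fn sm_loop(watch: Arc<CommittedWatch>, stop_at: u64) -> Vec<(u64, u64)> {
    let mut applied = 0;
    let mut batches = Vec::new();
    while applied < stop_at {
        let committed = watch.wait_newer(applied);
        batches.push((applied + 1, committed));
        applied = committed;
    }
    batches
}
```

Because only the newest value is kept, several publishes that happen while the loop is busy collapse into one larger batch, which is fine here since the loop always applies the whole range up to the latest index.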

The latter one is more problematic. Ideally, we'd have the Responder in our hands and simply push the result of apply() into it and/or push both entry and Responder to the apply(), so it can directly respond to the client w/o going through multiple tasks.

However, the Responder is not available in SM task as of now. Theoretically, it could be pushed into a channel before the replication, as soon as we get our hands on it in the core task. Then, the SM task would always have the next Responder to fill with reply on its hands at the time the quorum accepts an entry. I.e., the SM task would schematically do something like this:

async fn sm_task() -> Result<(), ...> {
    let mut current_log_id = ...;
    loop {
        // Wait for the newest log ID accepted by a quorum.
        let next_quorum_log_id = log_id.await;
        // Zip log IDs, log entries and pre-queued responders for the new range.
        let stream = (current_log_id..next_quorum_log_id).as_stream()
            .zip(log_reader.get_logs(current_log_id..next_quorum_log_id).await)
            .zip(responder_channel.take(next_quorum_log_id - current_log_id).await);
        // apply() responds to each client directly through its responder.
        sm.apply(stream).await?;
        current_log_id = next_quorum_log_id;
    }
}

Where SM::apply() would take a stream giving it log ID, log entry for this log ID and the responder into which to respond directly. Since the stream with Responders is pre-filled, there will be sufficient responders to satisfy all applied log IDs (and the same is true for the logs). Probably it could be rewritten as a single stream receiving data as-needed from the three sources and then it would be an endless stream. The only question is how to bring snapshot into it. Maybe as a virtual entry terminating the stream, so apply() returns, then start the snapshot, then resume applying.
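
One loop iteration of that scheme could look roughly like the synchronous sketch below. It uses std iterators and channels instead of async streams; `Sm`, `apply_up_to`, the slice-backed log and the half-open-from-below range `(current, next_quorum]` are all assumptions for illustration:

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for the client `Responder`.
pub type Responder = mpsc::Sender<String>;

/// Hypothetical state machine whose `apply()` answers clients directly.
#[derive(Default)]
pub struct Sm {
    pub applied: Vec<u64>,
}

impl Sm {
    pub fn apply(&mut self, items: impl Iterator<Item = (u64, String, Responder)>) {
        for (index, entry, responder) in items {
            self.applied.push(index);
            // Respond from inside apply(); ignore clients that went away.
            let _ = responder.send(format!("{index}:{entry}"));
        }
    }
}

/// One iteration of the schematic SM task: zip log indexes, log entries and
/// the pre-queued responders for `(current, next_quorum]`, then apply them.
/// `log[i]` holds the entry for index `i + 1`; `responders` is assumed to be
/// filled by the core task before replication.
pub fn apply_up_to(
    sm: &mut Sm,
    log: &[String],
    responders: &mpsc::Receiver<Responder>,
    current: u64,
    next_quorum: u64,
) -> u64 {
    let indexes = (current + 1)..=next_quorum;
    let entries = log[current as usize..next_quorum as usize].iter().cloned();
    let items = indexes
        .zip(entries)
        .zip(responders.try_iter())
        .map(|((index, entry), responder)| (index, entry, responder));
    sm.apply(items);
    next_quorum
}
```

The zip stops as soon as the index range is exhausted, so responders queued for later entries stay in the channel until the next iteration.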

What do you think?

drmingdrmer commented 1 month ago

Makes sense.

The callback in my proposed snippet could include a Responder to respond to the client and a channel sender to inform RaftCore about the applied log ID. This corresponds to the stream item in your snippet. Does this sound good?

struct Callback {
    to_client: Option<Responder>,
    to_raft_core: Option<mpsc::Sender>, // Optional because RaftCore does not need to be informed for every log.
}

pub trait RaftStateMachineV2<C> {
    async fn apply<I>(&mut self, entries: I) -> Result<Vec<C::R>, StorageError<C>>
    where
        I: IntoIterator<Item = (C::Entry, Callback)> + OptionalSend,
        I::IntoIter: OptionalSend;
}

Currently, the sm_task is channel-triggered. It watches for an input range of log IDs to apply or a command to build a snapshot:

https://github.com/datafuselabs/openraft/blob/97afacad151a03fa0be0613538077b6656ddf879/openraft/src/core/sm/worker.rs#L92-L94

This current approach seems sufficient for upgrading the state machine, as far as I know. A watch channel might miss some input messages because it only stores the last updated one.

This approach won't have the "how to trigger applying further entries" problem, right?
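
For comparison, a channel-triggered worker can be sketched as below. The `Command` enum and `worker` function are hypothetical and only modelled on the description above, not copied from `worker.rs`:

```rust
use std::sync::mpsc;

/// Hypothetical commands for the SM worker.
#[derive(Debug, PartialEq)]
pub enum Command {
    /// Apply the inclusive range of log indexes.
    Apply { first: u64, last: u64 },
    /// Build a snapshot of the current state.
    BuildSnapshot,
}

/// Drains the queue until all senders are dropped and records what it handled.
pub fn worker(rx: mpsc::Receiver<Command>) -> Vec<String> {
    let mut handled = Vec::new();
    for command in rx {
        match command {
            Command::Apply { first, last } => {
                handled.push(format!("apply {first}..={last}"));
            }
            Command::BuildSnapshot => handled.push("snapshot".to_string()),
        }
    }
    handled
}
```

A queue like this delivers every command in order, including the snapshot request between two apply ranges, whereas a cell that stores only the latest value would keep just the last one.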

What say you?

schreter commented 1 month ago

> The callback in my proposed snippet could include a Responder to respond to the client and a channel sender to inform RaftCore about the applied log ID. This corresponds to the stream item in your snippet. Does this sound good?

Sure, that's also OK. But, the applied log ID doesn't have to be sent for each entry processed in the apply(), so I'd keep it at Responder only and notify the RaftCore in batches in the caller of apply(). Also, the result then is a no-value Result, since replies have been sent already.
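
A sketch of that split, under the assumption that entries carry an optional responder and a plain `u64` index. `ApplyError`, `apply` and `apply_batch` are hypothetical names, and the empty-payload failure is only a way to simulate a state machine error:

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for the client `Responder`.
pub type Responder = mpsc::Sender<String>;

/// Hypothetical error type standing in for `StorageError`.
#[derive(Debug, PartialEq)]
pub struct ApplyError(pub u64);

/// Replies go out through the responders, so the result carries no value.
pub fn apply(entries: Vec<(u64, String, Option<Responder>)>) -> Result<(), ApplyError> {
    for (index, payload, responder) in entries {
        if payload.is_empty() {
            return Err(ApplyError(index)); // simulated state machine failure
        }
        if let Some(tx) = responder {
            let _ = tx.send(payload.to_uppercase());
        }
    }
    Ok(())
}

/// Caller of `apply()`: reports to RaftCore once per batch, either the last
/// applied index or the error that should stop further processing.
pub fn apply_batch(
    entries: Vec<(u64, String, Option<Responder>)>,
    to_raft_core: &mpsc::Sender<Result<u64, ApplyError>>,
) {
    let Some(last) = entries.last().map(|(index, _, _)| *index) else {
        return; // nothing to apply
    };
    let _ = to_raft_core.send(apply(entries).map(|()| last));
}
```

Here RaftCore receives one message per batch regardless of batch size, and an `Err` on the same channel is the signal to stop.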

BTW, RaftCore could be in principle notified by a watch on the applied log ID, so we'd even spare the mpsc channel (though, it already exists, so it's probably not a problem).

But again, do we need to notify RaftCore at all? The entry with log ID is applied, but not guaranteed persisted in the state machine anyway. The RaftCore can send next batch of entries at any time. The only issue we need to handle is an error from the SM to stop RaftCore from further processing, right?

drmingdrmer commented 1 month ago

> Sure, that's also OK. But, the applied log ID doesn't have to be sent for each entry processed in the apply(), so I'd keep it at Responder only and notify the RaftCore in batches in the caller of apply(). Also, the result then is a no-value Result, since replies have been sent already.

RaftCore could be notified only by the last applied entry in a batch. This is done by setting Callback.to_raft_core to None for every entry except the last one.
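
Roughly, building the callbacks for a batch could look like this. The concrete channel types and the helper `attach_callbacks` are assumptions for the sketch:

```rust
use std::sync::mpsc;

/// Same shape as the proposed `Callback`, with assumed concrete channel types.
pub struct Callback {
    pub to_client: Option<mpsc::Sender<String>>,
    pub to_raft_core: Option<mpsc::Sender<u64>>,
}

/// Pairs each entry index with a callback; only the last entry of the batch
/// gets a `to_raft_core` sender, so RaftCore is informed once per batch.
pub fn attach_callbacks(
    entries: Vec<(u64, Option<mpsc::Sender<String>>)>,
    to_raft_core: &mpsc::Sender<u64>,
) -> Vec<(u64, Callback)> {
    let last = entries.len().saturating_sub(1);
    entries
        .into_iter()
        .enumerate()
        .map(|(i, (index, to_client))| {
            let to_raft_core = (i == last).then(|| to_raft_core.clone());
            (index, Callback { to_client, to_raft_core })
        })
        .collect()
}
```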

> But again, do we need to notify RaftCore at all? The entry with log ID is applied, but not guaranteed persisted in the state machine anyway. The RaftCore can send next batch of entries at any time. The only issue we need to handle is an error from the SM to stop RaftCore from further processing, right?

Yes. RaftCore needs the last-applied-log-id in several scenarios:

The usages of io_applied() show where last-applied-log-id is used: https://github.com/datafuselabs/openraft/blob/f1792dce153fd592634076a6340bf07696a0c43f/openraft/src/raft_state/log_state_reader.rs#L57