databendlabs / openraft

rust raft with improvements
Apache License 2.0

Direction towards allocation-free operations #1184

Open schreter opened 4 months ago

schreter commented 4 months ago

Currently, openraft allocates in many places, which is a bit counterproductive, since any OOM will panic and tear down the state machine unnecessarily. We should look at ways to operate in constant memory.

Basically, we have the following parts/tasks that cooperate:

Now, all of these tasks basically operate on the same list of entries, which are either received from the client (on the leader) or replicated from the leader (on the follower).

The idea is to have one buffer which accepts logs, which then are pushed through the consensus (replication), log and state machine. This buffer could be also fixed-size, so it can accept all the Entry objects for the relevant logs and keep them until they are fully processed.

I.e., on the leader:

The API for respective user code should provide entries as a pair of an async Stream and a notification object, where the stream internally awaits the pointer change and then feeds the entries from the buffer until this pointer, then awaiting the next change. When the needful is done, the notification object is informed about processed entries (not necessarily for each single entry). This notification object could be a wrapper over watch::Sender or similar for the first implementation.

On the follower:

What about elections?

What happens upon leader change?

What about the snapshot?

What about purging the log?

So we have the following state to watch:

All tasks read entries from the shared buffer up to accepted; only the raft core task adds entries to the buffer and updates accepted afterwards, so Rust's AXM (aliasing XOR mutability) rules are followed. This will likely need a bit of unsafe code to implement the buffer optimally.

The raft core task listens on multiple event sources:

Since the watch state for the core task is relatively large and frequently updated, I'd implement it a bit differently: push most of the individual requests via an atomic change to the state member (up to 16B can be updated in one go, which should be sufficient for most items) and use a simple signaling semaphore to ensure the task is running or will wake up after the update. The same applies to other signaling. For example, signaling the log task needs two 8B values, so it can also be done by a simple atomic write to the respective value followed by a task wakeup.

With this, we have no unbounded channels or other unbounded allocations in the entire state in openraft itself (except for leader change, where we restart replication tasks; that needs to be addressed separately). It's up to the user to implement the handling in an OOM-safe way.
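To make the "atomic write followed by task wakeup" idea concrete, here is a minimal std-only sketch. `Pointer`, `advance` and `wait_past` are hypothetical names for illustration, not openraft APIs, and a real implementation would use an async wakeup instead of a blocking `Condvar`. The value itself is a single atomic; the mutex only orders the wakeup so that a waiter cannot miss it.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical watch-like cell holding one log index that only moves forward.
pub struct Pointer {
    value: AtomicU64,
    lock: Mutex<()>,
    wakeup: Condvar,
}

impl Pointer {
    pub fn new(initial: u64) -> Self {
        Pointer {
            value: AtomicU64::new(initial),
            lock: Mutex::new(()),
            wakeup: Condvar::new(),
        }
    }

    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Acquire)
    }

    /// Publish a new value (never moving backwards), then wake any waiter.
    pub fn advance(&self, new: u64) {
        self.value.fetch_max(new, Ordering::AcqRel);
        // Taking the lock orders this notification after any waiter's check,
        // so a waiter either sees the new value or is already waiting.
        let _guard = self.lock.lock().unwrap();
        self.wakeup.notify_all();
    }

    /// Block until the value is greater than `seen`; return the current value.
    /// Intermediate values may be skipped, which is fine for a pointer.
    pub fn wait_past(&self, seen: u64) -> u64 {
        let mut guard = self.lock.lock().unwrap();
        loop {
            let now = self.get();
            if now > seen {
                return now;
            }
            guard = self.wakeup.wait(guard).unwrap();
        }
    }
}

/// One writer advances the pointer; the reader follows it without any queue.
pub fn demo() -> u64 {
    let pointer = Arc::new(Pointer::new(0));
    let writer = Arc::clone(&pointer);
    let task = thread::spawn(move || {
        for index in 1..=5 {
            writer.advance(index);
        }
    });
    let mut seen = 0;
    while seen < 5 {
        seen = pointer.wait_past(seen);
    }
    task.join().unwrap();
    seen
}
```

No allocation happens after construction, regardless of how often the pointer is advanced.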

I hope this is all somewhat understandable. We can also implement it piecewise: for example, first handle the local log writing and state machine update in this way (which can by default call the legacy methods easily, or alternatively we can provide a wrapper calling the legacy interface to allow for a smooth transition). Then, continue with replication tasks.

Comments? Did I forget about something?

drmingdrmer commented 4 months ago

I have not reviewed it thoroughly yet, but there are a few design aspects that conflict with the current codebase, AFAIK:

In general, a watch notification may not be suitable for the current architecture because watch does not guarantee the exact order of events. For example, if Openraft updates submitted before actually submitting IO to RaftLogStorage, it is possible that the watch::Receiver notifies the user before the IO is submitted. Conversely, if IO is submitted before updating submitted, the callback to RaftLogStorage::append(..., callback) might be called before submitted is updated.

  • The idea is to have one buffer which accepts logs, which then are pushed through the consensus (replication), log and state machine. This buffer could be also fixed-size, so it can accept all the Entry objects for the relevant logs and keep them until they are fully processed.

It is not feasible to keep Entry objects until they are fully processed. If a follower is offline, the Leader should not retain the Entries that are not sent to the Follower but release the memory as soon as the Entry is committed. Further replication should read the log entries from the storage.

I.e., on the leader: replication tasks watch accepted pointer as well and send entries to the respective follower

There is a problem here because watch may yield an obsolete value. When the replication task receives an accepted value like Leader-2, log-id-3, the buffer may have been overwritten by a new leader such as Leader-3, log-id-3' (a different log-id-3). The replication task may read changed log entries. That’s why in the current implementation, the logs for replication are determined by RaftCore.

  • the core task updates to_apply pointer to the minimum of flushed and committed pointer (we cannot apply locally until the local log was written)

A log entry that is submitted is visible to RaftLogReader, thus the state machine should be able to apply a submitted but not yet flushed log entry. Why does the state machine need the log entry to be flushed before applying it?

On the follower:

  • for each incoming log entry, the entry is pushed to openraft into the same buffer as the client write would put it (unless it's already there or known to be committed - when the leader retries after disconnect/reconnect)

The follower's buffer may be truncated if the Leader switches. Such an approach requires the follower API to handle log truncation directly, which may be somewhat counterintuitive.

schreter commented 4 months ago

In general, a watch notification may not be suitable for the current architecture because watch does not guarantee the exact order of events.

I'm using watch as a placeholder. Nevertheless, I'm pretty sure it should be suitable in general (though it has its own problems). When you post a value, this value will be definitely received by the receiver. There will be no missed reads. Of course, posting a new value overwrites the previous one, but that's OK, we just want to advance pointers.

Also, submitted is probably a wrong name here. OTOH, what's wrong with notifying replication tasks to replicate a log entry which is NOT yet submitted into the local log? The only thing which needs to wait both for the quorum and for the local log persistence is apply, since otherwise we'd have an inconsistent state machine at restart. But maybe we mean different things here?

It is not feasible to keep Entry objects until they are fully processed. If a follower is offline, the Leader should not retain the Entries that are not sent to the Follower but release the memory as soon as the Entry is committed.

Agreed. But, that's also not a problem, IMHO. The entries don't strictly need to be kept longer than they are today. Today we also keep entries around until they are replicated, unless the follower is lagging too much (at the latest, the log implementation does that, since otherwise it would be very inefficient). If the entries are gone they can still be loaded from the log. Anyway, barring offline followers, the entries are processed rather quickly.

Another option is to simply delegate the entry buffer to the log implementation, basically what we have today. Then, the log implementation can force the slow down of the processing by simply blocking append to the log (and thus overflowing the bounded channel directing requests to the log, which would block further requests). I didn't think about exact design here, though.

My point is, we don't need to collect batches of entries in vectors, we just need to update pointers to the log/entry buffer to instruct the state machine and/or replication what to do. And, if we just pass it the LogReader, they can use the LogReader directly to do their own batching (or short-cut the LogReader and directly access the application-specific log in a more efficient way).

There is a problem here because watch may yield an obsolete value. [...]. The replication task may read changed log entries.

Correct. That's a problem which needs to be solved. A trivial solution I outlined above is simply stopping the replication task (closing the channel) before writing further log after leader change. This guarantees that the watch::Receiver after closing the channel will return with an error and the task can safely end itself. Joining the replication task handle afterwards guarantees that no overwritten entries will be read (since they will only be overwritten after joining it).
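As a rough illustration of the close-then-join sequence, here is a std-only sketch. It uses a bounded `std::sync::mpsc::sync_channel` as a stand-in for the watch channel, and `replicate_until_closed` is a hypothetical name. The only point is the ordering: close the channel, join the task, and only then overwrite entries.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs a stand-in replication task and stops it by closing its channel.
/// Returns the last pointer the task acted on before it ended.
pub fn replicate_until_closed() -> u64 {
    // Capacity 1: the sender blocks instead of queueing without bound.
    let (tx, rx) = sync_channel::<u64>(1);

    let task = thread::spawn(move || {
        let mut last_sent = 0;
        // `recv` returns Err once the sender is dropped and the channel is
        // drained, which is the task's signal to end itself.
        while let Ok(up_to) = rx.recv() {
            last_sent = up_to; // stand-in for "send entries up to `up_to`"
        }
        last_sent
    });

    tx.send(3).unwrap();
    tx.send(7).unwrap();

    // Leader change: close the channel first ...
    drop(tx);
    // ... then join. After this point the task cannot read anything anymore,
    // so entries may be overwritten safely.
    task.join().unwrap()
}
```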

However, this would require awaiting the join of the replication task in raft core, which is counterproductive. Your point above about the log entry buffer being the problem is taken. So if we use the log as we use it today, the replication task would try to read the log, but the log was already asked to truncate itself. So the user-specific log implementation needs to resolve this properly (as it is today, no change in log storage concepts).

A log entry that is submitted is visible to RaftLogReader, thus the state machine should be able to apply a submitted but not yet flushed log entry. Why does the state machine need the log entry to be flushed before applying it?

Strictly speaking, it does not. But the problem arises if the process is killed after applying the entry to the state machine, but before flushing the log entry. Restarting the process will find applied log ID > flushed log ID, which might lead to issues. Isn't there also somewhere an assertion regarding this in openraft?

The follower's buffer may be truncated if the Leader switches. Such an approach requires the follower API to handle log truncation directly, which may be somewhat counterintuitive.

Why? If the leader dies, all followers either have the newest (uncommitted) entries or do not. The followers then hold an election and the one having the longest log will win and replicate it to others. So there is no truncation on the follower.

When the original leader rejoins (as a follower) and it has yet another (uncommitted, but flushed) log entry, which has NOT been replicated yet (and thus is unknown to all former followers), then this node needs to truncate the log. This will be necessary no matter what, it must be done also today.

Or am I missing something here?

Thanks.


Based on the discussion so far, I'd modify the approach outlined above by removing the explicit entry buffer and delegating it to the log implementation. I.e., the log implementation is responsible for any optimizations of reading log items and ideally keeping them in memory for sufficient time.

Still, the communication between tasks would be modified from unbounded channels to simply communicating pointers/simple state over watch-like channels (not necessarily the "standard" watch - we can optimize here) and/or bounded channels (e.g., for client_write).

drmingdrmer commented 4 months ago

I'm using watch as a placeholder. Nevertheless, I'm pretty sure it should be suitable in general (though it has its own problems). When you post a value, this value will be definitely received by the receiver. There will be no missed reads. Of course, posting a new value overwrites the previous one, but that's OK, we just want to advance pointers.

The problem is that after setting the submitted watch::Sender, you cannot prevent the consumer (such as the log replicator) from seeing it. But at this point, the IO may not have actually been submitted yet. It is not about missing a message.

Also, submitted is probably a wrong name here. OTOH, what's wrong with notifying replication tasks to replicate a log entry which is NOT yet submitted into the local log? The only thing which needs to wait both for the quorum and for the local log persistence is apply, since otherwise we'd have an inconsistent state machine at restart. But maybe we mean different things here?

I'm talking about the current Openraft implementation. Logs that have been submitted to RaftLogStorage are visible via LogReader. accepted means the log entries are stored in memory and have not yet reached RaftLogStorage.

This is because Openraft aims to separate the logic from the IO operations. When input messages are received, it simply places the IO commands into the output queue instead of executing them immediately. This approach allows for the reordering or batching of IO commands, enabling more efficient processing.

When an IO command is placed into the output queue, it is considered accepted (not yet visible to the reader). When the command is executed, for example when it is pushed to RaftLogStorage, it is considered submitted (visible to the reader). Finally, when the callback is invoked, the IO is considered flushed (still visible after a restart).
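The three stages can be pictured as three pointers that only move forward and stay ordered as flushed <= submitted <= accepted. The sketch below is only an illustration of that description; `IoProgress` and its methods are hypothetical names, not the actual Openraft types, and log indexes are assumed to start at 1.

```rust
/// Hypothetical tracker for the three IO stages described above.
/// 0 means "nothing yet"; log indexes are assumed to start at 1.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct IoProgress {
    accepted: u64,  // queued in memory, not yet visible to the reader
    submitted: u64, // handed to the log storage, visible to the reader
    flushed: u64,   // callback invoked, survives a restart
}

impl IoProgress {
    /// The IO command was placed into the output queue.
    pub fn accept(&mut self, index: u64) {
        self.accepted = self.accepted.max(index);
    }

    /// The IO command was executed against the log storage.
    pub fn submit(&mut self, index: u64) -> Result<(), &'static str> {
        if index > self.accepted {
            return Err("cannot submit an entry that was never accepted");
        }
        self.submitted = self.submitted.max(index);
        Ok(())
    }

    /// The storage callback reported the IO as durable.
    pub fn flush(&mut self, index: u64) -> Result<(), &'static str> {
        if index > self.submitted {
            return Err("cannot flush an entry that was never submitted");
        }
        self.flushed = self.flushed.max(index);
        Ok(())
    }

    pub fn visible_to_reader(&self, index: u64) -> bool {
        index <= self.submitted
    }

    pub fn durable(&self, index: u64) -> bool {
        index <= self.flushed
    }
}
```

For example, after `accept(5)` and `submit(3)`, entry 3 is visible to a reader but not yet durable, and entry 5 is neither.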

drmingdrmer commented 4 months ago

It is not feasible to keep Entry objects until they are fully processed. If a follower is offline, the Leader should not retain the Entries that are not sent to the Follower but release the memory as soon as the Entry is committed.

Agreed. But, that's also not a problem, IMHO. The entries don't strictly need to be kept longer than they are today. Today we also keep entries around until they are replicated, unless the follower is lagging too much (at the latest, the log implementation does that, since otherwise it would be very inefficient). If the entries are gone they can still be loaded from the log. Anyway, barring offline followers, the entries are processed rather quickly.

Another option is to simply delegate the entry buffer to the log implementation, basically what we have today. Then, the log implementation can force the slow down of the processing by simply blocking append to the log (and thus overflowing the bounded channel directing requests to the log, which would block further requests). I didn't think about exact design here, though.

Yes, the current implementation uses RaftLogStorage as the log entries buffer. This way the application is able to manage memory for committed log entries and choose when to evict log entries in case there is a slow Follower.

My point is, we don't need to collect batches of entries in vectors, we just need to update pointers to the log/entry buffer to instruct the state machine and/or replication what to do. And, if we just pass it the LogReader, they can use the LogReader directly to do their own batching (or short-cut the LogReader and directly access the application-specific log in a more efficient way).

This is almost (exactly?) the same as the way Openraft does it now.

drmingdrmer commented 4 months ago

There is a problem here because watch may yield an obsolete value. [...]. The replication task may read changed log entries.

Correct. That's a problem which needs to be solved. A trivial solution I outlined above is simply stopping the replication task (closing the channel) before writing further log after leader change. This guarantees that the watch::Receiver after closing the channel will return with an error and the task can safely end itself. Joining the replication task handle afterwards guarantees that no overwritten entries will be read (since they will only be overwritten after joining it).

However, this would require awaiting joining of the replication task in raft core, which is contraproductive. Your point above with the log entry buffer being the problem is taken. So if we use the log as we use it today, the replication task would try to read the log, but the log was already asked to truncate itself. So the user-specific log implementation needs to resolve this properly (as it is today, no change in log storage concepts).

This is what Openraft does today. The problem is that joining and then rebuilding the replication streams introduces a small delay.

One solution is to have RaftLogReader::try_get_log_entries() return a stream of (Vote, Entry) tuples. The Vote is the vote value stored in RaftLogStorage at the time the Entry is returned. This way a leadership change can be perceived by the replication task. When the Vote changes, the replication task should quit at once.
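A minimal sketch of how a replication task could consume such a stream, assuming simplified stand-in types: `Vote`, `Entry` and `replicate` below are hypothetical, and a plain iterator stands in for the async stream. The task forwards entries only as long as the vote attached to each entry is still its own.

```rust
/// Simplified stand-in for a vote: (term, node_id).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Vote {
    pub term: u64,
    pub node_id: u64,
}

/// Simplified stand-in for a log entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Entry {
    pub index: u64,
}

/// Forward entries while the storage still reports `my_vote`.
/// Returns the number of entries sent, or the vote that replaced ours.
pub fn replicate<I, F>(my_vote: Vote, entries: I, mut send: F) -> Result<usize, Vote>
where
    I: IntoIterator<Item = (Vote, Entry)>,
    F: FnMut(Entry),
{
    let mut sent = 0;
    for (vote, entry) in entries {
        if vote != my_vote {
            // Leadership changed: this entry may belong to a rewritten log,
            // so stop at once without sending it.
            return Err(vote);
        }
        send(entry);
        sent += 1;
    }
    Ok(sent)
}
```

The check happens per entry, so no join of the replication task is needed before the log is rewritten.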

drmingdrmer commented 4 months ago

A log entry that is submitted is visible to RaftLogReader, thus the state machine should be able to apply a submitted but not yet flushed log entry. Why does the state machine need the log entry to be flushed before applying it?

Strictly speaking, it does not. But the problem arises if the process is killed after applying the entry to the state machine, but before flushing the log entry. Restarting the process will find applied log ID > flushed log ID, which might lead to issues. Isn't there also somewhere an assertion regarding this in openraft?

In the current code base, it is possible for the applied log ID to be greater than the flushed log ID. For example, when a snapshot is installed, the state machine is entirely replaced, and the last log ID may lag behind the snapshot's last log ID.

Openraft has already dealt with these cases.

drmingdrmer commented 4 months ago

The follower's buffer may be truncated if the Leader switches. Such an approach requires the follower API to handle log truncation directly, which may be somewhat counterintuitive.

Why? If the leader dies, all followers either have the newest (uncommitted) entries or do not. The followers then hold an election and the one having the longest log will win and replicate it to others. So there is no truncation on the follower.

When the original leader rejoins (as a follower) and it has yet another (uncommitted, but flushed) log entry, which has NOT been replicated yet (and thus is unknown to all former followers), then this node needs to truncate the log. This will be necessary no matter what, it must be done also today.

What I mean is that using a log entries buffer requires the log to be truncated at a very early stage, because log entries can only be put into the buffer if there is room for them. But truncating logs is done by RaftLogStorage::truncate, which means you have to actually execute truncate() at once. With the current implementation, however, there may be pending IO such as RaftLogStorage::append() in the queue waiting to be executed.

schreter commented 4 months ago

Oh, a lot of comments...

The problem is that after setting the submitted watch::Sender, you can not prevent the consumer(such as log replicator) from seeing it. But at this time point, the IO may not have been actually submitted yet. It is not about missing a message.

Sure, we can only inform the consumer after the entry is actually written into the log/log buffer. As I mentioned, I think the term I used is wrong (accepted/submitted).

This is because Openraft aims to separate the logic from the IO operations.

That's right. And my intention is to optimize it :-).

When input messages are received, it simply places the IO commands into the output queue instead of executing them immediately. This approach allows for the reordering or batching of IO commands, enabling more efficient processing.

and

This is almost(exactly?) same as the way Openraft does it now.

Correct. But these queues are unbounded, so it's a problem to run in constant memory. Also, the batching is at the moment NOT at the storage/log level, but rather at the task driving the storage/log and the batches constructed by openraft are then sent to the user-specific code. This on one side allocates and adds potential latency and on the other side makes optimization much more complex.

We have multiple use cases here. Let's look first at the very simplest, which is the storage implementation doing apply. Currently, openraft reads the entries to apply from the log and sends them to the user-specific code, then awaits it (potentially awaiting async I/O via callbacks). This works in principle, but it's inefficient.

My suggestion is NOT to pass entries, but rather to only pass the log pointer (committed log ID) to the user and consume the log pointer (applied log ID) from the user. I.e., there are no "commands", but rather there is a simple watch on the committed log ID wrapped suitably. Let the user code deal with reading what is necessary from the log (current APIs are all allocating ones; streaming might help).

Also, let the user code apply the entries read from the log (most likely from memory) and then just announce applied log ID to openraft via watch-like channel (i.e., updating applied log ID).
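A rough sketch of that contract, with hypothetical names (`LogReader`, `SliceLog`, `Counter` are made up for illustration and are not the openraft traits): the state machine is handed only the committed index, reads what it needs itself, and returns its applied index.

```rust
/// Hypothetical read-only view of the log; index 1 is the first entry.
pub trait LogReader {
    fn read(&self, index: u64) -> Option<i64>;
}

/// Toy log backed by a slice of deltas.
pub struct SliceLog<'a>(pub &'a [i64]);

impl LogReader for SliceLog<'_> {
    fn read(&self, index: u64) -> Option<i64> {
        let offset = index.checked_sub(1)?;
        self.0.get(offset as usize).copied()
    }
}

/// Toy state machine: sums up the deltas it has applied.
pub struct Counter {
    pub total: i64,
    pub applied: u64,
}

impl Counter {
    /// Apply everything up to `committed` that the log can provide and
    /// return the new applied index to report back to the core.
    pub fn catch_up(&mut self, log: &impl LogReader, committed: u64) -> u64 {
        while self.applied < committed {
            match log.read(self.applied + 1) {
                Some(delta) => {
                    self.total += delta;
                    self.applied += 1;
                }
                // Entry not readable (yet): stop and report what we have.
                None => break,
            }
        }
        self.applied
    }
}
```

Only two integers cross the boundary, so batching and I/O strategy stay entirely on the user side.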

Similarly, for snapshots, the watch on committed log ID could also carry a flag/counter requesting a snapshot. No additional channel/no command channel. When the snapshot is done, again, it is simply announced to the Raft core via a watch-like channel.

With that, on the one hand, the storage implementation has the full flexibility to implement any I/O optimizations, etc., and the communication between Raft core and the storage task is in constant memory.

The replication tasks are essentially the same, at least for append entries.

One solution is to let RaftLogReader::try_get_log_entries() to return a stream of tuple of (Vote, Entry). The Vote is the vote value stored in RaftLogStorage when the Entry is returned. This way a Leadership change event can be perceived by replication task. When the Vote changes, the replication task should quit at once.

Yes, that's a possibility. But, term would be sufficient for "regular" Raft (i.e., CommittedLeaderId for the general case). And, this is already part of LogId, which is read from the log and member of Entry. So no additional information is necessary, it's already there.

What I mean is that using a log entries buffer requires the log to be truncated at a very early stage. Because the log entries can only be put into the buffer if there is room for it. But truncating logs is done by RaftLogStorage::truncate, which means you have to actually execute truncate() at once. But with the current implementation, there may be pending IO such as RaftLogStorage::append() in queue waiting to be execute.

OK, as mentioned, we can simply go with log storage as the buffer (as it is today in openraft). Then, we don't need a limited buffer and from the PoV of openraft the buffer is unlimited. It is then up to the application to do admission control somewhere (or openraft needs to add admission control).


What I don't have a good idea for yet is how to run the communication from the client, through the raft core task, to the log writer in constant memory. Communication from the client to the raft core task can use a bounded buffer, but that helps only partially. Writing to the log is still unbounded and can overflow, and/or it would block the raft core task if we used a bounded command buffer from raft core to the log writer (which is a no-go). The same holds for append entries on the follower.

I was thinking about short-cutting this to have only one bounded queue from client write/append entries directly to the log write task and log write task informing the raft core about accepted entries. Since we anyway need to deal with election and truncating the log, the log write task could simply poll both the entry stream from the user (either client write or append entries) and the watch from the raft core task informing it about new election point (via the truncation LogId, which contains a new CommittedLeaderId).

When the watch returns new leader elected, then the log writing task would simply truncate the log to the log index from the new leader. Depending on the state (we are the leader or not), from now on, only entries received from the correct source (client write or append log) would be accepted, the other ones redirected to leader/discarded.

Again, this concept would allow running in constant memory.

There are still some other things like voting, but those can be also handled in constant memory fairly easily.

drmingdrmer commented 4 months ago

Correct. But these queues are unbounded, so it's a problem to run in constant memory. Also, the batching is at the moment NOT at the storage/log level, but rather at the task driving the storage/log and the batches constructed by openraft are then sent to the user-specific code. This on one side allocates and adds potential latency and on the other side makes optimization much more complex.

Agree.

We have multiple use cases here. Let's look first at the very simplest, which is the storage implementation doing apply. Currently, openraft sends the entries to apply read from the log to the user-specific code, then awaits it/potentially awaits async I/O via callbacks. This works in principle, but it's ineffective.

My suggestion is NOT to pass entries, but rather to only pass the log pointer (committed log ID) to the user and consume the log pointer (applied log ID) from the user. I.e., there are no "commands", but rather there is a simple watch on the committed log ID wrapped suitably. Let the user code deal with reading what is necessary from the log (current APIs are all allocating ones; streaming might help).

It's acceptable to pass a log pointer to the StateMachine instead of log entries. However, pushing commands makes it easier to manage multiple types of operations, such as applying log entries, building a snapshot, or retrieving a snapshot. Some of these commands may require a strict order. (I'm not quite sure).

Generally speaking, the watch-driven architecture could work, but there are quite a few order-related details to consider.

I'm going to rewrite the part where the SM reports the apply result to RaftCore so that it uses a watcher.

Also, let the user code apply the entries read from the log (most likely from memory) and then just announce applied log ID to openraft via watch-like channel (i.e., updating applied log ID).

Good.

Similarly, for snapshots, the watch on committed log ID could also carry a flag/counter requesting a snapshot. No additional channel/no command channel. When the snapshot is done, again, it is simply announced to the Raft core via a watch-like channel.

With that, on the one hand, the storage implementation has the full flexibility to implement any I/O optimizations, etc., and the communication between Raft core and the storage task is in constant memory.

Good.

The replication tasks are the same in green, at least for append entries.

Good.

Yes, that's a possibility. But, term would be sufficient for "regular" Raft (i.e., CommittedLeaderId for the general case). And, this is already part of LogId, which is read from the log and member of Entry. So no additional information is necessary, it's already there.

The Vote (or term in Raft) is necessary. I have written a short explanation of it: https://github.com/datafuselabs/openraft/blob/ee460f376561a3a36bbd11244aa5846b207345d3/openraft/src/raft_state/io_state/log_io_id.rs#L8-L23

OK, as mentioned, we can simply go with log storage as the buffer (as it is today in openraft). Then, we don't need a limited buffer and from the PoV of openraft the buffer is unlimited. It is then up to the application to do admission control somewhere (or openraft needs to add admission control).

Good. This should be feasible when all of the RaftLogStorage methods are upgraded to callback based.

Where I don't have a good idea yet is how to optimize it with constant memory is the communication from the client, through the raft core task to the log writer. Communication from the client to the raft core task can be done using bounded buffer, but that helps only partially. Writing to the log is still unbounded and can overflow and/or block the raft core task if we'd use a bounded command buffer from raft core to log writer (which is a no-go). The same holds for append entries on the follower.

It is indeed a problem unless RaftLogStorage::append() returns an error indicating the buffer is full, allowing RaftCore to refuse a client write request.
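A minimal sketch of the "refuse instead of queue" behaviour, using a bounded std channel as a stand-in for the path to the log writer. `WriteOutcome`, `bounded_queue` and `try_client_write` are hypothetical names; the real signal would come from RaftLogStorage::append(), which does not report "full" today.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// What the caller is told about a write request.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteOutcome {
    Accepted,
    /// The buffer is full; the client should retry later.
    Busy,
    /// The log writer is gone.
    Shutdown,
}

/// Bounded queue towards the (stand-in) log writer.
pub fn bounded_queue(capacity: usize) -> (SyncSender<u64>, Receiver<u64>) {
    sync_channel(capacity)
}

/// Never blocks and never grows the queue: a full buffer becomes an
/// explicit refusal that can be passed back to the client.
pub fn try_client_write(queue: &SyncSender<u64>, request: u64) -> WriteOutcome {
    match queue.try_send(request) {
        Ok(()) => WriteOutcome::Accepted,
        Err(TrySendError::Full(_)) => WriteOutcome::Busy,
        Err(TrySendError::Disconnected(_)) => WriteOutcome::Shutdown,
    }
}
```

With capacity 2, a third write is answered with `Busy` until the writer has consumed one request.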

I was thinking about short-cutting this to have only one bounded queue from client write/append entries directly to the log write task and log write task informing the raft core about accepted entries. Since we anyway need to deal with election and truncating the log, the log write task could simply poll both the entry stream from the user (either client write or append entries) and the watch from the raft core task informing it about new election point (via the truncation LogId, which contains a new CommittedLeaderId).

Yes, an exclusive single client would be a solution.

When the watch returns new leader elected, then the log writing task would simply truncate the log to the log index from the new leader. Depending on the state (we are the leader or not), from now on, only entries received from the correct source (client write or append log) would be accepted, the other ones redirected to leader/discarded.

Again, this concept would allow running in constant memory.

There are still some other things like voting, but those can be also handled in constant memory fairly easily.

Makes sense.

schreter commented 4 months ago

It's acceptable to pass a log pointer to the StateMachine instead of log entries. However, pushing commands makes it easier to manage multiple types of operations, such as applying log entries, building a snapshot, or retrieving a snapshot.

Right, pushing commands does have its advantages :-). But it can likely be handled by different means.

Some of these commands may require a strict order. (I'm not quite sure).

Well, instead of pushing individual commands, one can send the required state. This has also a huge advantage that it would "cancel" commands which do not make sense to be executed anymore. So in the end, it likely is a win-win scenario.
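To illustrate, here is a small sketch under assumed names (`Desired`, `Worker` and `reconcile` are hypothetical): the core overwrites one small state value as often as it likes, and the worker acts on whatever is current when it wakes up. Superseded requests are never executed.

```rust
/// Hypothetical "required state" published by the core.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Desired {
    /// Apply everything up to this index.
    pub committed: u64,
    /// Incremented each time a snapshot is requested.
    pub snapshot_requests: u64,
}

/// Hypothetical storage-side worker.
#[derive(Debug, Default)]
pub struct Worker {
    pub applied: u64,
    pub snapshot_requests_seen: u64,
    pub snapshots_built: u64,
}

impl Worker {
    /// Bring the worker in line with the latest desired state.
    pub fn reconcile(&mut self, want: Desired) {
        if want.committed > self.applied {
            // Stand-in for applying entries applied+1 ..= committed in one go.
            self.applied = want.committed;
        }
        if want.snapshot_requests > self.snapshot_requests_seen {
            // Several pending requests collapse into a single snapshot.
            self.snapshots_built += 1;
            self.snapshot_requests_seen = want.snapshot_requests;
        }
    }
}
```

If the core commits index 3, requests a snapshot, commits index 8 and requests another snapshot before the worker runs, one `reconcile` call applies up to 8 and builds a single snapshot.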

We just need to ensure watch is implemented optimally for partial state updates (but I can take care of that, we can reuse stuff from our project).

Generally speaking, the watch-driven architecture could work, but there are quite a few order-related details to consider.

Oh yes, I know.

For one, the current architecture requires passing a Responder to send the client reply. This channel is of course not in the log. In our project, it's not a big deal, since the reply to the client is done anyway directly on the state machine level, not sent back via two tasks via openraft (this is correct, since only replies to committed commands are sent and any commands re-executed after restart will send same replies - we have a reply cache).

Possibly, some similar concept could be used here. For example, do not send Entry + Responder, but rather embed the Responder in the Entry, i.e., add Entry::request_completed(self, response: WriteResult), which would do the needful. This can be also mapped to the current Responder, of course. For entries read from the log, this Responder would be either None or an empty one, as the application needs. And, it is the job of the application not to lose the Responder for not yet applied operations.
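A minimal sketch of what that could look like. Only the method name `request_completed(self, response: WriteResult)` comes from the proposal above; the `Entry` layout, the `WriteResult` alias, the constructors and the `bool` return value are assumptions for illustration, and a std channel sender stands in for the Responder.

```rust
use std::sync::mpsc::Sender;

/// Assumed shape of a write result, for illustration only.
pub type WriteResult = Result<u64, String>;

/// Hypothetical entry that carries its own (optional) responder.
pub struct Entry {
    pub payload: u64,
    responder: Option<Sender<WriteResult>>,
}

impl Entry {
    /// Entry created from a client write: someone is waiting for a reply.
    pub fn from_client(payload: u64, responder: Sender<WriteResult>) -> Self {
        Entry { payload, responder: Some(responder) }
    }

    /// Entry read back from the log: nobody is waiting.
    pub fn from_log(payload: u64) -> Self {
        Entry { payload, responder: None }
    }

    /// Consume the entry and deliver the reply, if anyone is listening.
    /// Returns true when a reply was actually delivered.
    pub fn request_completed(self, response: WriteResult) -> bool {
        match self.responder {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }
}
```

Taking `self` by value makes it impossible to reply twice for the same entry.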

schreter commented 4 months ago

Ah, forgot to comment on this one:

The Vote (or term in Raft) is necessary.

Yes, but what is a Vote? It is a term or a term + leader ID. Which is exactly what we have in current LogId, isn't it?

drmingdrmer commented 4 months ago

Ah, forgot to comment on this one:

The Vote (or term in Raft) is necessary.

Yes, but what is a Vote? It is a term or a term + leader ID. Which is exactly what we have in current LogId, isn't it?

The log id has the id of the Leader that proposed this log. Not the one that replicates this log.

A log can be proposed by Leader-1 and replicated by Leader-2.

LogId(term, node-id, index) is not monotonic to RaftLogStorage::append(): the same log id could be appended more than once. Thus LogId can not be used as an IO pointer.

An IO pointer must be monotonic increasing, (LeaderVote, LogId) is monotonic increasing. The LeaderVote in it represents the Leader that writes the log to local storage or replicates the log to a remote node.

Thus a complete IOId is defined as (WritingLeader(term,node_id), LogId(term,node_id, index)). For example, a valid IOId could be: WritingLeader(term=3,node_id=2), LogId(term=2,node_id=1,index). And the IOId.WritingLeader is always greater than or equal to IOId.LogId.leader_id.
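To make the ordering argument concrete, here is a small sketch that assumes leader ids are totally ordered by (term, node_id). The type and field names are illustrative, not openraft's actual definitions. Rust's derived `Ord` compares fields in declaration order, so the writing leader is compared before the log id.

```rust
/// Hypothetical leader identifier, ordered by term first and node id second.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct LeaderId {
    pub term: u64,
    pub node_id: u64,
}

/// Log id: the leader that proposed the entry, plus the log index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct LogId {
    pub leader_id: LeaderId,
    pub index: u64,
}

/// IO pointer: the leader performing the write, plus the log id being written.
/// Field order matters: the derived `Ord` compares `writing_leader` first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct IOId {
    pub writing_leader: LeaderId,
    pub log_id: LogId,
}

impl IOId {
    /// Returns `None` if the invariant `writing_leader >= log_id.leader_id` is violated.
    pub fn new(writing_leader: LeaderId, log_id: LogId) -> Option<IOId> {
        if writing_leader >= log_id.leader_id {
            Some(IOId { writing_leader, log_id })
        } else {
            None
        }
    }
}
```

With these definitions, writing the same LogId again under a later leader yields a strictly greater IOId, even though the LogId itself has not changed.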

schreter commented 4 months ago

Thus a complete IOId is defined as (WritingLeader(term,node_id), LogId(term,node_id, index)).

OK, thanks.

The problem is, if we push the log to the writer via short circuit, the Vote is unknown. I will have to ponder it a bit later, now the work calls (meetings :-( ).

schreter commented 4 months ago

So, back to the topic...

LogId(term, node-id, index) is not monotonic to RaftLogStorage::append(): the same log id could be appended more than once. Thus LogId can not be used as an IO pointer.

I don't quite get it. Yes, the same index can be appended more than once, with different term, if the leader changes and the log is thus truncated (or in case of the optimization, instead of term it would be CommittedLeaderId which subsumes term + node ID).

Thus a complete IOId is defined as (WritingLeader(term,node_id), LogId(term,node_id, index)).

Yes, but how can the writing leader/Vote term differ from the one in LogId, except for lower indices replicated by the leader for log entries committed before the writing leader became the leader? For this case, all these entries with an older term must be already committed, so they will never ever be truncated out again.

We can only truncate the portion of the log that was written after we became leader (i.e., we managed to persist something in the log that isn't committed by the quorum and will be truncated when we become a follower).

Basically, a truncate request is always a new LogId with a new term and a lower index. From this point on, any committed entries will have this term (which is higher than any term written previously for any index past this point). Even if this new leader appends something and is out-voted later by yet higher term, this still holds.

So I believe it's sufficient to work like this:

For the optimized leader election, replace term with CommittedLeaderId.

Am I missing something?

drmingdrmer commented 4 months ago

Let me take an example to explain what could happen to a certain log id:

Scenario: LogId Truncated and Appended Multiple Times

Consider a scenario where a specific LogId is truncated and appended more than once.

The following discussion is in Raft terminology, a term and LogId are represented as (term, index).

Ni: Node
i-j: log with term i and index j

N1 | 1-1 1-2
N2 | 1-1 1-2
N3 | 3-1
N4 | 4-1
N5 |
N6 |
N7 |
-------------------------> log index

Given the initial cluster state built with the following steps:

  1. N1 as Leader:
    • N1 established itself as the leader with a quorum of N1, N5, N6, N7.
    • N1 appended logs 1-1 and 1-2, replicating only to N2.
  2. N3 as Leader:
    • N3 became the leader with a quorum of N3, N5, N6, N7.
    • N3 appended log 3-1 but failed to replicate any logs to other nodes.
  3. N4 as Leader:
    • N4 became the leader with a quorum of N4, N5, N6, N7.
    • N4 appended log 4-1 but also failed to replicate any logs.

As a result, N1's log will be truncated and appended multiple times:

This scenario can repeat, where a log entry with the same LogId is truncated and appended multiple times if there are enough nodes in the cluster.

However, it's important to note that a specific LogId can only be truncated and appended by a leader with a higher term.

Therefore, the pointer for an IO operation must be represented as (term, LogId(term, index)).

schreter commented 4 months ago

Therefore, the pointer for an IO operation must be represented as (term, LogId(term, index)).

Uff, quite a complex example :-). But I understand what you mean. Nevertheless, it should be fairly easy to implement it as well. Basically, we have two sources feeding entries into the log writer:

In both cases, at the time the entry would be put to the log buffer the Vote is known (leader term for simplicity). So we can indeed send the entry to the (bounded) log write channel identified as (term, log_id), where the term is the one of the known leader.

The log write task would truncate the log to the log_id upon seeing a higher term and ignore entries with older term. With that, it's still simple enough and can be run in constant memory.
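The rule described here (truncate on a higher term, ignore an older term) can be sketched roughly as follows. This is an illustration under simplifying assumptions, not a proposed implementation: the Vote is reduced to a bare term, `LogWriter` and `Outcome` are made-up names, and a `Vec` stands in for the bounded log buffer.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LogId {
    pub term: u64,
    pub index: u64,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Appended,
    TruncatedAndAppended,
    IgnoredStaleTerm,
}

/// Hypothetical log writer that tracks the term of the leader it last accepted.
pub struct LogWriter {
    leader_term: u64,
    log: Vec<LogId>,
}

impl LogWriter {
    pub fn new() -> Self {
        LogWriter { leader_term: 0, log: Vec::new() }
    }

    /// Handle one `(writer_term, log_id)` item from the log write channel.
    pub fn write(&mut self, writer_term: u64, log_id: LogId) -> Outcome {
        if writer_term < self.leader_term {
            // Entry sent on behalf of an outdated leader: drop it.
            return Outcome::IgnoredStaleTerm;
        }
        let mut outcome = Outcome::Appended;
        if writer_term > self.leader_term {
            // First write from a newer leader: discard everything at or after its index.
            self.leader_term = writer_term;
            let before = self.log.len();
            self.log.retain(|e| e.index < log_id.index);
            if self.log.len() < before {
                outcome = Outcome::TruncatedAndAppended;
            }
        }
        self.log.push(log_id);
        outcome
    }

    pub fn entries(&self) -> &[LogId] {
        &self.log
    }
}
```

The writer only needs to remember one extra value (the last accepted leader term), which is what keeps the approach compatible with running in constant memory.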

Or am I still missing something?

drmingdrmer commented 4 months ago

It looks like a working approach. But I cannot say for sure for now. It involves too many modifications to the codebase.

Can you give some of the top-level API signatures, such as the one for a Follower to accept AppendEntries?

schreter commented 4 months ago

Sorry for the delay. First, let's look at replication. My suggestion would be on these lines:

trait RaftNetworkFactory<C: RaftTypeConfig> {
    /// Create a replication stream that streams log entries to a remote node.
    fn new_replication_stream(
        &mut self,
        target: C::NodeId,
        node: C::Node,
    ) -> impl RaftReplicationStream {
        // Default: wrap the existing client so current append-entries calls keep working.
        DefaultReplicationStream(self.new_client(target, node))
    }
}

/// State of the replication for a replication stream.
trait RaftReplicationState {
    /// Get the next target log ID to replicate up to, the current commit ID and the current Vote to replicate to the follower.
    async fn next_request(&self) -> Result<(LogId, LogId, Vote), ReplicationClosed>;

    /// Report progress of the replication to the leader.
    fn report_progress(&self, flushed_up_to: LogId) -> Result<(), ReplicationClosed>;
}

/// Replication stream.
trait RaftReplicationStream {
    /// Run the replication loop.
    /// 
    /// Read entries from the `log_reader` and send them to the `target`.
    /// Use `state` object to get the new log ID to replicate to and to report progress.
    async fn replicate(
        self,
        log_reader: RaftLogReader,
        start_log_id: LogId,
        state: impl RaftReplicationState,
    ) -> Result<(), ConflictOrHigherVoteOrNetworkError>;
}

where DefaultReplicationStream could be used to wrap current append entries calls for compatibility, if needed.

The Network would create a RaftReplicationStream, which is then run in a separate task. How the entries are packetized is not openraft's problem: the implementation simply calls next_request() to get the target replication state, which internally either reads the current state from a watch channel or waits on the watch channel for a new state. The leader state loop simply advances the state stored in the watch channel and optionally sets a timeout (but that could probably also be delegated to the stream itself).

When some entries are replicated and logged remotely, the remote node sends a confirmation in an application-specific way, which results in either reporting progress or reporting an error (conflict/higher vote). In case of an error, there is no way for replication to continue, so the loop ends and the "right" error is returned. Upon returning, the driver of the replication in the replication task would inform the raft core and/or restart the replication as needed.

Reporting the progress would just entail setting a new value for the watch, so the state loop would then update its state appropriately based on state change of the replication.

No memory for actual data channels and no entry copying is involved here (aside from reading from the log, but the application should optimize that).
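The watch-style signalling described above could look roughly like the following. This is a blocking, std-only stand-in built from `Mutex` and `Condvar`, chosen so the sketch is self-contained. An async implementation would more likely use an async watch channel, and the `Watch` type with its `send`/`wait_newer` methods is hypothetical. The point is that only the latest value is stored, so no queue grows.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Minimal blocking stand-in for a watch channel: it holds only the latest value.
pub struct Watch<T> {
    /// (version, latest value); the version is bumped on every `send`.
    state: Mutex<(u64, T)>,
    changed: Condvar,
}

impl<T: Clone> Watch<T> {
    pub fn new(initial: T) -> Arc<Self> {
        Arc::new(Watch {
            state: Mutex::new((0, initial)),
            changed: Condvar::new(),
        })
    }

    /// Overwrite the stored value and wake all waiters.
    /// Older values are dropped, so memory use stays constant.
    pub fn send(&self, value: T) {
        let mut guard = self.state.lock().unwrap();
        guard.0 += 1;
        guard.1 = value;
        self.changed.notify_all();
    }

    /// Block until the version differs from `seen`,
    /// then return the current version together with the latest value.
    pub fn wait_newer(&self, seen: u64) -> (u64, T) {
        let mut guard = self.state.lock().unwrap();
        while guard.0 == seen {
            guard = self.changed.wait(guard).unwrap();
        }
        (guard.0, guard.1.clone())
    }
}
```

In this picture, `next_request()` would correspond to `wait_newer` on a watch holding the (target log id, commit id, vote) triple, and `report_progress()` to `send` on a second watch holding the flushed log id. A reader that falls behind skips intermediate values and sees only the newest one.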

schreter commented 4 months ago

Now, for RaftStateMachine, we can solve it in a very similar way:

trait RaftStateMachine {
    fn new_apply_stream(&mut self) -> impl RaftApplyStream;
}

/// State for applying committed entries.
trait RaftApplyState: 'static {
    /// Get the next committed log ID to apply up to.
    async fn next_request(&self) -> Result<LogId, RaftStopped>;

    /// Report apply progress.
    fn report_progress(&self, last_applied: LogId) -> Result<(), RaftStopped>;
}

/// Entry apply stream.
trait RaftApplyStream: 'static {
    /// Run the apply loop.
    ///
    /// Read entries from the `log_reader` and apply them to the state machine.
    /// Use `state` object to get the new log ID to apply up to and to report progress.
    async fn apply(
        self,
        log_reader: RaftLogReader,
        start_log_id: LogId,
        state: impl RaftApplyState,
    ) -> Result<(), StorageError>;
}

The apply stream is started at Raft startup in a new task (or runs in the storage task interleaved with other processing as needed). It uses the state object to await the next committed log ID on a watch channel and report apply progress to another watch channel, so raft core can update its state.

In case of error, the apply loop terminates and the Raft stops.

Same as for replication, no actual data is moved between tasks; only the watches are read/set, and no extra memory needs to be allocated. Again, it is not openraft's responsibility to batch entries; that is done at the application level (which can optimize it).
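One step of such an apply loop might be sketched like this. It is a simplified, synchronous illustration: `LogReader`, `apply_up_to`, the string entries and the `Vec<String>` state machine are all hypothetical, and log ids are reduced to 1-based indices. The application chooses the batch size, and progress is reported once per batch.

```rust
/// Hypothetical read-only view of the log; `index` is 1-based.
pub trait LogReader {
    fn read(&self, index: u64) -> Option<&str>;
}

impl LogReader for Vec<String> {
    fn read(&self, index: u64) -> Option<&str> {
        let slot = index.checked_sub(1)? as usize;
        self.get(slot).map(String::as_str)
    }
}

/// Apply entries `last_applied + 1 ..= committed` in batches of at most `batch` entries.
/// `report` is called once per batch with the new last-applied index.
pub fn apply_up_to(
    log: &impl LogReader,
    state_machine: &mut Vec<String>,
    last_applied: u64,
    committed: u64,
    batch: u64,
    mut report: impl FnMut(u64),
) -> Result<u64, String> {
    let mut applied = last_applied;
    while applied < committed {
        // The batch never reaches past the committed index.
        let end = committed.min(applied + batch.max(1));
        for index in applied + 1..=end {
            let entry = log
                .read(index)
                .ok_or_else(|| format!("missing entry {}", index))?;
            state_machine.push(entry.to_string());
        }
        applied = end;
        report(applied);
    }
    Ok(applied)
}
```

The entries themselves are read directly from the log by the applying side. The only things exchanged with the raft core are the committed index going in and the last-applied index coming out.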

Same as for replication, it would be possible to write a default implementation which simply calls legacy apply() on the state machine.

What is different here is that we need to find a way to send responses to the client. I haven't thought this through yet: how to post the response objects needed for reporting replies without using an extra channel. It will probably require moving response reporting to a callback on the Entry or similar.

drmingdrmer commented 4 months ago
trait RaftNetworkFactory<C: RaftTypeConfig> {
    /// Create a replication stream that streams log entries to a remote node.
    fn new_replication_stream(
        &mut self,
        target: C::NodeId,
        node: C::Node,
    ) -> impl RaftReplicationStream {
        // Default: wrap the existing client so current append-entries calls keep working.
        DefaultReplicationStream(self.new_client(target, node))
    }
}

/// State of the replication for a replication stream.
trait RaftReplicationState {
    /// Get the next target log ID to replicate up to, the current commit ID and the current Vote to replicate to the follower.
    async fn next_request(&self) -> Result<(LogId, LogId, Vote), ReplicationClosed>;

    /// Report progress of the replication to the leader.
    fn report_progress(&self, flushed_up_to: LogId) -> Result<(), ReplicationClosed>;
}

/// Replication stream.
trait RaftReplicationStream {
    /// Run the replication loop.
    /// 
    /// Read entries from the `log_reader` and send them to the `target`.
    /// Use `state` object to get the new log ID to replicate to and to report progress.
    async fn replicate(
        self,
        log_reader: RaftLogReader,
        start_log_id: LogId,
        state: impl RaftReplicationState,
    ) -> Result<(), ConflictOrHigherVoteOrNetworkError>;
}

What about snapshots? When RaftReplicationStream receives the next_request, some of the log entries may have already been purged. In such a case, RaftLogReader would return a StorageError. The current approach is to switch to snapshot replication, and during this period log entry replication is paused.
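The log/snapshot switch mentioned here can be illustrated with a small decision function. The names (`ReplicationAction`, `plan`) and the plain-index representation are assumptions made for this sketch. It also assumes that the log is only purged up to a point already covered by a snapshot.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum ReplicationAction {
    /// Stream log entries in the inclusive index range.
    SendLogs { from: u64, to: u64 },
    /// The follower needs purged entries: pause log replication and send a snapshot
    /// that covers at least everything up to `covers_up_to`.
    SendSnapshot { covers_up_to: u64 },
    /// The follower is already up to date with the target.
    Idle,
}

/// Decide what to do for one follower.
///
/// * `follower_next`: first index the follower is missing.
/// * `last_purged`: highest index already purged from the local log (0 if none).
/// * `target`: index to replicate up to.
pub fn plan(follower_next: u64, last_purged: u64, target: u64) -> ReplicationAction {
    if follower_next > target {
        ReplicationAction::Idle
    } else if follower_next <= last_purged {
        ReplicationAction::SendSnapshot { covers_up_to: last_purged }
    } else {
        ReplicationAction::SendLogs { from: follower_next, to: target }
    }
}
```

Making this check before reading would let a wrapper decide on the switch up front, rather than discovering the purge through a storage error in the middle of a stream.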

IMHO, there should be a wrapper like ReplicationCore in the current codebase handling log/snapshot replication switch.

To me, the major changes include two parts: replace channel-based communication with watch. What about the report_progress() implementation? I think it should still be a fixed-size channel.

There are a few related details to be considered. I cannot tell whether it is a fully working pattern. But the first thing I'm sure can be done is:

The data flow from RaftLogReader to RaftReplicationStream requires further refinement, because while log entries are being streamed the Vote in the storage may change, and when this happens the stream must be closed at once.
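One possible shape for this check, purely as an illustration: the Vote is simplified to a term held in an `AtomicU64`, and `stream_entries` and `VoteChanged` are made-up names. The stream re-checks the stored value before every entry and stops as soon as it no longer matches the one the stream was started with.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Error returned when the stored vote changed while streaming.
#[derive(Debug, PartialEq, Eq)]
pub struct VoteChanged {
    /// Number of entries sent before the change was noticed.
    pub sent: usize,
    pub new_term: u64,
}

/// Send `entries` one by one, closing the stream as soon as the term in
/// storage differs from `stream_term`.
pub fn stream_entries<'a>(
    entries: impl IntoIterator<Item = &'a str>,
    stream_term: u64,
    stored_term: &AtomicU64,
    mut send: impl FnMut(&str),
) -> Result<usize, VoteChanged> {
    let mut sent = 0;
    for entry in entries {
        // Check before every entry, so nothing is sent under a stale vote.
        let now = stored_term.load(Ordering::Acquire);
        if now != stream_term {
            return Err(VoteChanged { sent, new_term: now });
        }
        send(entry);
        sent += 1;
    }
    Ok(sent)
}
```

A check like this narrows the window but does not remove it, since the vote could still change between the check and the send. Whether that is acceptable is part of the refinement still to be worked out.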

drmingdrmer commented 4 months ago

Now, for RaftStateMachine, we can solve it in a very similar way:

trait RaftStateMachine {
    fn new_apply_stream(&mut self) -> impl RaftApplyStream;
}

/// State for applying committed entries.
trait RaftApplyState: 'static {
    /// Get the next committed log ID to apply up to.
    async fn next_request(&self) -> Result<LogId, RaftStopped>;

    /// Report apply progress.
    fn report_progress(&self, last_applied: LogId) -> Result<(), RaftStopped>;
}

/// Entry apply stream.
trait RaftApplyStream: 'static {
    /// Run the apply loop.
    ///
    /// Read entries from the `log_reader` and apply them to the state machine.
    /// Use `state` object to get the new log ID to apply up to and to report progress.
    async fn apply(
        self,
        log_reader: RaftLogReader,
        start_log_id: LogId,
        state: impl RaftApplyState,
    ) -> Result<(), StorageError>;
}

One minor issue is that report_progress should also report the result (RaftTypeConfig::R) of applying each entry.

The RaftApplyStream may not be able to run as a long-lived task. There are other commands, such as InstallSnapshot, that also require a mutable reference to the state machine. If RaftApplyStream does not hold a &mut SM, it must hold an exclusive lock on the StateMachine, which may not be an optimal approach.
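To make the trade-off concrete, here is a sketch of the locking variant, with a hypothetical `StateMachine` behind a `std::sync::Mutex`. The apply path takes the lock once per batch rather than for the whole lifetime of the stream, so a snapshot installation can interleave between batches. This only illustrates the concern raised above. It does not show that locking is the right design.

```rust
use std::sync::Mutex;

/// Hypothetical state machine: applied data plus the last applied index.
#[derive(Debug, Default)]
pub struct StateMachine {
    pub last_applied: u64,
    pub data: Vec<String>,
}

/// Apply one batch starting at `first_index`, holding the lock only for this batch.
pub fn apply_batch(sm: &Mutex<StateMachine>, first_index: u64, batch: &[&str]) {
    let mut guard = sm.lock().unwrap();
    for (offset, entry) in batch.iter().enumerate() {
        let index = first_index + offset as u64;
        // Skip entries already covered, e.g. by a snapshot installed in between.
        if index > guard.last_applied {
            guard.data.push(entry.to_string());
            guard.last_applied = index;
        }
    }
}

/// Replace the state with a snapshot, unless the state machine is already ahead.
pub fn install_snapshot(sm: &Mutex<StateMachine>, last_included: u64, data: Vec<String>) {
    let mut guard = sm.lock().unwrap();
    if last_included > guard.last_applied {
        guard.data = data;
        guard.last_applied = last_included;
    }
}
```

Both paths need exclusive access, so they serialize on the lock. The per-batch granularity bounds how long a snapshot installation has to wait, at the cost of the apply side re-checking `last_applied` for every entry.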