rabbitmq / ra

A Raft implementation for Erlang and Elixir that strives to be efficient and make it easier to use multiple Raft clusters in a single system.

Log compaction for hybrid state machines #440

Open kjnilsson opened 1 month ago

kjnilsson commented 1 month ago



A hybrid state machine is a state machine that keeps some of its state in memory and some of it, typically payload data, in the Raft log. RabbitMQ quorum queues are an example of a hybrid state machine that relies on FIFO-ish consumption patterns in order to use standard-ish snapshotting and log truncation.

Combined with the checkpoint feature, which allows the state machine to store a snapshot of its (typically light) state on disk without truncating the log at that index, we can consider additional options for reducing the data stored in the log without a full truncation. Checkpoints allow hybrid state machines to recover quickly after a restart, as they do not need to restore the state from the last snapshot, only from the last checkpoint. This means that the log between the last snapshot and the latest checkpoint will, during normal operation, only be read on demand to fetch a payload value. As we will never need every single entry in this section of the log to deterministically recover the state, this part of the log can be compacted towards a state where it only contains indexes with payload data that is still referenced by the latest checkpoint.
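To make the compacting area concrete, here is a small sketch in Python (used only for illustration; Ra itself is written in Erlang). It assumes the log is addressed by integer indexes, that the compacting area is the range after the last snapshot index up to and including the checkpoint index, and that live indexes are given as a set. The function name `dead_in_compacting_area` is hypothetical.

```python
def dead_in_compacting_area(snapshot_index, checkpoint_index, live_indexes):
    """Return indexes after the last snapshot, up to and including the
    checkpoint, that the checkpointed state no longer references.

    These are the entries whose space compaction could eventually reclaim.
    (Hypothetical helper; indexes are plain integers.)
    """
    return [i for i in range(snapshot_index + 1, checkpoint_index + 1)
            if i not in live_indexes]


# Snapshot at 100, checkpoint at 110, only 103 and 107 still referenced.
print(dead_in_compacting_area(100, 110, {103, 107}))
```

Everything not in the live set is a candidate for deletion, so the compacting area shrinks towards just the live payloads.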

A new ra_machine callback that returns a map of all live indexes will be used for compaction calculations.
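A minimal sketch of what such a callback could look like for a toy FIFO queue, assuming the machine keeps only log indexes in memory and returns them as a set (the proposal mentions a map; a set is used here for simplicity). `ToyQueueMachine` and `live_indexes` are hypothetical names, not the actual ra_machine API.

```python
from collections import deque


class ToyQueueMachine:
    """Toy hybrid state machine: payloads stay in the log, only indexes in memory."""

    def __init__(self):
        # Log indexes of enqueued messages that have not been consumed yet.
        self.ready = deque()

    def apply(self, index, command):
        op = command[0]
        if op == "enqueue":
            # Keep only the log index; the payload itself stays in the Raft log.
            self.ready.append(index)
        elif op == "dequeue" and self.ready:
            # The consumed message's log entry is no longer referenced.
            self.ready.popleft()
        return self

    def live_indexes(self):
        """Hypothetical counterpart of the proposed callback."""
        return set(self.ready)


m = ToyQueueMachine()
commands = [("enqueue", "a"), ("enqueue", "b"), ("dequeue",), ("enqueue", "c")]
for idx, cmd in enumerate(commands, start=1):
    m.apply(idx, cmd)
print(m.live_indexes())
```

Note that the `dequeue` command at index 3 is itself never live: only entries that carry payloads still referenced by the state need to survive.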

The log before the last checkpoint will be referred to as the "compacting area" and the log after the last snapshot/checkpoint will be referred to as the "active log". The compacting area of the log is effectively a k/v store where dead entries eventually will be deleted and space reclaimed.

Another way to look at it is that the snapshot is the checkpoint + all live entries in the compacting area. 🤯

Traditional snapshotting (where the snapshot includes the full state of the state machine) can be achieved by writing a checkpoint and returning no live indexes from the new ra_machine callback.
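Under these definitions a snapshot can be modelled as the checkpoint plus the live entries at or below its index. The sketch below (hypothetical names; the log is modelled as a dict of index to entry) also shows why returning no live indexes degenerates into traditional snapshotting: the whole prefix up to the checkpoint becomes truncatable.

```python
def snapshot_view(checkpoint_index, checkpoint_state, log, live_indexes):
    """Model a snapshot as the checkpoint plus the live compacting-area entries."""
    live_entries = {i: log[i] for i in sorted(live_indexes) if i <= checkpoint_index}
    return {"index": checkpoint_index, "state": checkpoint_state, "live": live_entries}


def truncatable_up_to(checkpoint_index, live_indexes):
    """Highest index that a plain prefix truncation could remove."""
    live_below = [i for i in live_indexes if i <= checkpoint_index]
    if not live_below:
        # No live entries: this is traditional snapshotting.
        return checkpoint_index
    # The oldest live entry pins everything from its index onwards.
    return min(live_below) - 1


log = {1: "a", 2: "b", 3: "c", 4: "d"}
print(snapshot_view(3, {"count": 1}, log, {2}))
print(truncatable_up_to(3, set()), truncatable_up_to(3, {2}))
```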

With these generalisations in mind, there will be no checkpoints, at least not multiple ones as in the current implementation. Instead there will be a new definition of a snapshot as a checkpoint + live entries in the compacting area of the log.

A compacting log would not take "traditional" snapshots by emitting release_cursors, only checkpoints would be written.

Checkpoints are taken by the Ra process based on the number of entries, elapsed time and log size; they are not initiated by the ra_machine. Once a checkpoint has been written and fsynced, prior checkpoints will no longer be needed (as there is no promotion to snapshot anymore) and will thus be deleted.
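A sketch of the trigger decision and of discarding older checkpoints. The thresholds and names are hypothetical; the proposal only says checkpoints are driven by entries, time and log size.

```python
from dataclasses import dataclass


@dataclass
class CheckpointPolicy:
    # Hypothetical thresholds; the proposal does not specify values.
    max_entries: int = 4096
    max_seconds: float = 60.0
    max_bytes: int = 64 * 1024 * 1024


def should_checkpoint(policy, entries_since, seconds_since, bytes_since):
    """Take a checkpoint as soon as any one of the three limits is reached."""
    return (entries_since >= policy.max_entries
            or seconds_since >= policy.max_seconds
            or bytes_since >= policy.max_bytes)


def retained_checkpoints(checkpoint_indexes, fsynced_index):
    """Once the checkpoint at fsynced_index is durable, older ones can go."""
    return [c for c in checkpoint_indexes if c >= fsynced_index]


p = CheckpointPolicy(max_entries=10, max_seconds=5, max_bytes=100)
print(should_checkpoint(p, 10, 0, 0), should_checkpoint(p, 1, 1, 1))
```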

Follower snapshot replication

This new definition of a snapshot as a checkpoint + live entries in the compacting area of the log complicates the snapshot replication / installation part of the code, as it is no longer sufficient to replicate only the latest checkpoint: we must also ensure that the follower has all live entries in the compacting area.

The case where a new follower is added and/or an existing follower needs a full log sync can be extended as follows: after replicating the checkpoint, the spawned snapshot sender process could continue to replicate all segments whole to complete the new snapshot state. After that point, normal log replication could take over.
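The full-sync case could be planned roughly as follows, assuming segments are described as `(name, first_index, last_index)` tuples and that segments with no live entries have already been deleted by compaction. The step names are hypothetical.

```python
def full_sync_plan(checkpoint_index, segments):
    """Plan a full sync: checkpoint first, then whole segments, then normal replication.

    segments: list of (name, first_index, last_index) tuples (hypothetical shape).
    """
    steps = [("send_checkpoint", checkpoint_index)]
    for name, first, _last in sorted(segments, key=lambda s: s[1]):
        # Only segments that overlap the compacting area are sent whole;
        # anything after the checkpoint arrives via normal log replication.
        if first <= checkpoint_index:
            steps.append(("send_segment", name))
    steps.append(("resume_append_entries", checkpoint_index + 1))
    return steps


print(full_sync_plan(100, [("b", 51, 100), ("a", 1, 50), ("c", 101, 150)]))
```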

However, a follower that has just fallen a bit behind may already have many of the live entries and only need some of them replicated. In this case we will need to negotiate which entries in the compacting area the follower needs. This could be done by first replicating the checkpoint, then having the follower reply with the indexes it needs (by comparing the indexes in the checkpoint with its own local log state), and then replicating these on a per-entry basis.
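On the follower side, the negotiation could reduce to a set difference. This sketch additionally assumes the checkpoint carries the term of each live entry, so the follower only reuses local entries whose index and term both match (the usual Raft log-matching rule). That is an assumption beyond the proposal, and the function name is hypothetical.

```python
def indexes_to_request(checkpoint_live, follower_local):
    """Return the live compacting-area indexes the follower must fetch.

    checkpoint_live / follower_local: dicts of index -> term (assumed shape).
    A local entry is reused only if both index and term match.
    """
    return sorted(i for i, term in checkpoint_live.items()
                  if follower_local.get(i) != term)


# 5 matches, 8 is missing, 12 exists locally but with a different term.
print(indexes_to_request({5: 1, 8: 2, 12: 3}, {5: 1, 12: 2}))
```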

To avoid concurrent segment replication and compaction, we can move from using transient processes for this work to a single companion worker process per Ra member that does all compaction and snapshot replication work. Testing will be needed to see whether this provides sufficient parallelism.

Impact on segment writer process

The segment writer process is responsible for flushing entries in the mem tables to segments. Currently it is designed to skip any entries with indexes lower than the last written snapshot. With the new approach, checkpoints can be written (and, to reduce write amplification, should be written) before any segments have been written, so the segment writer will need to inspect the latest checkpoint and use its set of live indexes to ensure it includes all relevant entries while skipping those that are already dead.
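The filtering rule for the segment writer might look like this, assuming mem table entries are keyed by index and the live index set comes from the latest checkpoint (hypothetical function name).

```python
def entries_to_flush(mem_table, checkpoint_index, live_indexes):
    """Select which mem table entries should be written to segments.

    Entries after the checkpoint belong to the active log and are always kept;
    entries at or below it are kept only if the checkpoint still references them.
    """
    return {i: e for i, e in sorted(mem_table.items())
            if i > checkpoint_index or i in live_indexes}


print(entries_to_flush({1: "a", 2: "b", 3: "c", 4: "d"}, 3, {2}))
```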

Some coordination with the compaction process will no doubt also be needed to avoid compaction and segment writing occurring at the same time.


Compaction should be triggered every time a new checkpoint is written. It is performed in phases, in order of efficiency (most efficient first), until some configured ratio of live vs dead entries has been achieved. This ratio should ideally be byte based, but that may require expensive scanning to work out the total size of all live entries, or require the state machine to track the approximate size of all live entries in addition to their indexes, which may not be practical in all cases. As a first attempt we should use a simpler entry-based approach where we allow a ratio of dead vs live entries, e.g. for every 100 live entries we would allow 30 dead entries to remain in the log.
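The entry-based threshold from the example (30 dead entries per 100 live ones) can be checked with integer arithmetic, avoiding division and floating point. The function and parameter names are hypothetical.

```python
def compaction_needed(live_count, dead_count, dead_per_100_live=30):
    """True if more dead entries remain than the configured ratio allows.

    Equivalent to dead / live > dead_per_100_live / 100, without dividing
    (so it also behaves sensibly when live_count is zero).
    """
    return dead_count * 100 > live_count * dead_per_100_live


print(compaction_needed(100, 30), compaction_needed(100, 31))
```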

Compaction Phases:

  1. Delete whole segments. Scan the list of segments to find any that contain no live indexes and delete them. This is cheap and effective but may leave many dead entries in the log. For checkpoints that return no live entries in the compacting area, this becomes a simple cutoff job where all segments that only contain entries with an index lower than the last checkpoint are simply deleted.
  2. Delete trailing data in a segment. Because all data portions of a segment file are kept at the end of the file, dead data at the end of the file could be truncated. This is also relatively cheap, as it only requires a truncate system call to reclaim the space.
  3. Compact multiple segments with few live indexes into fewer segments. This option has both high read and write amplification and will transiently consume more disk space, so it should only be used when necessary. It also requires coordination with any log readers (such as the main Ra process) to avoid deleting a segment whose entries have all been written to another segment but which the log readers may still want to read.
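A sketch of how the three phases could be planned, stopping as soon as the dead/live threshold is met. It assumes each segment is described by the list of its indexes in on-disk data order (so trailing dead records are the ones phase 2 can truncate), that only entries at or below the checkpoint index are candidates, and it only produces a plan rather than touching files. All names are hypothetical.

```python
def too_many_dead(live_count, dead_count, dead_per_100_live=30):
    # Same entry-based threshold as in the ratio discussion above.
    return dead_count * 100 > live_count * dead_per_100_live


def plan_compaction(segments, checkpoint_index, live, needed=too_many_dead):
    """segments: dict name -> list of indexes in on-disk data order."""
    remaining = {name: list(idx) for name, idx in segments.items()}
    plan = []

    def counts():
        below = [i for idx in remaining.values() for i in idx if i <= checkpoint_index]
        live_n = sum(1 for i in below if i in live)
        return live_n, len(below) - live_n

    # Phase 1: delete whole compacting-area segments with no live indexes.
    for name, idx in sorted(remaining.items()):
        if idx and max(idx) <= checkpoint_index and not any(i in live for i in idx):
            plan.append(("delete_segment", name))
            del remaining[name]
    if not needed(*counts()):
        return plan

    # Phase 2: truncate dead records at the end of each segment.
    for name, idx in sorted(remaining.items()):
        keep = len(idx)
        while keep > 0 and idx[keep - 1] <= checkpoint_index and idx[keep - 1] not in live:
            keep -= 1
        if keep < len(idx):
            plan.append(("truncate_segment", name, keep))
            remaining[name] = idx[:keep]
    if not needed(*counts()):
        return plan

    # Phase 3: rewrite several sparse compacting-area segments into fewer ones.
    candidates = sorted(n for n, idx in remaining.items()
                        if idx and max(idx) <= checkpoint_index)
    if len(candidates) > 1:
        plan.append(("rewrite_segments", candidates))
    return plan


segs = {"s1": [1, 2], "s2": [3, 4, 5], "s3": [6, 7, 8], "s4": [9, 10, 11, 12]}
print(plan_compaction(segs, 10, {3, 7}))
```

The expensive phase 3 is only reached when the cheaper phases leave the ratio exceeded, matching the "most efficient first" ordering.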

There is also a 2b option where data within a segment could be moved around to provide more compaction, but moving data is always tricky as it would invalidate any in-memory caches of the segment index that the Ra process may hold if it recently read an entry from this segment.

ikavgo commented 1 month ago

Thinking about use cases and motivations other than saving space.

For example message priorities can be implemented like this:

  1. Keep messages in log like QQ
  2. Keep index in state: list of lists Priority -> [MessageIndexInLog...]
  3. When a message is sent, exclude its index from the result of that new ra_machine callback.
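The priority idea could be sketched like this: one FIFO of log indexes per priority, with the live set being their union. Names are hypothetical, and higher numbers are assumed to mean higher priority.

```python
from collections import deque


class PriorityIndex:
    """Hypothetical in-memory index: priority -> FIFO of log indexes."""

    def __init__(self):
        self.by_priority = {}

    def enqueue(self, priority, log_index):
        self.by_priority.setdefault(priority, deque()).append(log_index)

    def send_next(self):
        """Pop the oldest message of the highest non-empty priority.

        Its log index stops being live, so compaction may reclaim it.
        """
        for prio in sorted(self.by_priority, reverse=True):
            q = self.by_priority[prio]
            if q:
                return q.popleft()
        return None

    def live_indexes(self):
        return {i for q in self.by_priority.values() for i in q}


p = PriorityIndex()
p.enqueue(1, 10)
p.enqueue(5, 11)
p.enqueue(1, 12)
print(p.send_next(), p.live_indexes())
```

The live set quickly becomes non-contiguous, which is exactly the case where prefix truncation stops working and compaction helps.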

Or Delayed messages:

  1. <>
  2. Index is DelayTime => MessageIndexInLog
  3. <>

To generalize: the index (metadata) is in the Ra state and the messages are in the log (as they are now). This addresses the main concern I have wrt storing delayed messages in the log: they are not FIFO and will be sent "randomly". It would be wasteful to keep the whole log just because some message is delayed for the next two months.
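Delayed messages could use a min-heap keyed on delivery time. The sketch (hypothetical names, times as plain integers) shows how a single long-delayed message would pin the log under prefix truncation, since it holds the lowest live index, while under compaction it is the only entry that needs to survive.

```python
import heapq


class DelayIndex:
    """Hypothetical in-memory index: delivery time -> log index."""

    def __init__(self):
        self.heap = []  # (deliver_at, log_index) pairs, earliest first

    def delay(self, deliver_at, log_index):
        heapq.heappush(self.heap, (deliver_at, log_index))

    def due(self, now):
        """Pop and return log indexes of all messages due at or before `now`."""
        out = []
        while self.heap and self.heap[0][0] <= now:
            out.append(heapq.heappop(self.heap)[1])
        return out

    def live_indexes(self):
        return {i for _, i in self.heap}


d = DelayIndex()
d.delay(5_000_000, 1)  # delayed for a long time, but written first
d.delay(10, 2)
d.delay(20, 3)
print(d.due(30), d.live_indexes())
```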

Returning all live indexes could then be costly. Maybe returning dead ones is also an option?


ikavgo commented 1 month ago

I also wonder whether TTL handling in QQs can be improved, e.g. by keeping the index and removing messages proactively:


lhoguin commented 1 month ago

> There is also a 2b option where data within a segment could possibly be moved around to provide more compaction but moving data is always tricky as it would invalidate any in memory caches of the segment index that the Ra process may currently have if it recently read an entry from this segment.

For what it's worth, moving data around in rabbit_msg_store worked very well for CQs. It moves data from the end of the file toward the beginning, into holes left by data that is no longer needed. It then takes special care not to truncate the file before existing readers are done reading from its end. This approach allows rabbit_msg_store to have lock-free readers while still compacting when appropriate.
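A toy model of the hole-filling idea, treating a file as a list of slots where `None` marks dead data. It is not the rabbit_msg_store code; it only shows records moving from the tail into the earliest holes and the resulting truncation point. Waiting for readers of the old tail before truncating is only noted in a comment.

```python
def compact_into_holes(slots):
    """Move records from the tail into the earliest holes.

    slots: list of record ids, with None for dead data.
    Returns (compacted_slots, moves), where moves are (record, from_pos, to_pos).
    """
    slots = list(slots)
    moves = []
    lo, hi = 0, len(slots) - 1
    while True:
        while lo < len(slots) and slots[lo] is not None:
            lo += 1  # find the earliest hole
        while hi >= 0 and slots[hi] is None:
            hi -= 1  # find the last live record
        if lo > hi:
            break  # no hole precedes any live record
        moves.append((slots[hi], hi, lo))
        slots[lo], slots[hi] = slots[hi], None
    # Everything from `lo` onwards is now dead. A real store would only
    # truncate there once readers of the old tail positions have finished.
    return slots[:lo], moves


print(compact_into_holes(["a", None, "b", None, "c"]))
```

Each move changes a record's position, which is why any cached segment index (the concern raised for option 2b) would need invalidating.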

Not sure how that can apply to QQs / ra, but the cache invalidation was not an issue compared to the lock the code used before.