
replication: avoid fsync during raft log append #88442

Open tbg opened 2 years ago

tbg commented 2 years ago

Traditional quorum replication requires all log entries to be durably stored on stable storage on a quorum of replicas before being considered committed.

In practice, for us this means fdatasync'ing raft log appends. The main downside of this requirement is that fsync is latency-intensive: CockroachDB runs much faster on write-heavy workloads with fsync turned off or reduced. In fact, some customers are known to de facto do this by running on zfs with the fsync syscall mocked out as a noop.[^2]

Since CockroachDB ranges are usually deployed across AZs or even regions, where correlated power failures are likely rare, it stands to reason that trading durability for performance could be beneficial.

As explained in ^1, one can run CockroachDB "correctly" with fsync turned off if one ensures that a node that crashes (i.e. exits in a way that may lose log writes) does not return to the cluster (i.e. has to be wiped and rejoin as a new node). This is equivalent to running with fsync turned on (though more performant) while pretending that any crash failure is permanent.

This is unappealing because it requires replicating a lot of data, almost all of it redundantly. The missing piece is a mechanism that allows a power-cycled node to return to the cluster gracefully.

To give an explicit example of why naively letting the node rejoin when it didn't properly obey durability leads to incorrect behavior, consider three nodes n1, n2, and n3 which form the members of some range r1.

An entry at index 100 is appended to n1 and n2 and reaches quorum. It is committed and applied by n1. n2 power-cycles and loses index 100 (which it previously acked). Then we are in the following state:

```
                 committed
                     |
                     v
log(n1) = [..., 99, 100]
log(n2) = [..., 99]
log(n3) = [..., 99]
```

which makes it possible for n2 and n3 to jointly elect either of them as the leader, and to subsequently replace the committed (and, likely applied on n1) entry at index 100.
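
To spell out the mechanism: a raft voter grants its vote whenever the candidate's log is at least as "up-to-date" as its own, judged only by the term and index of the last entry. The following minimal sketch of that standard check (generic raft, in Go, not a quote of any particular implementation) shows why n2 and n3 would each grant the other's vote once both logs end at index 99:

```go
package raftexample

// logUpToDate implements the standard raft vote-granting comparison: the
// candidate's log wins if its last term is higher, or if the last terms are
// equal and its last index is at least as large as the voter's.
func logUpToDate(candLastTerm, candLastIndex, voterLastTerm, voterLastIndex uint64) bool {
	if candLastTerm != voterLastTerm {
		return candLastTerm > voterLastTerm
	}
	return candLastIndex >= voterLastIndex
}
```

With the acked entry 100 gone, n2 and n3 both report a last index of 99, so the check passes in both directions and whoever wins the election is free to overwrite index 100.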

To avoid this, we need to ensure that n2 doesn't vote or campaign until it is guaranteed to have been caught up across all entries that it may, in a previous life, have acked. A simple way to do this is to propose (i.e. ask the leader to propose) an entry carrying a UUID and to abstain from voting until n2 has this entry in its log. In effect, the follower "is semantically down" until it has been caught up past what it had acked previously, but it can be brought "up" again with the minimal amount of work possible.
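
A minimal sketch of that gating logic, under assumptions: recoveryMarker, replica, and the propose hook below are invented for illustration and are not existing CockroachDB or etcd/raft APIs.

```go
package rejoin

import "crypto/rand"

// recoveryMarker is the payload of the hypothetical "non-vanilla" raft entry
// that a restarting follower asks the leader to propose on its behalf.
type recoveryMarker struct {
	ReplicaID uint64
	Nonce     [16]byte
}

// replica holds only the state needed for the gating logic in this sketch.
type replica struct {
	id uint64
	// pendingRecovery, if non-nil, is the marker we asked the leader to
	// propose; until we see it come back through our own log, we must not
	// vote or campaign.
	pendingRecovery *recoveryMarker
}

// onRestartAfterCrash is called when the process starts up and cannot prove
// that its last acked raft log writes survived the crash.
func (r *replica) onRestartAfterCrash(propose func(recoveryMarker) error) error {
	m := recoveryMarker{ReplicaID: r.id}
	if _, err := rand.Read(m.Nonce[:]); err != nil {
		return err
	}
	r.pendingRecovery = &m
	// Ask the current leader to commit the marker through raft.
	return propose(m)
}

// canVoteOrCampaign gates election participation: the replica is
// "semantically down" until it is guaranteed to have caught up past anything
// it may have acked before the crash.
func (r *replica) canVoteOrCampaign() bool {
	return r.pendingRecovery == nil
}

// onApplyRecoveryMarker is invoked when a recovery marker entry is applied
// from the local log; seeing our own nonce proves the log now contains
// everything committed up to and including the marker.
func (r *replica) onApplyRecoveryMarker(m recoveryMarker) {
	if p := r.pendingRecovery; p != nil && m.ReplicaID == r.id && m.Nonce == p.Nonce {
		r.pendingRecovery = nil
	}
}
```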

More work would be necessary to actually do this.

For one, etcd/raft is heavy on assertions tracking whether durable state has regressed. For example, upon restarting, n2 might be contacted by n1 with an MsgApp to append an index 101, which n1 considers possible based on what it believes the durable state of n2 to be (it thinks n2 has index 100). Upon receiving this message, n2 will exit with a fatal error. We would need to make raft more accepting of loss of durability.
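
To give a flavor of one kind of invariant that can fire in this scenario, here is a hedged, generic sketch (not etcd/raft's actual code; tolerateLoss is an invented knob): today, learning of a leader commit index beyond the local last index is treated as log corruption and is fatal, whereas a durability-loss-aware follower would clamp it and wait to be caught up.

```go
package raftexample

import "fmt"

// advanceCommit mimics follower-side handling of the commit index carried in
// an append message. A commit index beyond the local last index means the
// local log regressed relative to what the leader believes was acked; instead
// of treating this as fatal, a tolerant follower commits only what it has and
// relies on the leader to re-append the missing tail.
func advanceCommit(leaderCommit, localLastIndex, localCommit uint64, tolerateLoss bool) (uint64, error) {
	if leaderCommit > localLastIndex {
		if !tolerateLoss {
			return localCommit, fmt.Errorf(
				"commit index %d out of range [last index %d]: raft log corrupted, truncated, or lost?",
				leaderCommit, localLastIndex)
		}
		leaderCommit = localLastIndex
	}
	if leaderCommit > localCommit {
		localCommit = leaderCommit
	}
	return localCommit, nil
}
```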

We may also need to be careful about command application, especially for specialized commands such as AddSSTs, log truncation, splits, etc., though I'm not sure there are any new complications. To be safe, we could always use a separate raft command encoding for "non-vanilla" entries and make sure to fsync whenever one of these enters the log. However, the semantics around configuration changes are already very complex.[^3]

A somewhat related issue is https://github.com/etcd-io/etcd/issues/12257[^4].

There are alternatives to doing this outside of raft. For example, if we had bulk replication changes, we could "re-add" the node in place, but under a new replicaID, and with a way to re-use the existing snapshot (i.e. apply it in-place). This would have the same effect but avoid any complications at the raft layer (since we're not violating durability). Bulk replication changes are tricky, though, since currently any replication change has to update the range descriptor and in particular the meta2 copy. One relevant observation is that, currently, the meta2 copy is identical to the range-local copy, but it doesn't have to be. The meta2 copy only needs to have enough information to allow CPuts as part of replication changes and to route requests; maybe there is a way to not have it include the ReplicaID, in which case we could bump the replica ID and run a replication change in a 1PC txn on the range itself.

[^2]: though the main driver there is resilience to EBS-level slowdowns on certain write-heavy workloads on underprovisioned gp3 volumes; I'm hesitant to think of disabling raft fsync as the right solution since we are lease-based, and so a disk with slow writes will still have severe problems serving user traffic

[^3]: https://github.com/etcd-io/etcd/issues/11284, https://github.com/etcd-io/etcd/issues/7625, https://github.com/etcd-io/etcd/issues/12359

[^4]: which proposes a change to the raft library to let the application decide when to append log entries. However, that issue does not compromise on durability - log entries would only be acked once they are durable.

Jira issue: CRDB-19825

Epic CRDB-40197

blathers-crl[bot] commented 2 years ago

cc @cockroachdb/replication

erikgrinaker commented 2 years ago

> To avoid this, we need to ensure that n2 doesn't vote or campaign until it is guaranteed to have been caught up across all entries that it may, in a previous life, have acked. A simple way to do this is to propose (i.e. ask the leader to propose) an entry carrying a UUID and to abstain from voting until n2 has this entry in its log. In effect, the follower "is semantically down" until it has been caught up past what it had acked previously, but it can be brought "up" again with the minimal amount of work possible.

There are additional problems here around correlated failures, i.e. if we were to also lose n1 then all bets are off. Can we reliably detect this situation and surface a hard error to the operator? I would consider this a requirement, since we'd otherwise risk silent data loss.

erikgrinaker commented 2 years ago

CC @sean- who's also voiced interest in this.

tbg commented 2 years ago

> There are additional problems here around correlated failures, i.e. if we were to also lose n1 then all bets are off.

Right, our availability weakens. A node counts as down in the "raft sense" until it's been caught up by the leader. If we lose a quorum simultaneously, there may not be a leader left to get these followers back.

erikgrinaker commented 2 years ago

> > There are additional problems here around correlated failures, i.e. if we were to also lose n1 then all bets are off.
>
> Right, our availability weakens. A node counts as down in the "raft sense" until it's been caught up by the leader. If we lose a quorum simultaneously, there may not be a leader left to get these followers back.

Sure, the availability issue seems fine. But if we lose n1 then we also lose entry 100 (in your example above). Is there any way we can know that a previously committed entry is now completely gone from all replicas? If not, we'll be at risk of silent data loss.

tbg commented 2 years ago

What will happen in your example is that n1 and n2 are both unable to elect a leader (and they will not campaign themselves), and so the raft group will be unable to make progress. It has in effect lost quorum. So in a sense it is detecting that an entry got lost, because a peer only fully recovers once it is guaranteed to be fully up to date.

erikgrinaker commented 2 years ago

Right, so a replica effectively needs an existing quorum to rejoin the cluster. I think we'll also need a marker for clean shutdowns (with fsyncs) that disables this check, such that it's possible to do a cold start of the cluster. But yeah, this seems viable.

sumeerbhola commented 2 years ago

Would the gains here be minor once the async work outlined in https://github.com/cockroachdb/cockroach/issues/17500#issuecomment-727094672 and https://github.com/cockroachdb/cockroach/pull/87050 is done? That work would remove any effect of fsync latency on throughput and minimize its effect on write latency (since raft commit and state machine application would see the K-th lowest fsync latency, where K is the quorum size).

I value the thought exercise in this proposal, but now that some details have been fleshed out, I worry that this is too complicated/risky a durability/availability story.

tbg commented 2 years ago

#17500 and #87050 contract the blast radius: if the leader is slower to fsync, that's fine as long as the followers are fast enough (there will be some increase in latency vs steady state, since now we're waiting for two RPCs instead of one, but it's tolerable).

What this issue brings to the table is not waiting for any fsync at all. This should be significant no matter what, but just to be clear, I am not pushing for this as something we try to do in the near-term future (and perhaps not ever, and certainly not as a default). The issues you mentioned above are much more important.

ajwerner commented 1 year ago

It'd be nice to gain the benefits of not waiting for an fsync on the write path while also being able to survive simultaneous power failure to the entire cluster and to make claims about what sort of data loss might be experienced.

As Tobi noted, we'll definitely want to sync things that affect the raft group state, like config changes, term changes, and a whole host of other special things.

A thing folks have come to expect of traditional DBMS systems that do not run with fsyncs on is that if you lose power, you lose recent transactions. Importantly, in such systems you generally lose entire transactions. If parts of transactions were lost, the system might find itself in an invalid state with regard to referential integrity, which would be hard to reason about.

An ideal property would be that when we lose data due to this accepted (and potentially opt-in) durability weakening, we only lose whole transactions.

One approach to only lose whole transactions might be to make sure that we actually wait for the data to be flushed when making transactions explicitly committed and when resolving intents. These operations are already asynchronous with respect to the client performing the write (assuming parallel commits is in use). A negative implication is that these flush operations would be on the synchronous "contention footprint" for concurrent operations which might want to read or overwrite this newly written data. Other ideas, discussed in https://github.com/cockroachdb/cockroach/issues/22349, could be used to make the contention footprint fully asynchronous as well -- such that we only need to wait for one non-sync'd write before the system could allow a contending transaction to proceed. This would be a huge win.

All of the above could lead to higher throughput, lower latency, and lower IOP usage, at the cost of durability in the face of black swan events. Furthermore, it would not be hard to add a mechanism for clients to opt in to (or out of) the durability claims we already make.

Nothing in this note conflicts with the initial proposal here. The re-joining discussion in the face of lost log entries remains equally valid.

tbg commented 1 year ago

> Importantly, in such systems you generally lose entire transactions. If parts of transactions were lost, the system might find itself in an invalid state with regard to referential integrity, which would be hard to reason about.

More precisely, you lose a suffix of transactions (if transaction N was lost, then N+1, ... are also lost). Otherwise, if you delete a row to free up a unique constraint and then insert another row that uses it, losing only the first txn suddenly leaves you with a unique constraint violation. In CRDB, where there isn't a single operation order, I think this becomes "if txn N is lost, then all subsequent txns overlapping N are also lost or aborted". And in both cases, "transaction" means "mutation".

> One approach to only lose whole transactions might be to make sure that we actually wait for the data to be flushed when making transactions explicitly committed and when resolving intents.

One annoying detail here is that we need to wait for all intents to be durable before resolving any of them. Otherwise:

Instead, we need to wait:

So the contention footprint increase will be pretty massive due to this need to wait for durability before commit, which adds either another round-trip during intent resolution or the fsync, whichever takes longer.

Let's say we try to hide this contention footprint from clients by having the txn gateway tell all nodes touched by the txn that the txn is now considered committed (i.e. it is STAGING and all intents were actually "written"). Nodes could then treat these intents as committed eagerly, reducing the contention footprint. But if a replica set's worth of nodes power-cycles and loses an intent, everyone is seeing a partial txn anyway.

I think (and worry) that the end result is that the widened contention footprint described above is close to the best we can do if we really want to avoid anyone seeing a partial txn.

Also, I worry that it still isn't giving us what we need. You could have two txns that overlap only on their read/read-write (but not write-write) sets:

if r1 and r2 aren't colocated, you might well lose txn1 but commit txn2. This could certainly break some app-level invariants. I wonder if it could also violate SQL-internal invariants, for example that a row is housed in exactly one table (where txn2 would check table1 before writing to table2).

tbg commented 1 year ago

> Also, I worry that it still isn't giving us what we need. You could have two txns that overlap only on their read/read-write (but not write-write) sets:

I was wrong here: only durable txns can be observed, so if we lose a txn, it couldn't have been observed, and the property we want holds trivially.

The longer contention footprint seems necessary, though, and is a real bummer.

andrewbaptist commented 1 year ago

TL/DR It is possible to write the WAL using O_DIRECT on a preallocated file.

fdatasync does two things:

1. Flushes data from the OS write cache (known as dirty blocks) to the drive.
2. Forces the drive to flush its cache (by sending a SYNCHRONIZE_CACHE SCSI command).

Drives only accept writes in "block size" amounts (typically 4K). The purpose of the OS write cache is to allow unaligned writes to the file and flush them when it decides to (or is forced to with a fsync/fdatasync).

The difference between fsync and fdatasync is whether "non-critical" metadata is also flushed. Non-critical metadata is things like the last modified time.

A fairly small change that could remove the need to call fdatasync at all is to fully manage the writing of the WAL. Specifically, the following three things need to happen:

1. The WAL is pre-allocated (and zeroed out). Using O_DIRECT on a non-preallocated file is poorly defined behavior; the core reason is that if the underlying blocks that make up a file change, the timing of the "critical" metadata updates and the inode updates for the block addresses of the file is not coordinated.
2. The writes are done in 4K (or larger) blocks. O_DIRECT operations will fail if the size of the write does not match the block size of the underlying device.
3. The writes are all done with the FUA bit set. This happens automatically if the file is opened with O_DSYNC. Setting the FUA bit forces the data to bypass the drive-level cache.

Fortunately, setting the FUA bit is a no-op for both AWS and Google volumes, as well as for most enterprise drives, which means setting it is effectively free. There will be a performance cost for (cheap) drives that don't guarantee power loss protection, but it isn't any more than the cost of later sending an fdatasync. If we go forward with this, we should make some recommendations for non-cloud customers to purchase enterprise SSDs (I don't think anyone runs CRDB on spinning rust).

If we can make these changes, then WAL writes should be very fast and bypass the OS write cache completely. Optionally this could be combined with AIO; however, I'm not sure that is necessary. Generally, the way I think this could be done is by having a "WAL buffer" in userspace. When a write that needs to go to the WAL occurs, it would go into a cache that we control. Periodically (every 10ms?), or once there is at least one block (4K) of data, a flush thread would flush the data to disk. There is already support in the WAL for using it in a "circular buffer" fashion, so this would just use that capability.
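
A rough sketch of the above on Linux, under stated assumptions: golang.org/x/sys/unix for the flags, a fixed 4K block size, fallocate standing in for "pre-allocated and zeroed", and ignoring details like unwritten-extent conversion. Function names here are illustrative, not Pebble or CockroachDB APIs.

```go
package walsketch

import (
	"os"
	"unsafe"

	"golang.org/x/sys/unix"
)

const blockSize = 4096

// alignedBlock returns a blockSize-aligned buffer of length blockSize, as
// O_DIRECT requires alignment of both the user buffer and the file offset.
func alignedBlock() []byte {
	raw := make([]byte, 2*blockSize)
	off := int(uintptr(unsafe.Pointer(&raw[0])) & (blockSize - 1))
	if off != 0 {
		off = blockSize - off
	}
	return raw[off : off+blockSize]
}

// openWAL preallocates a WAL file of the given size (unwritten extents read
// back as zeros) and opens it for direct, write-through I/O. With O_DSYNC
// set, each write carries the FUA semantics described above, so no separate
// fdatasync is needed.
func openWAL(path string, size int64) (*os.File, error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|unix.O_DIRECT|unix.O_DSYNC, 0o644)
	if err != nil {
		return nil, err
	}
	// Allocate all blocks up front so later writes don't change the file's
	// block mapping (the "critical" metadata mentioned above).
	if err := unix.Fallocate(int(f.Fd()), 0, 0, size); err != nil {
		f.Close()
		return nil, err
	}
	return f, nil
}

// appendBlock writes one aligned block at the given block index of the
// preallocated file; payload must fit in a single block in this sketch.
func appendBlock(f *os.File, blockIdx int64, payload []byte) error {
	buf := alignedBlock()
	copy(buf, payload)
	_, err := f.WriteAt(buf, blockIdx*blockSize)
	return err
}
```

With this, each WriteAt is a cache-bypassing, write-through block write, which is why no explicit fdatasync call appears anywhere -- modulo the filesystem-level caveats discussed in the follow-up below.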

tbg commented 1 year ago

^-- the above was explored a bit by @andrewbaptist in https://github.com/cockroachdb/cockroach/pull/91272. There's also this slack thread. The TL;DR is (though maybe @andrewbaptist has a better TL;DR!) that there's currently no good way of having the above "free fsync" strategy actually play out in practice[^1]. This is sort of sad; I wonder if we can finagle it somehow, but I suppose if that were possible, folks would be doing it already.

[^1]: i.e. can't do something like described in https://linux-scsi.vger.kernel.narkive.com/yNnBRBPn/o-direct-and-barriers

tbg commented 1 year ago

Just in case we need some more bolstering of the no-fsync approach from academia, reproduced below is a passage from Viewstamped Replication Revisited. What it describes matches, in spirit, very closely what's described in this issue, except that they elide log disk writes altogether. I first thought TigerBeetle had adopted this approach^1 (which seemed a bit hardcore even for them), but it says here^2 "primary disk plus backup's disk", and the code^3 suggests they're doing vanilla fdatasync.


> When a replica recovers after a crash it cannot participate in request processing and view changes until it has a state at least as recent as when it failed. If it could participate sooner than this, the system can fail. For example, if it forgets that it prepared some operation, this operation might then be known to fewer than a quorum of replicas even though it committed, which could cause the operation to be forgotten in a view change. If nodes record their state on disk before sending messages, a node will be able to rejoin the system as soon as it has reinitialized its state by reading from disk. The reason is that in this case a recovering node hasn’t forgotten anything it did before the crash (assuming the disk is intact). Instead it is the same as a node that has been unable to communicate for some period of time: its state is old but it hasn’t forgotten anything it did before.
>
> However, running the protocol this way is unattractive since it adds a delay to normal case processing: the primary would need to write to disk before sending the PREPARE message, and the other replicas would need to write to disk before sending the PREPAREOK response. Furthermore, it is unnecessary to do the disk write because the state is also stored at the other replicas and can be retrieved from them, using a recovery protocol. Retrieving state will be successful provided replicas are failure independent, i.e., highly unlikely to fail at the same time. If all replicas were to fail simultaneously, state will be lost if the information on disk isn’t up to date; with failure independence a simultaneous failure is unlikely. If nodes are all in the same data center, the use of UPS’s (uninterruptible power supplies) or non-volatile memory can provide failure independence if the problem is a power failure. Placing replicas at different geographical locations can additionally avoid loss of information when there is a local problem like a fire.
>
> This section describes a recovery protocol that doesn’t require disk I/O during either normal processing or during a view change. The original VR specification used a protocol that wrote to disk during the view change but did not require writing to disk during normal case processing. When a node comes back up after a crash it sets its status to recovering and carries out the recovery protocol. While a replica’s status is recovering it does not participate in either the request processing protocol or the view change protocol. To carry out the recovery protocol, the node needs to know the configuration. It can learn this by waiting to receive messages from other group members and then fetching the configuration from one of them; alternatively this information could be stored on disk.

nvanbenschoten commented 1 year ago

> To avoid this, we need to ensure that n2 doesn't vote or campaign until it is guaranteed to have been caught up across all entries that it may, in a previous life, have acked.

This is how Kafka's replication recovery protocol works. See https://jack-vanlightly.com/blog/2023/4/24/why-apache-kafka-doesnt-need-fsync-to-be-safe.

tbg commented 1 year ago

Slack thread