cockroachdb / cockroach

CockroachDB - the open source, cloud-native distributed SQL database.
https://www.cockroachlabs.com

kvserver: throttle writes on followers #79215

Closed erikgrinaker closed 2 years ago

erikgrinaker commented 2 years ago

We currently throttle writes on the receiving store based on store health (e.g. via admission control or via specialized AddSSTable throttling). However, this only takes into account the local store health, and not the associated write cost on followers during replication, which isn't always throttled. We've seen this lead to hotspots where follower stores get overwhelmed, since the follower writes bypass admission control. A similar problem exists with snapshot application.

This has been touched on in several other issues as well:

Jira issue: CRDB-14642

Epic CRDB-15069

sumeerbhola commented 2 years ago

(tried to summarize some recent discussions. @nvanbenschoten @tbg -- please correct/add details).

Considering the following simple scenario, assuming the allocator has already been improved:

Claims/Ideas (from discussions with @nvanbenschoten and @tbg)

erikgrinaker commented 2 years ago

Append to the raft log without bothering with admission control. The raft log is unlikely to be the source of high read amp. Invoke admission control before applying to state machine. etcd/raft changes? Conf changes that need application to state machine before being effective?

At a first glance, I think I'm partial to this solution. Some questions/concerns:

sumeerbhola commented 2 years ago

because there are many Raft logs on the same Pebble store, and so the overall writes will be spread across a wide keyspan, meaning it will have to go through compaction as usual with the associated write amplification.

The raft logs are in the range-id local key space, so all of them will be prefixed by LocalRangeIDPrefix, which will keep them away from all the state machine state. So Pebble compactions should do the right thing, and if not (say we do see L0 flushed sstables spanning from the range-id local key space to the global key space), we can investigate and fix this in Pebble by tweaking the flush split logic.

Should the quota pool use the applied index rather than the commit index, to prevent followers from falling too far behind? Or, related to the above point, should a replica stop accepting new log entries if application falls too far behind?

Let's ignore transient overload for now, since admission control on the store write path doesn't throttle until overload is persistent (and has accumulated significant work in L0). Stated another way, say store S3 with follower replica for range R1 can only handle application at 60% of the throughput of the other replicas (due to admission control). We should assume then that whatever budget we have for how far application at the follower is allowed to fall behind will be exhausted, and the mechanism we need is that which will allow the raft group to efficiently work at this 60% throughput (until the rebalancer shifts the follower elsewhere). So flow control all the way back to the leaseholder seems to be what we need, and not necessarily separation of append from application. This flow control would need to allow raft heartbeats and configuration changes to not get stuck behind other normal raft commands. And we would configure admission control at S3 such that raft configuration changes would bypass the admission queue.

erikgrinaker commented 2 years ago

Pebble compactions should do the right thing, and if not (say we do see L0 flushed sstables spanning from the range-id local key space to the global key space), we can investigate and fix this in Pebble by tweaking the flush split logic.

What's the right thing here? Since all of the levels will be non-overlapping, will it avoid doing any compactions at all until the log gets truncated?

We should assume then that whatever budget we have for how far application at the follower is allowed to fall behind will be exhausted, and the mechanism we need is that which will allow the raft group to efficiently work at this 60% throughput (until the rebalancer shifts the follower elsewhere). So flow control all the way back to the leaseholder seems to be what we need, and not necessarily separation of append from application.

I'm not sure that's always desirable though. A 40% loss in throughput would not be acceptable in a lot of workloads, and we often get complaints of a "sick node" affecting the entire cluster in this way.

Let's assume that we have a 20-node cluster with 10000 ranges and RF=3. One of the nodes starts experiencing slowness (read amp or otherwise), with 60% throughput. This would affect 1500 ranges (15%) evenly distributed across all nodes, essentially impacting the entire workload if we assume some degree of cross-range txns. In this case, it would likely be better to give up on the follower and upreplicate elsewhere than to have 40% throughput reduction across this many ranges for however long it takes the allocator to move 1500 replicas. We often see this play out in escalations, where operators end up killing the node to force the upreplication.
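The figures in this example can be checked with a short calculation. The sketch below assumes replicas are spread evenly across nodes and that a range never has two replicas on the same node; the function name is illustrative only.

```python
def ranges_touching_node(num_nodes, num_ranges, replication_factor):
    """Return (replicas on one node, fraction of ranges with a replica there).

    Assumes an even replica distribution and at most one replica of a
    given range per node.
    """
    replicas_total = num_ranges * replication_factor
    replicas_per_node = replicas_total // num_nodes
    fraction_of_ranges = replicas_per_node / num_ranges
    return replicas_per_node, fraction_of_ranges


per_node, fraction = ranges_touching_node(
    num_nodes=20, num_ranges=10_000, replication_factor=3
)
print(per_node, f"{fraction:.0%}")  # 1500 15%
```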

I think I'm leaning towards appending the log entries and locally throttling application for however long it takes, since this avoids log entry retransmits and leader log retention. But it's still a bit unclear how aggressive the quota pool/allocator should be in cutting off followers (@tbg has suggested considering the relative rates of the leader and followers). If we do lose the leaseholder, then we should prioritize application of these entries to catch up, but hopefully the allocator will step in before it gets that far.

@tbg has been looking into some similar issues with the current below-Raft throttling and Raft queues/buffers, and I think that work points in this direction too -- but he'll write up some of his results later.

tbg commented 2 years ago

One short-term goal we should pursue is to avoid any below-raft throttling, i.e. this

https://github.com/cockroachdb/cockroach/blob/825a4ee8889cc9f777f98c202ae1877d5989c6cd/pkg/kv/kvserver/replica_proposal.go#L429-L525

In my experiments, I see this contribute significantly to memory pressure on the node and generally ... obfuscate what's going on. Holding raftMu for extended periods of time (I've seen 477s, which could be caused by the rate limiter shared across arbitrary numbers of ranges) leads to lots of SSTs queuing up at the raft scheduler on top of many other issues that stem from holding mutexes for extended periods of time (try to get a range status or report metrics on this replica in the meantime, for example). In a sense, this simulates what a "truly bad" system would look like and there should ultimately be backpressure all the way up to the leader, but I think it would be a step-change if we replaced this below-raft throttling by avoiding attempts to apply these entries in the first place. Instead of rate limiting / delaying, we'd determine the max size of entries to apply on a per-Ready basis, based on how much we're willing to ingest/add to the storage engine (this requires some raft changes). Then, whatever we pull to apply, we apply as fast as possible.
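A minimal sketch of the per-Ready sizing idea, assuming a simple byte budget handed down by the storage engine. The function name, the budget parameter, and the rule of always taking at least one entry when the budget is positive are assumptions for illustration; none of this is an etcd/raft or CockroachDB API.

```python
def entries_to_apply(entry_sizes, budget_bytes):
    """Return how many leading log entries to pull for application.

    Entries are taken strictly in log order until the next one would
    exceed the budget. If the budget is positive, at least one entry is
    taken so that a single oversized entry cannot block progress
    (an assumption of this sketch).
    """
    if budget_bytes <= 0:
        return 0  # the store does not want any more work right now
    taken = 0
    used = 0
    for size in entry_sizes:
        if taken > 0 and used + size > budget_bytes:
            break
        used += size
        taken += 1
    return taken


print(entries_to_apply([4, 4, 4], budget_bytes=10))  # 2
print(entries_to_apply([20, 1], budget_bytes=10))    # 1
```

Whatever prefix is returned would then be applied without further delay, which is the difference from rate limiting while holding the mutex.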

When to "cut off" followers is, of course, related, but I think we should be careful to discuss them somewhat separately to keep the concerns clear. I don't have an easy answer here but it feels right to me to make that call based on a velocity difference. If a follower is only 0.5% slower than the others, over time it will fall behind by arbitrary amounts, so an absolute "behind size" isn't a good cutoff. But if a follower is 30% slower than the others, we may want to cut it off once it's fallen enough to start throttling in the quota pool, and we may want to do so not only because it wrecks the range's performance, but also because it may not be able to handle the load we're sending it. To shed load, then, all the follower would have to do is to stop consuming log entries, and we can discuss how sophisticated this mechanism has to be. I would expect that it wouldn't have to be too sophisticated.
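To make the velocity argument concrete, here is a hedged sketch. The helper names and the 20% slowdown threshold are hypothetical; the thread does not settle on a specific rule.

```python
def lag_after(seconds, leader_rate, follower_rate):
    """Entries the follower is behind after `seconds`, starting in sync."""
    return max(0.0, (leader_rate - follower_rate) * seconds)


def should_cut_off(leader_rate, follower_rate, quota_pool_throttling,
                   max_slowdown=0.2):
    """Cut a follower off only if it is already throttling the range via
    the quota pool AND it is much slower than the leader.

    `max_slowdown` is a made-up threshold for illustration.
    """
    if leader_rate <= 0:
        return False
    slowdown = 1.0 - follower_rate / leader_rate
    return quota_pool_throttling and slowdown > max_slowdown


# A follower that is 0.5% slower still falls arbitrarily far behind
# given enough time, so an absolute lag threshold would eventually fire.
print(lag_after(3600, leader_rate=1000, follower_rate=995))  # 18000
print(should_cut_off(1000, 995, quota_pool_throttling=True))  # False
print(should_cut_off(1000, 700, quota_pool_throttling=True))  # True
```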

This all may not be enough if we're trying to handle the case (as we ought to, ultimately) in which a quorum of followers are getting very slow (and perhaps we're waiting for vertical scaling to occur, etc). Before rendering the range completely unavailable, the leader would need to pick a quorum and make sure to keep it up to date, meaning that these followers would see a steady stream of incoming appends that they may not be able to handle at that rate. So here explicit backpressure would be better than, say, the follower just dropping appends. But again, the appends are not generally the problem, but the log application, and the log application is under control of the follower.

Would be helpful if we rallied around a number of examples which we want to address, which we can prioritize and see how various solutions address them.

Here's an attempt:

sumeerbhola commented 2 years ago

What's the right thing here? Since all of the levels will be non-overlapping, will it avoid doing any compactions at all until the log gets truncated?

I am not sure what is meant by "all of the levels will be non-overlapping". I was merely remarking that we will hopefully not see many flushed sstables spanning from the range-id local key space to the global key space to the state machine. A consequence of that is that if appends to the raft log get deleted "soon", we will see less write amplification for that part of the key space. It doesn't mean there will be no compactions, but it may be that after reaching Lbase, the log entries get deleted. But if we are just appending to the raft log and not applying, the replica can't truncate the raft log, so the write amplification of the raft log could get as bad as that of the state machine. In that case, the compactions that are falling behind in dealing with the write amp of the state machine, thereby increasing read amp, will be further stressed by the raft log.

I'm not sure that's always desirable though. A 40% loss in throughput would not be acceptable in a lot of workloads, and we often get complaints of a "sick node" affecting the entire cluster in this way. Let's assume that we have a 20-node cluster with 10000 ranges and RF=3. One of the nodes starts experiencing slowness (read amp or otherwise), with 60% throughput. This would affect 1500 ranges (15%) evenly distributed across all nodes

btw, my 60% comment and flow control was in response to the idea "should a replica stop accepting new log entries if application falls too far behind?". What I am claiming is:

  • If there is some maximum lag in application after which appends will stop, then it should either be long enough to allow the allocator to take action (5min?), or we should assume that the lag will be reached and we need to degrade gracefully after that.
  • 60% capacity aggregated across all the replicas in a store may be ok: If we are plumbing tenant info and priority through raft replication, both high priority and inter-tenant fairness should be respected on the store that is falling behind. For example, if we are suddenly seeing a 2x load spike because of an index backfill that is hitting 100 of the follower replicas on the store, if we arrange the code correctly we should see only those ranges getting throttled via flow control all the way back to their leaseholders.

It would be nice to see a very clear articulation of requirements/desired high-level behavior (elaborating on the ones @tbg stated), design a clean solution with minimal mechanisms, and then work backwards to the intermediate steps.

erikgrinaker commented 2 years ago

What's the right thing here? Since all of the levels will be non-overlapping, will it avoid doing any compactions at all until the log gets truncated?

I am not sure what is meant by "all of the levels will be non-overlapping". I was merely remarking that we will hopefully not see many flushed sstables spanning from the range-id local key space to the global key space to the state machine. A consequence of that is that if appends to the raft log get deleted "soon", we will see less write amplification for that part of the key space. It doesn't mean there will be no compactions, but it may be that after reaching Lbase, the log entries get deleted. But if we are just appending to the raft log and not applying, the replica can't truncate the raft log, so the write amplification of the raft log could get as bad as that of the state machine. In that case, the compactions that are falling behind in dealing with the write amp of the state machine, thereby increasing read amp, will be further stressed by the raft log.

Yeah, this worries me too. In #78412, I saw severe performance degradation in cases where the Raft log lingered, and this would only exacerbate the problem if we halt application but keep appending to the log when we see store overload.

Since the Raft log is sequential in space and time, we could end up with e.g. 1-10 in L2, 11-20 in L1, and 21-30 in L0. There is marginal benefit in compacting this, since it will only be read once and eventually removed. Can/should Pebble be smart about this?

The WAL+LSM overhead seems unfortunate and unnecessary here. In the optimal case, we would have a single append write for each log entry, and a cheap way to remove the tail of the log. Have we considered using a different storage backend for the Raft log?

I'm not sure that's always desirable though. A 40% loss in throughput would not be acceptable in a lot of workloads, and we often get complaints of a "sick node" affecting the entire cluster in this way. Let's assume that we have a 20-node cluster with 10000 ranges and RF=3. One of the nodes starts experiencing slowness (read amp or otherwise), with 60% throughput. This would affect 1500 ranges (15%) evenly distributed across all nodes

btw, my 60% comment and flow control was in response to the idea "should a replica stop accepting new log entries if application falls too far behind?". What I am claiming is:

  • If there is some maximum lag in application after which appends will stop, then it should either be long enough to allow the allocator to take action (5min?), or we should assume that the lag will be reached and we need to degrade gracefully after that.
  • 60% capacity aggregated across all the replicas in a store may be ok: If we are plumbing tenant info and priority through raft replication, both high priority and inter-tenant fairness should be respected on the store that is falling behind. For example, if we are suddenly seeing a 2x load spike because of an index backfill that is hitting 100 of the follower replicas on the store, if we arrange the code correctly we should see only those ranges getting throttled via flow control all the way back to their leaseholders.

Yeah, there's a difference here between slowdowns due to ingest volume (which should ideally only affect the target ranges) and slowdown due to node problems (which affects all ranges on that node).

Another difficulty here is that bulk ingestion is typically done with a lower priority, but I don't think we can respect these priorities at the Raft level because we can't reorder the writes. We can likely use this to prioritize different replicas for application, but we would have head-of-line blocking on the range.

It would be nice to see a very clear articulation of requirements/desired high-level behavior (elaborating on the ones @tbg stated), design a clean solution with minimal mechanisms, and then work backwards to the intermediate steps.

I think we should draft a design doc around this, for short- and long-term solutions, as there are several issues we need to consider. @tbg would you be willing to get one started, since you're already pretty deep in these issues? I need to stay focused on MVCC range tombstones, and will be OOO for a week as well.

tbg commented 2 years ago

I think we should draft a design doc around this, for short- and long-term solutions, as there are several issues we need to consider. @tbg would you be willing to get one started, since you're already pretty deep in these issues? I need to stay focused on MVCC range tombstones, and will be OOO for a week as well.

👍🏽

jbowens commented 2 years ago

Since the Raft log is sequential in space and time, we could end up with e.g. 1-10 in L2, 11-20 in L1, and 21-30 in L0. There is marginal benefit in compacting this, since it will only be read once and eventually removed. Can/should Pebble be smart about this?

The log is keyed by range, so it consists of many short sequential runs, right?

I was merely remarking that we will hopefully not see many flushed sstables spanning from the range-id local key space to the global key space to the state machine.

The min-overlapping ratio heuristic used in L1 and lower prioritizes compacting files that are large with respect to the data beneath them. This seems undesirable for ephemeral data like the raft log. The incoming data is 'dense,' as opposed to the sparser distribution you'd get with, e.g., random writes across the keyspace. If raft log entries survive into L1 and accumulate, it seems like they'd be prime candidates for compaction :/

erikgrinaker commented 2 years ago

Since the Raft log is sequential in space and time, we could end up with e.g. 1-10 in L2, 11-20 in L1, and 21-30 in L0. There is marginal benefit in compacting this, since it will only be read once and eventually removed. Can/should Pebble be smart about this?

The log is keyed by range, so it consists of many short sequential runs, right?

Correct. If we were to use a new/different storage engine for this, we could possibly store them as separate Raft logs, since cheap appends and truncates would presumably be easier to handle that way. That would require frequent seeks across different logs, but maybe that doesn't matter much with SSDs.

nvanbenschoten commented 2 years ago

Conf changes that need application to state machine before being effective?

Apply-time config changes are problematic for solutions that try to retain voting capabilities on followers while throttling log application. The idea here would be to decouple these concerns so that range availability (i.e. the ability for the range to reach quorum) has no dependency on admission control on followers. In other words, a follower could continue to append entries to its log and vote in elections even if its applied state lagged arbitrarily behind.

The concern is that the invariants outlined in @tbg's comment in https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 would lead to subtle dependencies from range availability to entry application on followers. We wouldn't want for a follower's applied state to lag far behind its log without affecting its range's availability in the common case, but then in an edge case for a config change (for example) to suddenly block that range's availability for an indefinite amount of time as application catches up. These three invariants don't directly introduce such a dependency for followers, but I think the Campaign invariant is the sticking point. If all followers in the majority quorum were allowed to fall arbitrarily far behind on application, the failure of the range's leader could lead to a period of time where no eligible replicas (those with the full committed log) are permitted to campaign.

So if range availability does need to have a dependency on each follower's admission control state, I think we'd want that interaction to be reflected in the common case and not just in obscure edge cases. That way, we can immediately start pushing back on proposals instead of building up a large amount of application debt that could stall a range entirely for an indefinite amount of time under just the right circumstances.

erikgrinaker commented 2 years ago

If all followers in the majority quorum were allowed to fall arbitrarily far behind on application, the failure of the range's leader could lead to a period of time where no eligible replicas (those with the full committed log) are permitted to campaign.

Doesn't this just mean that the leader must start throttling proposals when a quorum of voters are behind on conf change application, as opposed to when a single voter is behind?

Of course, this means that if 2/5 voters are behind on application, and we lose one of the up-to-date replicas, then the leader will immediately begin throttling -- possibly for a long time.
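The 2/5 case can be spelled out with a small quorum check. This is only a sketch of the reasoning in this comment; the function names are hypothetical and "caught up" here means caught up on conf change application.

```python
def quorum(total_voters):
    """Smallest majority of the configured voters."""
    return total_voters // 2 + 1


def leader_must_throttle(total_voters, live_caught_up_voters):
    """True when fewer than a quorum of configured voters are both live
    and caught up on application (the leader counts as a voter)."""
    return live_caught_up_voters < quorum(total_voters)


# 5 voters, 2 lagging: 3 caught-up voters still form a quorum.
print(leader_must_throttle(total_voters=5, live_caught_up_voters=3))  # False
# Lose one of the caught-up voters: only 2 remain, so throttling starts.
print(leader_must_throttle(total_voters=5, live_caught_up_voters=2))  # True
```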

I think we'd want that interaction to be reflected in the common case and not just in obscure edge cases. That way, we can immediately start pushing back on proposals instead of building up a large amount of application debt that could stall a range entirely for an indefinite amount of time under just the right circumstances.

I'm not sure I fully agree. There's definitely a risk that we build up too much application debt and fall off a cliff if we suddenly have to repay that. But it also seems bad to always pay a penalty when a follower is behind to avoid the rare case where we'd have to suddenly catch up that follower.

I think this really comes down to how fast the allocator (or some other system) is able to resolve the problem. I think I would be fine with allowing a minority of followers to lag for long enough that we can reasonably expect the problem to be resolved (typically by upreplicating elsewhere), but not allow them to fall behind indefinitely. So we would throttle proposals as a function of the median and maximum replica lag.
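One possible shape for such a function, as a rough sketch. The two limits, the linear ramp, and the name `proposal_throttle` are assumptions for illustration; the thread does not specify a formula.

```python
import statistics


def proposal_throttle(lags, median_limit, max_limit):
    """Return a throttle factor in [0, 1] (0 = no throttling, 1 = stop).

    `lags` holds the application lag of each replica. The median is held
    to a tight limit so that a quorum stays close to the leader, while
    the worst replica gets a much looser limit so that a minority may lag
    for a while, but not indefinitely.
    """
    median_pressure = statistics.median(lags) / median_limit
    worst_pressure = max(lags) / max_limit
    return min(1.0, max(median_pressure, worst_pressure))


# One slow follower out of three: throttling is driven by the loose limit.
print(proposal_throttle([0, 0, 50], median_limit=10, max_limit=100))   # 0.5
# Two of three behind: the median limit kicks in and proposals stop.
print(proposal_throttle([0, 20, 50], median_limit=10, max_limit=100))  # 1.0
```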

This also implies that a lagging replica with lag <= the median lag should get application priority (which also implies that a new leader/leaseholder gets priority). However, I'm not sure if we can reliably determine the global median lag for the range, especially not if the local replica is behind on conf changes and has an outdated view of the range configuration.

sumeerbhola commented 2 years ago

This also implies that a lagging replica with lag <= the median lag should get application priority

I understand the thinking behind this, but I would prefer if we first tried to orient our thinking on the overall behavior of the system and not whether a range is getting worse performance, and think fully through where that leads us. In that way of thinking, what should get application priority ought to be dictated by inter-tenant fairness and secondarily by priority of work. It is good that a range does not span tenants. Priority of the work of course can vary within a range, and we are subject to head-of-line blocking when applying for a range, but I was positing earlier that this may also be acceptable for many of the common scenarios when a range is only receiving low priority bulk work. Adding additional criteria like who is lagging by what amount creates tension between the various goals, which would be really nice to avoid.

tbg commented 2 years ago

Here's a straightforward experiment that reproduces the problem in the simplest setting.

roachprod create -n 4 --os-volume-size 100 --clouds aws --local-ssd=false tobias-lsm
roachprod extend --lifetime 240h tobias-lsm
roachprod start tobias-lsm:1-3

# edit n1 and n2's disks in AWS console to give them 200mb/s throughput (n3 remains at 125mb/s)

# wait for replication and pin leases to n1 (what matters is that they're not on n3)
time roachprod sql tobias-lsm:1 -- -e "
select crdb_internal.force_retry('3100ms') from crdb_internal.ranges_no_leases where array_length(replicas, 1) < 3;
select count(range_id) from crdb_internal.ranges_no_leases where array_length(replicas, 1) >= 3;
ALTER RANGE default CONFIGURE ZONE USING lease_preferences = '[[+node1]]';
"

roachprod run tobias-lsm:1 ./cockroach workload init kv

# 6mb of goodput is enough to lead to >125mb/s write throughput on the nodes due to write amplification, see https://cockroachlabs.slack.com/archives/C0KB9Q03D/p1652441967965779
roachprod ssh tobias-lsm:4 -- sudo systemd-run --unit kv-rand -- ./cockroach workload run kv --read-percent 0 --max-rate 600 --concurrency 1000 --min-block-bytes 10000 --max-block-bytes 10000 --tolerate-errors {pgurl:1-3}
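As a sanity check on the numbers in the comments above, the following calculation derives the goodput from the workload flags and the total write amplification that would be needed to exceed n3's 125 MB/s limit. It assumes 1 MB = 10^6 bytes and treats "write amplification" as all bytes written to disk per byte of payload; both are assumptions of this sketch.

```python
max_rate_ops = 600        # --max-rate
block_bytes = 10_000      # --min-block-bytes == --max-block-bytes
disk_limit_mb_s = 125     # provisioned throughput of n3

# Payload bytes per second written by the workload.
goodput_mb_s = max_rate_ops * block_bytes / 1e6

# Amplification factor at which the disk limit is reached.
min_write_amp = disk_limit_mb_s / goodput_mb_s

print(goodput_mb_s)             # 6.0
print(round(min_write_amp, 1))  # 20.8
```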


tbg commented 2 years ago

The admission/follower-overload roachtest that reproduces the purest form of write overload on a follower that I could conceive of is https://github.com/cockroachdb/cockroach/pull/81516.

There is a recorded demo of it here

tbg commented 2 years ago

Unassigning to make clear that I am not closing this issue in the short term. I'm working my way through the dependencies for the "append" column. Nobody is currently assigned to the issues in the "snapshot" column.

tbg commented 2 years ago

I finished exploring https://github.com/cockroachdb/cockroach/issues/82132. The TL;DR is that the idea seems to work as expected: if we let leaseholders avoid sending to followers that have IO overload and also ignore such followers in the quota pool, we reduce IO work on followers without compromising the SQL workload directed at the leaseholder.
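A minimal sketch of the selection logic, assuming the leaseholder only stops sending to an overloaded follower while the remaining voters still form a quorum. That quorum-preserving rule and all names below are assumptions for illustration, not the implementation explored in the linked issue.

```python
def followers_to_pause(voters, leader, overloaded):
    """Pick overloaded followers the leader can stop sending to.

    A follower is paused only if the voters that keep receiving entries
    (including the leader) still form a quorum afterwards.
    """
    needed = len(voters) // 2 + 1
    active = set(voters)
    paused = []
    for store in sorted(overloaded):  # sorted for a deterministic choice
        if store == leader or store not in active:
            continue
        if len(active) - 1 >= needed:
            active.remove(store)
            paused.append(store)
    return paused


print(followers_to_pause(voters=[1, 2, 3], leader=1, overloaded={3}))     # [3]
print(followers_to_pause(voters=[1, 2, 3], leader=1, overloaded={2, 3}))  # [2]
```

Paused followers would also be left out of the quota pool accounting, so their lag no longer blocks proposals on the leaseholder.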

I think we should pursue something like this for v22.2 of CockroachDB.

My thinking about follower overload in general has also clarified. I think pushing back on clients should be a last resort and is out of scope for now. There are two reasons for pushing back on client traffic in the face of lagging follower(s):

  1. we want certain followers to be up to date because we need them to serve up-to-date follower reads (for example, a MR topology built on bounded staleness reads); maybe there are similar arguments for CDC, etc.
  2. multiple followers are behind and we need at least one of them for quorum

For 2), the proposal above likely already has this effect in practice, though in a brutal way. If the follower traffic is truly the reason for the IO overload, replication will stall as a quorum goes into overload and we stop sending to it. This should cause the overload to cease (or we continue sending and make it worse, we can choose), so there will be progress again, leading to another stall, etc, so in effect a very choppy kind of admission control. Not great, but I don't think we currently consider this case a high priority (@mwang1026), and it would be fairly complex to solve. There is a workaround, which is adding more machines before push comes to shove and possibly moving load manually if the allocator can't figure it out.

For 1), this is as much a technical challenge as a UX one. CRDB doesn't really distinguish between followers that are important for follower reads vs those that aren't (a VOTER follower might be used just for quorum; or it might be very important for local follower reads). If the zone configs or voter type encoded which one it is we could justify backpressuring foreground traffic to save follower reads, but this seems like an unreasonable default choice; perhaps, heuristically, the presence of a NON_VOTER could be taken as a signal to prioritize follower reads. To implement the backpressure, we would then need some form of distributed admission control where the leaseholder keeps a view of followers' health and allocates replication tokens from them. It all seems rather difficult to get right and certainly hard to do so for 22.2.

For now, running with an overloaded follower has to be considered an undesirable interim state that should resolve - ideally - once the allocator figures out where to move load (possibly only possible after the operator has scaled up). We are not at the point where a cluster under IO overload performs very favorably under workloads that require predictably reasonable write latencies (i.e. many OLTP workloads), so even if we did something more ambitious here, we wouldn't necessarily be solving that use case.


I've removed the snapshot-induced overload case from this issue, instead using https://github.com/cockroachdb/cockroach/issues/80607 to track that work.

irfansharif commented 2 years ago

I agree that doing anything more involved for follower admission should be out of scope for 22.2 but I hope we can invest here for 23.1 and beyond. For 22.2:

If the follower traffic is truly the reason for the IO overload, replication will stall as a quorum goes into overload and we stop sending to it. This should cause the overload to cease (or we continue sending and make it worse, we can choose), so there will be progress again, leading to another stall, etc, so in effect a very choppy kind of admission control. Not great, but I don’t think at present we consider this case a high priority (@mwang1026) and it would be fairly complex to solve it. ... To implement the backpressure, we would then need some form of distributed admission control where the leaseholder keeps a view of followers' health and allocates replication tokens from them.

In an internal reproduction of a customer workload we’ve seen an example of “follower traffic is truly the reason for the IO overload” (+cc @arulajmani). We had a configuration with follower-replica-only-regions with different # of nodes from the primary region, which in aggregate meant less disk bandwidth for follower replicas. Here we observed the following: quota pool filled up, blocking incoming writes momentarily, quota returned to the pool as the slowest followers caught up, and repeat all over again. I can’t speak to how important this is as a problem to solve but the choppiness was unfortunate. Perhaps we can talk about this class of issues in terms of “distributed admission control”, replacing the per-range quota pool entirely: we want to pace the rate of incoming writes such that a follower never gets into a regime of constantly needing snapshots, all without this “hard stall” behaviour we have today when you reach the quota pool limit.
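The stop-and-go pattern described here can be reproduced with a toy model. This is a simplified sketch, not CockroachDB's quota pool: the class, the tick-based loop, and the sizes are all made up for illustration.

```python
class QuotaPool:
    """Toy quota pool: proposals acquire quota, follower acks return it."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.available = capacity

    def try_acquire(self, size):
        if size > self.available:
            return False  # a real pool would block the proposal here
        self.available -= size
        return True

    def release(self, size):
        self.available = min(self.capacity, self.available + size)


def simulate(ticks, capacity, propose_per_tick, follower_ack_per_tick):
    """Return the amount admitted per tick when the slowest follower
    acknowledges less per tick than the leaseholder wants to propose."""
    pool = QuotaPool(capacity)
    outstanding = 0
    admitted = []
    for _ in range(ticks):
        # The slowest follower catches up a little and returns quota.
        acked = min(outstanding, follower_ack_per_tick)
        pool.release(acked)
        outstanding -= acked
        # The leaseholder tries to admit one proposal of fixed size.
        if pool.try_acquire(propose_per_tick):
            outstanding += propose_per_tick
            admitted.append(propose_per_tick)
        else:
            admitted.append(0)
    return admitted


print(simulate(ticks=8, capacity=4, propose_per_tick=2, follower_ack_per_tick=1))
# [2, 2, 2, 0, 2, 0, 2, 0]
```

Once the pool is drained, admission alternates between full speed and a hard stall, whereas a pacing scheme would aim for a steady rate matching what the follower can absorb.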

For now, running with an overloaded follower has to be considered an undesirable interim state that should resolve - ideally - once the allocator figures out where to move load (possibly only possible after the operator has scaled up)

I agree with the framing that overload should be considered an intermediate state for the allocator to resolve. In 23.1 and beyond it seems important to tool our allocator to understand:

These feel like pre-requisites in order for CRDB to relocate such replicas onto nodes with available headroom for the resource in question. In follower-induced disk overload scenarios, the allocator would ideally be able to relocate follower replicas onto nodes with available bandwidth. If there are no such nodes, I'm imagining "distributed admission control" would need to shift queuing out of the follower replicas onto the leaseholder in order for the leaseholder node to prioritize traffic to other ranges/replicas on the node (the point where such differentiation can take place).

tbg commented 2 years ago

in addition to ignoring overloaded followers from the quota pool, would we also enqueue follower requests through admission control? i.e. “below raft throttling”.

No, this is difficult to make useful. We can't reorder below raft - if one low-prio request is in the log and another high-priority one follows, we have to replicate the low-prio one to replicate the high-prio one. So you can only approximately preserve priority order and the way our raft transport is architected (a single store-to-store stream) makes it difficult to have anything good come out of blocking at or below the raft transport. In fact, in https://github.com/cockroachdb/cockroach/issues/75066#issuecomment-1143065417 we are trying to remove all below-raft throttling. Throttling at the level of raft message streams is below-raft throttling. I hope that once we protect followers in the way described above, we can at least get rid of the below-raft throttling for AddSSTable (not sure if there's much more than that), and possibly the concurrency clamp for above-raft AddSSTable as well. I am less scared of trying that out now because we have set up our experiments and can more or less quickly experiment with the impact of such changes.
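The head-of-line blocking problem can be shown with a toy model in which each log entry is delayed according to its priority but entries must still complete in log order. The entry names and delays are invented for illustration.

```python
def completion_times(log, delay_by_priority):
    """Return when each entry finishes if entries are handled strictly in
    log order and each one is delayed according to its own priority."""
    now = 0.0
    done = {}
    for name, priority in log:
        now += delay_by_priority[priority]
        done[name] = now
    return done


log = [("bulk-1", "low"), ("bulk-2", "low"), ("txn", "high")]
delays = {"low": 5.0, "high": 0.0}  # seconds of throttling per entry

print(completion_times(log, delays))
# {'bulk-1': 5.0, 'bulk-2': 10.0, 'txn': 10.0}
```

The high-priority entry is never throttled itself, yet it still waits 10 seconds behind the two low-priority entries ahead of it in the log.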

have you understood the implications of ignoring such overloaded followers? How do we react once the follower’s far enough behind to be in constant need of snapshots? Assuming some form of “permanently overloaded node with follower replicas”.

In the experiment in #81834, what tends to happen is that over time the follower may fall behind to the point where the raft log queue truncates the log. The follower then needs a raft snapshot, but our existing throttling for raft snapshots is already pretty aggressive (at most one stream inflight & it has a rate limit), so by "default" this has less potential to create severe overload. Some of this is particular to the experiment, though, and in particular in settings where the snap rate limits have been cranked up what might happen is that we trade MsgApp overload for snapshot overload. It's thus desirable to do something for https://github.com/cockroachdb/cockroach/issues/80607 as well. In https://github.com/cockroachdb/cockroach/issues/80607#issuecomment-1167824841, I am suggesting using the same mechanism to prevent snapshots to overloaded nodes, as long as the snapshot isn't required for quorum. I think together this should give overloaded followers the break they need. We could see, in such cases, a tendency to catch followers up via repeated snapshots, which isn't necessarily efficient (but note that catching up on the log may also be less efficient - it's really case-by-case), but it's better than the status quo (and with a kill switch for the MsgApp mechanism, we could also revert to today's behavior if we find that advantageous in the moment, even if we don't manage to add the snapshot mechanism).

In an internal reproduction of a customer workload we’ve seen an example of “follower traffic is truly the reason for the IO overload” (+cc @arulajmani). We had a configuration with follower-replica-only-regions with different # of nodes from the primary region, which in aggregate meant less disk bandwidth for follower replicas. Here we observed the following: quota pool filled up, blocking incoming writes momentarily, quota returned to the pool as the slowest followers caught up, and repeat all over again. I can’t speak to how important this is as a problem to solve but the choppiness was unfortunate.

It would be nice to support this at some point and it does seem like a nice real-world version of 1) from my post. Yes, ultimately we need the leaseholder to be able to do the queueing. At some level this seems similar to the distributed rate limiter we have in place for multi-tenancy, where instead of allocating RUs we need to allocate resources (for the issue at hand, read/write throughput, but maybe down the road CPU, memory, etc) from the followers to which the command would need to be replicated.

Everything you say about the allocator resonates with me. I am fairly excited by https://github.com/cockroachdb/cockroach/pull/82813 which evolves the inputs to IO admission control to more closely resemble the underlying resources. @shralex @kvoli @lidorcarmel @andrewbaptist (y'all want to get a @cockroachdb/kv-allocator handle? Had to pull this from a distribution weekly email) I can file allocator issue(s) for moving replicas by IO load, as I couldn't find any, but maybe they exist? We should also agree on what signal to use for "IO overload" to activate the below-raft protection of followers (i.e. ignoring followers temporarily as discussed in this issue). In the experiment I used the store capacity's L0Sublevels which is a windowed-max of the true L0Sublevels (in particular experimentally takes ~10m to decay). This is notably different from what actual store admission control uses, and I wonder if this divergence is helpful or problematic, and why we don't also use the file count here (and what the strategy would be if #82813 went in and the signal changed entirely).
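To make the "windowed-max" behavior concrete, here is a minimal sketch of such a signal. It is not the tracker CockroachDB uses: the `windowedMax` type, its methods, and the `minutes` helper are hypothetical, and time is modeled as an elapsed duration to keep the example small. It only shows why a brief spike keeps the reported value high until the window (10 minutes here, matching the decay observed in the experiment) has passed.

```go
package main

import (
	"fmt"
	"time"
)

// sample is one observation of the store's L0 sublevel count.
type sample struct {
	at    time.Duration // elapsed time since the tracker started
	value int64
}

// windowedMax reports the largest value seen within a trailing window.
// Hypothetical type for illustration only.
type windowedMax struct {
	window  time.Duration
	samples []sample
}

// minutes is a small helper for readable timestamps in the example.
func minutes(n int) time.Duration { return time.Duration(n) * time.Minute }

// prune drops samples that are older than the window as of now.
func (w *windowedMax) prune(now time.Duration) {
	cutoff := now - w.window
	i := 0
	for i < len(w.samples) && w.samples[i].at < cutoff {
		i++
	}
	w.samples = w.samples[i:]
}

// record adds an observation; samples must arrive in time order.
func (w *windowedMax) record(now time.Duration, v int64) {
	w.samples = append(w.samples, sample{at: now, value: v})
	w.prune(now)
}

// max returns the largest sample still inside the window at time now.
func (w *windowedMax) max(now time.Duration) int64 {
	w.prune(now)
	var m int64
	for _, s := range w.samples {
		if s.value > m {
			m = s.value
		}
	}
	return m
}

func main() {
	w := &windowedMax{window: minutes(10)}
	w.record(minutes(0), 25) // brief spike
	w.record(minutes(1), 2)  // the true value recovers right away
	fmt.Println("at 5m:", w.max(minutes(5)))
	w.record(minutes(11), 3)
	fmt.Println("at 11m:", w.max(minutes(11)))
}
```

The slow decay is what avoids thrashing in the allocator, and it is also why the same signal may react too slowly for short-timeframe decisions about pausing followers.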

tbg commented 2 years ago

Re: 2), it's interesting how we added the quota pool years ago and in doing so chose, by default, to stall foreground traffic over letting (arbitrary, possibly write-only) followers fall behind, and as @arulajmani has observed, throttle in a very choppy way (admission control does too, though probably not as poorly: https://github.com/cockroachdb/cockroach/issues/82114). I think in hindsight this was a mistake, we have 8mb of proposal quota today. If anything, the quota pool should be opt-in (via zone config or something like that), and it should operate at the level of closed timestamps, not proposal quota (what good is a follower that is 8mb of small writes behind when that is a week of closed timestamps? Why does a follower that is missing one SST count as too far behind?)

Confronting myself with this - in hindsight - questionable design makes me fairly bullish on https://github.com/cockroachdb/cockroach/issues/77251, perhaps coupled with a significant bump of the default quota size (but then by extension also aggressive raft log trunc threshold).

irfansharif commented 2 years ago

I can file allocator issue(s) for moving replicas by IO load, as I couldn't find any, but maybe they exist?

Filed https://github.com/cockroachdb/cockroach/issues/83490. Disk bandwidth is one dimension; CPU feels like another useful one.

kvoli commented 2 years ago

@shralex @kvoli @lidorcarmel @andrewbaptist (y'all want to get a @cockroachdb/kv-allocator handle? Had to pull this from a distribution weekly email)

Added @cockroachdb/kv-distribution handle.

I can file allocator issue(s) for moving replicas by IO load, as I couldn't find any, but maybe they exist?

We have existing issues for the short-term overload items planned for this release, but not for bigger-picture, medium-to-long-term changes. I think we came to the conclusion that enforcing a threshold on resource overload (threshold + x% > mean) was less risky for shorter-term changes than updating the core "ranking" algorithm, which strictly enforces a hierarchy of signals and then sorts candidates. Issue: https://github.com/cockroachdb/cockroach/issues/82611
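One possible reading of the "threshold + x% > mean" rule, sketched below: a store only counts as overloaded for allocator purposes if its signal exceeds an absolute threshold and also sits more than x% above the mean of the candidate stores. This interpretation, the `overloaded` function, and its parameters are assumptions made for illustration, not the allocator's actual code.

```go
package main

import "fmt"

// overloaded reports whether a store's IO signal (for example L0 sublevels)
// is above an absolute threshold AND more than pct percent above the mean
// of all candidate stores. Hypothetical helper; the exact rule is assumed.
func overloaded(value, threshold float64, all []float64, pct float64) bool {
	if len(all) == 0 {
		return false
	}
	var sum float64
	for _, v := range all {
		sum += v
	}
	mean := sum / float64(len(all))
	return value > threshold && value > mean*(1+pct/100)
}

func main() {
	// One hot store among healthy ones: moving replicas away can help.
	fmt.Println(overloaded(25, 20, []float64{2, 3, 25}, 10))
	// Every store is hot: the mean check stops pointless rebalancing.
	fmt.Println(overloaded(25, 20, []float64{22, 24, 25}, 10))
}
```

The mean comparison is what keeps the check from firing when the whole cluster is equally loaded, in which case moving replicas would not relieve anything.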

In the experiment I used the store capacity's L0Sublevels which is a windowed-max of the true L0Sublevels (in particular experimentally takes ~10m to decay). This is notably different from what actual store admission control uses, and I wonder if this divergence is helpful or problematic, [..]

I think it serves its purpose in the allocator, which is to avoid thrashing between overloaded and non-overloaded. It serves as a watermark and was suggested by @sumeerbhola. I'm not sure if it's a good fit for shorter-timeframe decisions about IO overload on followers, however.

[...] and why we don't also use the file count here

This was the less useful of the two, in my understanding. We should still include it, either individually or as part of an aggregate from admission control.

[...] (and what the strategy would be if https://github.com/cockroachdb/cockroach/pull/82813 went in and the signal changed entirely).

Talking to @irfansharif, I think there's a good case for getting resource signals that are derived from admission control in the future. I think in the shorter term this is an issue because we don't have a complete view from admission control yet (snaps, follower writes, not sure what else).

erikgrinaker commented 2 years ago

I'm generally on board with this approach, but I'm curious why we've chosen for this to be detected and controlled by the leaseholder, as opposed to the follower rejecting/dropping messages when it's overloaded (and the leaseholder detecting that and holding off). At least at the Raft level, it seems like this would generalize better to arbitrary failures, such that a node that is struggling to keep up for whatever reason would get cut off, and it doesn't rely on some side channel (store health gossip) to be functional.

The allocator level is a different story, and would need to rely on remote signal propagation, but I think there's value in lower layers being resistant to certain failure modes even if the allocator isn't.

tbg commented 2 years ago

The leaseholder needs to know anyway, since we also want to ignore the follower for the purposes of proposal quota. Also, in the general spirit of admission control, ultimately these decisions should move as high up in the stack as they can, i.e. to the admission control system at the leaseholder (which would then have to bargain with remote stores for IO tokens, etc).
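As a rough sketch of what "ignoring the follower for the purposes of proposal quota" could mean: quota is returned up to the lowest log index acknowledged by the followers that still count, so an ignored follower no longer holds the pool back. The `releasableIndex` function and its map-based inputs are hypothetical and do not mirror the real quota pool code.

```go
package main

import "fmt"

// releasableIndex returns the highest log index whose quota can be returned
// to the pool: the minimum acknowledged index across followers that are not
// ignored. The boolean is false if no follower counts. Hypothetical helper.
func releasableIndex(acked map[int]uint64, ignored map[int]bool) (uint64, bool) {
	var lowest uint64
	found := false
	for id, idx := range acked {
		if ignored[id] {
			continue // an overloaded, paused follower does not hold back quota
		}
		if !found || idx < lowest {
			lowest = idx
			found = true
		}
	}
	return lowest, found
}

func main() {
	acked := map[int]uint64{2: 100, 3: 40} // follower 3 is lagging
	idx, _ := releasableIndex(acked, nil)
	fmt.Println("nobody ignored:", idx)
	idx, _ = releasableIndex(acked, map[int]bool{3: true})
	fmt.Println("follower 3 ignored:", idx)
}
```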

Sumeer and I met earlier and we tried to think about a possible half-way point between that and backpressuring (or dropping selectively) on a Replica-by-Replica basis. Our conclusion was that there isn't much to gain there. If something is in the raft log - it's in the raft log, and it is hard to do prioritisation decisions except in some cases where a log is wholly higher priority than another.

However there is something that does decide on per-Replica priority, namely whether some appends are necessary for quorum or not. In other words, if a follower is overloaded due to raft appends coming from a leaseholder that does not need that follower for quorum, then that append should be low-priority. I am trying to get a hopefully good enough approximation of that thinking in place for 22.2 - the approximation being that we can't really deal with the case in which multiple replicas are overloaded very cleanly, since raft.RawNode doesn't give us a way to selectively "overlay" the progress state of replicas. But I hope that I can somewhat approximate it well enough, though there may be some cases in which we could be dropping messages but won't. On the other hand, if a replica is needed for quorum, or if (and none of this exists but could in the future) the zone config indicates that the replica needs to be kept up-to-date (say it's serving important follower reads, and we'd rather block writes than fall further behind), we will treat this replication stream as high priority and will keep the follower up to date.

This isn't to say that the follower couldn't send additional signals that don't play a role in admission control. For example, if the raft recv queues fill up, the follower could selectively ask for traffic to that range to be throttled for some time. (Since raft transport multiplexes multiple replicas onto a single destination store stream, unfortunately it's difficult to "just backpressure" without a bigger refactor, but also this wouldn't be in the spirit of admission control anyway).

Chatting with Sumeer earlier, we arrived at the strawman that what we'll do is

next steps that we may not get to in 22.2 timeframe:

erikgrinaker commented 2 years ago

if a follower is overloaded due to raft appends coming from a leaseholder that does not need that follower for quorum, then that append should be low-priority

What specifically does this refer to? Appends that have already been committed, appends on non-voting replicas, or some other classification?

tbg commented 2 years ago

I think as a first step, I will check the raft status and allow dropping to non-voters and to a single voter (if that voter isn't "clearly" needed for quorum, i.e. rest of the voters are in StateReplicate, etc). Down the line this could get smarter but it also increases the chance of getting it wrong and causing an unnecessary LOQ. Need to see how hard this all actually is when I get to the code but there should be something sensible there.
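A minimal sketch of that first step, under stated assumptions: the types below are simplified stand-ins rather than the etcd/raft progress tracker, the leader is assumed to be a healthy voter that is not in the `followers` slice, and `pausable` is a hypothetical name. It drops to every overloaded non-voter, and to at most one overloaded voter, and only when every other voter is replicating and a quorum remains without it.

```go
package main

import "fmt"

// progressState is a simplified stand-in for raft's per-follower state.
type progressState int

const (
	stateProbe progressState = iota
	stateReplicate
	stateSnapshot
)

// follower describes one remote replica as seen by the leader.
type follower struct {
	id         int
	voter      bool
	state      progressState
	overloaded bool
}

// pausable returns the IDs of overloaded followers that appends may be
// dropped to. The conservative rules here are an assumption for illustration.
func pausable(followers []follower) []int {
	var out []int
	voters := 1 // the leader itself
	overloadedVoters, candidate := 0, 0
	othersHealthy := true
	for _, f := range followers {
		if !f.voter {
			if f.overloaded {
				out = append(out, f.id) // non-voters never count for quorum
			}
			continue
		}
		voters++
		if f.overloaded {
			overloadedVoters++
			candidate = f.id
		} else if f.state != stateReplicate {
			othersHealthy = false
		}
	}
	quorum := voters/2 + 1
	if overloadedVoters == 1 && othersHealthy && voters-1 >= quorum {
		out = append(out, candidate)
	}
	return out
}

func main() {
	fs := []follower{
		{id: 2, voter: true, state: stateReplicate, overloaded: true},
		{id: 3, voter: true, state: stateReplicate},
		{id: 4, voter: false, state: stateReplicate, overloaded: true},
	}
	fmt.Println(pausable(fs))
}
```

Refusing to pause anything when two or more voters are overloaded is deliberately cautious: it trades missed opportunities for a lower chance of causing an unnecessary loss of quorum.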

If we're worried about multiple overloaded nodes at the same time (not unreasonable during imports, since you're hitting multiple nodes with the same intense workload), rather than trying to make this perfect, I would probably as a first pass resort to unconditionally dropping at the follower. The gossip signal that I'm adding could pick up on that too, but it's less clear that this is the right next move.

mwang1026 commented 2 years ago

@tbg just caught up on the https://github.com/cockroachdb/cockroach/issues/79215#issuecomment-1167857356 comment where I was @ mentioned.

For scenario 2) where there is choppy admission control, is there an easy way for a human (i.e. TSE) to identify that follower is the cause of the IO overload? Otherwise how would we know to manually move X replica to a different node?

tbg commented 2 years ago

For scenario 2) where there is choppy admission control, is there an easy way for a human (i.e. TSE) to identify that follower is the cause of the IO overload? Otherwise how would we know to manually move X replica to a different node?

TSEs can generally see which nodes are overloaded. Ranges that are stalling due to a quorum of followers being overloaded would typically register as under-replicated, so they would be visible at least in some fashion. But really fixing the overloaded nodes would fix the problems the cluster has at that point.

I managed to make #83851 quite a bit smarter than I thought was possible, btw. When a quorum is overloaded, other nodes will randomly decide (per range) which minority of the overloaded followers to pause replication to, which hopefully translates into "partially relieving" all overloaded stores equally. I haven't experimentally validated the behavior of this yet though, since it amounts to an additional cycle of experiments that I haven't had time for.
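Roughly, the per-range random choice could look like the sketch below. This is not the code in #83851: `pickPaused` is a hypothetical function, and seeding the shuffle with the range ID is an assumption made so that the example is deterministic per range while still varying across ranges.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickPaused chooses which overloaded followers of one range to pause.
// It never picks more than the largest minority of the voters, so a quorum
// keeps receiving appends. Hypothetical helper for illustration.
func pickPaused(rangeID int64, overloaded []int, voters int) []int {
	maxPaused := voters - (voters/2 + 1) // largest minority
	if maxPaused <= 0 {
		return nil
	}
	ids := append([]int(nil), overloaded...) // do not mutate the input
	r := rand.New(rand.NewSource(rangeID))   // assumed: seed by range ID
	r.Shuffle(len(ids), func(i, j int) { ids[i], ids[j] = ids[j], ids[i] })
	if len(ids) > maxPaused {
		ids = ids[:maxPaused]
	}
	return ids
}

func main() {
	// Three voters, and both followers (2 and 3) are overloaded: each range
	// pauses one of them, so the relief is spread over both stores.
	counts := map[int]int{}
	for rangeID := int64(1); rangeID <= 1000; rangeID++ {
		for _, id := range pickPaused(rangeID, []int{2, 3}, 3) {
			counts[id]++
		}
	}
	fmt.Println("ranges pausing follower 2:", counts[2])
	fmt.Println("ranges pausing follower 3:", counts[3])
}
```

Because each range picks independently, different ranges should pause different stores, which is the "partially relieving all overloaded stores equally" effect described above.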

irfansharif commented 1 year ago

X-linking #95563, which is orthogonal to follower pausing and a form of follower write control.