
kvcoord: permit parallelization of scans with limits #54680

Open jordanlewis opened 4 years ago

jordanlewis commented 4 years ago

Must-haves for 22.2:

Nice-to-haves for 22.2:

Bug fixes:


tl;dr we need to find a way to let kvcoord/dist_sender parallelize its requests even if the requests have a TargetBytes attribute, which SQL sets to prevent out-of-memory conditions caused by index or lookup joins against tables with large rows.

Currently, MVCCScan requests that have TargetBytes or MaxSpanRequestKeys set cannot be parallelized by dist_sender. This presents a major problem for SQL, which must set TargetBytes on every request to remain safe from out-of-memory conditions, but must also parallelize index joins (and lookup joins, if possible) to preserve reasonable performance.

SQL takes the performance side of the tradeoff: index joins and lookup joins that can return at most one row per input row do not set a bytes limit to gain parallel scans. To limit the size of the return batch, SQL limits the number of spans that are included in the scan request. This limit is set to 10,000.
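
To make the tradeoff concrete, here is a minimal Go sketch of the decision as described above; the `batchHeader` struct, the `planIndexJoinBatch` helper, and the hard-coded 10,000 cap are illustrative stand-ins, not the actual fetcher code.

```go
// Illustrative sketch of the tradeoff described above, not real fetcher code.
package sketch

const maxSpansPerBatch = 10000 // stand-in for SQL's cap on spans per batch

type batchHeader struct {
	TargetBytes        int64 // 0 means "no byte limit"
	MaxSpanRequestKeys int64 // 0 means "no key limit"
}

// planIndexJoinBatch shows the two sides of the tradeoff: with a byte limit
// the batch stays memory-safe but dist_sender runs it range by range; without
// one the batch can be parallelized, so the only guard is capping span count.
func planIndexJoinBatch(spans []string, atMostOneRowPerSpan bool) (batchHeader, []string) {
	if atMostOneRowPerSpan {
		// Parallel path: no TargetBytes; rely on the span cap alone.
		if len(spans) > maxSpansPerBatch {
			spans = spans[:maxSpansPerBatch]
		}
		return batchHeader{}, spans
	}
	// Safe path: a byte limit bounds memory but disables parallelism.
	return batchHeader{TargetBytes: 10 << 20}, spans
}
```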

Taking the performance side of the tradeoff is very dangerous, because applications that have tables with wide rows can easily expose the database to out of memory conditions, even if SQL does memory accounting (as it does as of #52496). To see why, consider the following situation:

root@127.0.0.1:50151/defaultdb> CREATE TABLE a (id INT PRIMARY KEY, attribute INT, blob TEXT, INDEX(attribute));
-- Imagine that each blob column is 10 megabytes.
root@127.0.0.1:50151/defaultdb> explain(opt) SELECT * FROM a WHERE attribute=10 AND blob LIKE 'blah%';
                      text
------------------------------------------------
  select
   ├── index-join a
   │    └── scan a@a_attribute_idx
   │         └── constraint: /2/1: [/10 - /10]
   └── filters
        └── blob LIKE 'blah%'

This index join will use the parallel scan method mentioned above for performance, and therefore does not include a target bytes attribute. As a result, KV will happily scan 10,000 rows of this table, pulling 10,000 * 10 MB = 100 GB of row data into memory and destroying even the most well-provisioned of servers.

This is a critical problem. Even with memory accounting on the SQL side (see #52496), KV has no protections here and can easily and haplessly kill its server or client by pulling too much data into memory.

Jira issue: CRDB-3727

jordanlewis commented 4 years ago

I'm not exactly sure how this will work. It seems we'd need to enhance the scan API to permit multiple resume spans, and SQL's kv fetcher would have to be extended to use this model properly. It's tricky because of ordering - the users of the fetcher expect the responses back in the same order as the requests, which would no longer be true with this model.

We'd have to make sure it's always possible to match a returned key value with the span that generated it, so SQL could correlate responses back with requests.

This API will be quite painful to work with, actually. Is there a better idea?

tbg commented 4 years ago

Maybe the better API would be adding a "prefetch limit" to the KV API, which defaults to whatever DistSender uses today for unlimited scans (and today is zero for limited scans). DistSender could parallelize the scans internally (partitioning the limit over the ranges in flight) without the need for more resume spans - it would return results in the same order as today (i.e. until range 1 is exhausted, range 2 would remain buffered). The prefetch limit could be set to something like 5-10, I assume, so that you get 1-2mb of target bytes per range, which is hopefully enough for reasonable cases. I don't know what kind of parallelism you need to make it worth your while. If you need something very high (100?) then at a target bytes of 10mb, in the worst case we immediately load 9.9mb from some ranges on the right, and then the rest of the scan degenerates into a sequential fetch with a very small chunk size, which is also bad (at that point, it might be better to discard the late spans and just fall back to sequential).
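
A minimal sketch of how the prefetch-limit idea could partition one overall byte budget across the ranges in flight; `rangeBatch` and `partitionBudget` are hypothetical names, not DistSender internals.

```go
// Sketch of the "prefetch limit" idea: split one overall TargetBytes budget
// evenly across the ranges currently in flight.
package sketch

type rangeBatch struct {
	rangeID     int
	targetBytes int64 // per-range byte limit handed to that range's scan
}

// partitionBudget fans a scan out to at most prefetchLimit ranges at a time,
// giving each range an equal slice of the total budget. With a 10 MB budget
// and a prefetch limit of 5-10 this yields the 1-2 MB per range mentioned above.
func partitionBudget(rangeIDs []int, totalTargetBytes int64, prefetchLimit int) []rangeBatch {
	n := len(rangeIDs)
	if n > prefetchLimit {
		n = prefetchLimit
	}
	if n == 0 {
		return nil
	}
	perRange := totalTargetBytes / int64(n)
	batches := make([]rangeBatch, n)
	for i := 0; i < n; i++ {
		batches[i] = rangeBatch{rangeID: rangeIDs[i], targetBytes: perRange}
	}
	return batches
}
```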

I'm trying to come up with alternatives but nothing obvious comes to mind, and the loss of parallelism has been an issue before. What's a little awkward is that DistSender has no stats about the keyspace it's scanning, while SQL does. But on the other hand, stats cannot be relied upon when OOMs are the price for getting it wrong. Another approach could be KV memory accounting, but we would have to choose the queuing variety or retry these RPCs from DistSender (but how often? We don't want another DOS vector here).

If we do work on this as I agree we should, I assume @nvanbenschoten and @andreimatei would make your best sparring partners.

petermattis commented 4 years ago

If DistSender is sending X parallel requests, the problem occurs when those X responses are received and blow out the memory on the receiver. Rather than sending unlimited Scan requests, DistSender could set TargetBytes on each as some fraction of a total TargetBytes value. We could have DistSender still obey the ordering guarantee it provides. If the first range returns too little data, then we have to return a ResumeSpan pointing into that range. I've forgotten the DistSender interface, but I think that means we have to toss out any data from later ranges. How problematic is that? We'll end up reading the data again, but at least we haven't OOM'd.

Reading @tbg's suggestion, mine seems largely similar. Optimistically try to scan in parallel, but set limits so we don't blow up. Failure scenario is bad performance, not out of memory. This might not be an optimal solution, but it avoids mucking with the DistSender API.
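
A small sketch of the ordering rule described here, assuming per-range results and resume spans are represented by the illustrative `rangeResult` type: once an earlier range comes back truncated, buffered results from later ranges are discarded and re-read later.

```go
// Sketch of the ordering guarantee: results must come back in range order, so
// once some range is truncated, everything buffered for later ranges is thrown
// away and re-read starting from the resume point. Types are stand-ins.
package sketch

type rangeResult struct {
	rows       []string
	resumeFrom string // non-empty if the per-range TargetBytes was hit
}

// mergeInOrder concatenates per-range results in order and stops at the first
// truncated range: its resume point becomes the resume span for the whole
// batch, and results already fetched for later ranges are dropped rather than
// risking an OOM.
func mergeInOrder(results []rangeResult) (rows []string, resumeSpan string) {
	for _, r := range results {
		rows = append(rows, r.rows...)
		if r.resumeFrom != "" {
			return rows, r.resumeFrom // later results are discarded and re-read
		}
	}
	return rows, ""
}
```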

tbg commented 4 years ago

If the first range returns too little data, then we have to return a ResumeSpan pointing into that range.

Or we fall back to a linear scan (with limit of course), to avoid repeatedly hitting that same problem when the table reader comes back to read more. Either way, this approach will require tuning the TargetBytes/MaxNumResults/parallelism against each other. Intuitively 10mb/10k/10 might work well, if the parallelism of 10 ranges at a time is good enough. Maybe 100 also still works.

tbg commented 4 years ago

Btw, TargetBytes returns at least one row. So if a row is 10mb, and we're fanning out to 100 ranges, then we're still loading 1GB of ram regardless of the TargetBytes. Just food for thought.

Btw, I missed that SQL knows in the interesting cases that there's at most one row (=#colfams kv pairs) per span. That seems to indicate that the approach Peter and I suggest should work well, if the parameters are chosen such that the expected row size is smaller than the TargetBytes resulting from the choice of parameters. For example, if rows should be << 10kb each, then it would be reasonable to use concurrency of up to 10mb/10kb=1k goroutines, which should be more than enough (likely too many!)

nvanbenschoten commented 4 years ago

The proposals from @tbg and @petermattis make sense to me. I think we would want to fall back to something like a linear scan at the DistSender level in the case where the first range is truncated and returns a resume span, otherwise we'd need the user of DistSender to know when and how to adjust its prefetch limit.

I've always thought that we'd want to do some kind of adaptive parallelism at the DistSender level using an exponentially increasing parallelism count. But that runs into problems that these solutions don't have, like being pessimistic about parallelism at first and only becoming efficient over time. Maybe that idea can be combined with these to achieve the benefits of each. Instead of falling back to a linear scan, we could generalize this by exponentially decreasing the parallelism.

So for instance, a scan may adapt in the following way:

target bytes: 16mb, per-request target bytes 128kb, 128 parallelism => returns resume span
target bytes: 16mb, per-request target bytes 256kb, 64 parallelism => returns resume span
target bytes: 16mb, per-request target bytes 512kb, 32 parallelism => succeeds

That would avoid the sharp cliff when we overestimate the prefetch limit. Instead, the penalty would be more gradual. Of course, the trade-off here is that this is more complex to implement.
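
A rough sketch of that adaptive back-off, assuming truncation is observable as a resume span; `attemptScan` and `adaptiveScan` are hypothetical helpers, not existing DistSender code.

```go
// Sketch of the adaptive scheme above: keep the total budget fixed and halve
// the parallelism (doubling the per-request TargetBytes) until a pass
// completes without truncation, e.g. 128 -> 64 -> 32 with a 16 MB budget.
package sketch

// attemptScan would issue `parallelism` requests of perReqTargetBytes each and
// report whether any of them was truncated (returned a resume span).
type attemptScan func(parallelism int, perReqTargetBytes int64) (truncated bool)

// adaptiveScan starts at maxParallelism and backs off exponentially instead of
// falling straight back to a fully sequential scan.
func adaptiveScan(totalTargetBytes int64, maxParallelism int, attempt attemptScan) (usedParallelism int) {
	for p := maxParallelism; p >= 1; p /= 2 {
		perReq := totalTargetBytes / int64(p)
		if !attempt(p, perReq) {
			return p // this pass fit within its per-request limits
		}
	}
	return 1 // degenerated to a sequential scan
}
```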

Btw, TargetBytes returns at least one row. So if a row is 10mb, and we're fanning out to 100 ranges, then we're still loading 1GB of ram regardless of the TargetBytes. Just food for thought.

I'm skeptical that we're ever going to be able to handle unboundedly large rows effectively at this level. Maybe it's time to reconsider some kind of column/row size limit (https://github.com/cockroachdb/cockroach/issues/9556) so that KV can make even the smallest assumptions about data size limits.

sumeerbhola commented 3 years ago

FWIW, the approach I've observed/written in past systems (these were NoSQL systems without stats to guide fetching) is something like the following.

The desired scan consists of a single span. The caller indicates whether it requires sequential results or not for this scan. This way we can migrate callers as they become more capable of handling parallel scan results.

There is a flow control budget for memory reasons on the client node. That, and how much data the caller desires, is used to compute a memory budget on this client node.

The "fetcher" library partitions the span across server nodes based on the current placement of data at nodes. This is the parallelization for the duration of this scan even if the data moves. So [a, g) may become [a, b):n1, [b, c):n2, [c, d):n1, [d, g):n3. Resumption is tracked separately for each of these partitioned spans. Within a partition, the reads will be sequential, even if later the data spreads across multiple nodes. In the CRDB context, [d, g) would not need to fall within a replica -- it can be wider (to reduce the number of partitions/fanout). And the fetcher can explicitly break the initially computed partitions into smaller ones to utilize more concurrency at a server node, if the fanout from the initial partitioning is small.
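
A small sketch of that placement-based partitioning step, with illustrative `placement`/`partition` types standing in for whatever the real fetcher would use; adjacent pieces on the same node are coalesced to keep the fanout small, and each partition carries its own resume key.

```go
// Sketch of splitting one requested span into per-node partitions, e.g.
// [a, g) -> [a,b):n1, [b,c):n2, [c,d):n1, [d,g):n3. Types are illustrative.
package sketch

type placement struct {
	startKey string // first key of a contiguous region owned by node
	node     int
}

type partition struct {
	node       int
	start, end string
	resumeKey  string // next key to fetch within this partition
}

// partitionSpan splits [start, end) along the given placement boundaries
// (sorted by startKey and covering the span), merging adjacent pieces that
// live on the same node to reduce fanout, and gives each partition an
// independent resume key.
func partitionSpan(start, end string, placements []placement) []partition {
	var parts []partition
	for i, p := range placements {
		pieceEnd := end
		if i+1 < len(placements) {
			pieceEnd = placements[i+1].startKey
		}
		if len(parts) > 0 && parts[len(parts)-1].node == p.node {
			parts[len(parts)-1].end = pieceEnd // extend: same node, keep fanout small
			continue
		}
		pieceStart := p.startKey
		if pieceStart < start {
			pieceStart = start
		}
		parts = append(parts, partition{node: p.node, start: pieceStart, end: pieceEnd, resumeKey: pieceStart})
	}
	return parts
}
```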

The fetcher has to repeat a request with budget to get more data from a server node. Since the response returns some budget, the fetcher can redistribute it for its next requests.

The fetcher keeps returning rows to the initial caller as rows are retrieved. The caller has to indicate that it has processed a row (by accounting for its memory elsewhere) before that budget can be reused by the fetcher.

One issue is when the budget to a server node is insufficient for even a single row. We simply allowed the server to go over budget and return a single row. The fetcher would account for this overuse, so the remaining budget may become negative. But it could blow up the client if the fanout were high. We didn't have super-huge rows so we ignored this issue. Another option would be to return no row, and specify the length of the first row and return the budget. The fetcher can then decide how to redistribute the budget to get some server nodes to provide a row.
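
A minimal sketch of the budget bookkeeping described in the last few paragraphs, with a hypothetical `budget` type: grants are handed to per-node requests, the caller releases a row's bytes once it has processed it, and the "let the server return one over-budget row" escape hatch can drive the balance negative.

```go
// Sketch of the client-side flow-control budget; all names are illustrative.
package sketch

import "sync"

type budget struct {
	mu        sync.Mutex
	remaining int64 // may go negative if a server returns one over-budget row
}

// tryAcquire reserves up to `want` bytes for the next request to a server
// node, returning how much was actually granted (possibly zero).
func (b *budget) tryAcquire(want int64) int64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.remaining <= 0 {
		return 0
	}
	if want > b.remaining {
		want = b.remaining
	}
	b.remaining -= want
	return want
}

// release returns bytes to the budget: either an unused grant coming back
// with a response, or the size of a row the caller has finished processing.
func (b *budget) release(n int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.remaining += n
}

// forceAcquire models the "let the server go over budget for one row" escape
// hatch: the row is admitted regardless, and remaining may become negative.
func (b *budget) forceAcquire(rowBytes int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.remaining -= rowBytes
}
```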

This can be extended to multiple spans in a BatchRequest, say [a, g), [i, k), [p, x) where the number of spans can be large (10,000 in the original example). If all data in [d, j) was at n1, then one of the partitions would be [d,g),[i,j):n1 which would have one resumption point, and would be scanned sequentially.

I suspect this is not dissimilar to what has been discussed earlier on this issue, but it was hard for me to tell given the code level details being discussed above.

ajwerner commented 3 years ago

Can somebody help me understand why we're setting out to do this underneath the Batch API? I agree that providing a mechanism to stream back a parallelized scan of a span while conforming to memory limits is super valuable and would be utilized in a number of places. But I don't understand why we want this to happen underneath the existing batch request API. Wouldn't it be better to arm some library with some awareness of data placement (RangeCache, RangeIterator) and have it construct and send off batches? It could have full control over both memory and parallelism and deal with the client's expectation of a stream-oriented API rather than a batch-oriented one.

tbg commented 3 years ago

Can somebody help me understand why we're setting out to do this underneath the Batch API?

If I understand you correctly, you're asking why we're shoving this into DistSender.Send? That's a valid concern. We primarily want to make these changes to benefit table readers (or more generally, read-only requests emitted from SQL), so there's an opportunity here to introduce a better API that the table readers actually want to use. Maybe that API sits in a library that uses a DistSender, or it's offered directly by DistSender; I'm not sure. There is enough tricky stuff going on behind the scenes that keeping it strictly on top of DistSender may be awkward.

ajwerner commented 3 years ago

I agree there are details which are tricky, which is why this library should be owned by the KV team. That being said, let's work backwards from what API the table reader wants. My intuition is that the Sender API, which can send scans that do have memory limits and don't have parallelism, is exactly the primitive you'd want to use to build a thing that does parallel scans.

The distsender is quite a complex abstraction as it is.
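
For illustration, here is a rough sketch of the shape such a Sender-backed, stream-oriented fetching API might take; the `StreamingFetcher` interface and its method signatures are hypothetical, not an existing CockroachDB API.

```go
// Sketch of a streaming fetching API built on top of a Sender: callers hand
// over spans and a memory budget and iterate over KVs, while the library
// decides batching and parallelism internally.
package sketch

import "context"

type Span struct{ Key, EndKey []byte }

type KeyValue struct{ Key, Value []byte }

// StreamingFetcher hides batch construction, parallelism, and memory limits
// behind an iterator-style interface.
type StreamingFetcher interface {
	// Fetch starts fetching the given spans, keeping at most memoryBudget
	// bytes of results buffered. If inOrder is true, results come back in
	// span order.
	Fetch(ctx context.Context, spans []Span, memoryBudget int64, inOrder bool) error
	// Next blocks until the next KV is available or the fetch is exhausted.
	Next(ctx context.Context) (kv KeyValue, ok bool, err error)
	// Close releases any buffered results and outstanding budget.
	Close(ctx context.Context)
}
```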

tbg commented 3 years ago

I agree that the Sender API is not what this needs to be using, so I think we're saying the same thing. If it can be a clean library, I'm for it.

ajwerner commented 3 years ago

I agree that the Sender API is not what this needs to be using, so I think we're saying the same thing.

:wink: depends on what "this" is in the sentence. To clarify, I'm saying the TableReader is already jumping through some hoops to map what it wants to a Sender and it probably would benefit from having a better, more streaming and iterating API. I'm also saying that such a thing can and should just use a Sender.

(this comment isn't necessarily looking for any reply)

tbg commented 3 years ago

I agree with that. The reason I am not sure it will be able to be its own little standalone library on top of a Sender is that we have all of these semantics around merging batch responses, handling errors, etc, which I think will make this library a little more complicated than you perhaps initially anticipate (in the worst case, it just becomes a second DistSender...). Or not! We will see that as we go through the exercise of trying. If we can use this as an opportunity to slim down DistSender rather than complicate it further, that would be great.

ajwerner commented 3 years ago

I'm not seeing it the way you are. I'm envisioning the library will never ever need to think about merging batch requests/headers/responses. It will only deal in KVs. One blocker here, on some level, is concurrent use of a transaction. I filed an issue about that recently. However, for read-only things, which this is, we can just construct N leaf transaction objects.

If we ever needed to get into the guts of thinking about transaction proto handling, I'd feel differently but I don't see why we would.

tbg commented 3 years ago

Are you just saying that there's no need to merge anything since we're talking about read-only requests only? I spot checked a few of the combine() methods and that largely makes sense, maybe with the exception of observed timestamps, which you'd want to propagate from responses to future requests. I don't think we'd see many other relevant updates to the txn proto, but I haven't checked.

ajwerner commented 3 years ago

I'm not saying we throw away the accumulated state picked up by the concurrent scan workers (as I am now thinking of them), but that we use the mechanisms we have already built to handle exactly this use case. At the end of the day, when we schedule table readers for the same table on different nodes in distsql, it's parallel scanning, in a sense.

At the end of the scanning, we'd accumulate the state by calling LeafTxnFinalState(), merging those together, and then calling UpdateRootWithLeafFinalState(). Maybe we need to add some ability to create new leaf txns from leaf txns and then merge their state back together, but that feels pretty trivial.

https://github.com/cockroachdb/cockroach/blob/01fd091148b88a5a5d4d135bf06f0c346388f74e/pkg/kv/txn.go#L1010-L1014
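
A heavily hedged sketch of that fan-out/merge flow: the `newLeafTxnFor`, `scanPartition`, `finalStateOf`, and `mergeIntoRoot` helpers are hypothetical wrappers around the leaf-txn machinery referenced above, and the real signatures are not reproduced here.

```go
// Illustrative shape of a read-only parallel scan using leaf transactions.
package sketch

import (
	"context"
	"sync"
)

type rootTxn struct{}
type leafTxn struct{}
type leafFinalState struct{}

// Hypothetical wrappers standing in for the real leaf-txn entry points.
func newLeafTxnFor(root *rootTxn) *leafTxn                    { return &leafTxn{} }
func scanPartition(ctx context.Context, l *leafTxn, part int) {}
func finalStateOf(l *leafTxn) leafFinalState                  { return leafFinalState{} }
func mergeIntoRoot(root *rootTxn, states []leafFinalState)    {}

// parallelReadOnlyScan fans a read-only scan out across N leaf transactions
// and merges their accumulated state (e.g. observed timestamps) back into the
// root once all workers finish.
func parallelReadOnlyScan(ctx context.Context, root *rootTxn, partitions int) {
	states := make([]leafFinalState, partitions)
	var wg sync.WaitGroup
	for i := 0; i < partitions; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			leaf := newLeafTxnFor(root)
			scanPartition(ctx, leaf, i)
			states[i] = finalStateOf(leaf)
		}(i)
	}
	wg.Wait()
	mergeIntoRoot(root, states)
}
```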

ajwerner commented 3 years ago

I'm realizing that this issue is, in some ways, too limited. This issue deals with creating a library for achieving high throughput with proper memory management when we have a single span which is presumably very large. There's another, related problem: when you have a bunch of keys or spans you would like to fetch with high throughput and proper memory management. My sense, right now, is that for things like lookup joins, we just pick some arbitrary row limit and construct a batch. However, if each of the keys is large, we may be in serious trouble. We have no way to tell a range to only service some of the GetRequests or ScanRequests in a batch based on memory limits. My sense is that these are pretty closely related problems.

nvanbenschoten commented 3 years ago

@aayushshah15, @stevendanna, and I have determined that this is quite important for TPC-E. TPC-E relies quite heavily on pipelines of lookup joins, often with branching factors between 30 and 40. We don't currently set row/memory limits on lookup joins if the lookup is on a key - meaning that each lookup scan will return at most one row: https://github.com/cockroachdb/cockroach/blob/9eb1b960201e4a2a07bdbdf132f85b4e2b8563b8/pkg/sql/rowexec/joinreader.go#L228 However, this is not the case in a few of TPC-E's most expensive queries, where the lookup joins can return 2-3 rows per scan. And so we end up running these scans sequentially, which hurts. In a custom branch, I disabled row/memory limits across all lookup joins and saw a large improvement in performance.

jordanlewis commented 3 years ago

Great data @nvanbenschoten, thanks for putting this together. I actually wasn't thinking much about the performance angle when I originally wrote this issue - just the safety angle. To summarize, we have 2 reasons for this work:

  1. Safety for index joins (we currently disable the limit, which can cause OOMs with wide rows).
  2. Performance for lookup joins (we enable the limit to prevent OOMs, which causes poor performance).

jordanlewis commented 3 years ago

Another data point is that @sumeerbhola and I don't think we can enable #66362 by default, which enables KV-level memory accounting, without a solution for this issue.

The reason is that, assuming KV can decide to reject a ScanRequest halfway through, it is important that KV return its partial results rather than an error, so that queries are always completable under normal circumstances. If KV had to return an error and not a partial result, we could conceivably have a case where a single index join on a table with large rows would be impossible to complete, with no reasonable workaround for users.

tbg commented 1 year ago

This issue seems a bit sleepy, but related OOMs still come up in customer escalations, most recently here: https://github.com/cockroachlabs/support/issues/2338#issuecomment-1570944081. Since these issues tend to traverse TSE -> SQL Q L2 -> KV L2, they are quite costly when they happen.

yuzefovich commented 1 year ago

That particular instance of this issue (index joins) from https://github.com/cockroachlabs/support/issues/2338 has already been addressed in 22.2.