cockroachdb / cockroach

CockroachDB - the open source, cloud-native distributed SQL database.
https://www.cockroachlabs.com

perf: distsql row batching #20555

Closed asubiotto closed 5 years ago

asubiotto commented 6 years ago

DistSQL processors currently operate on a row-by-row basis. The downside to doing this is that processors may only process one row at a time. Take for example the TableReader. Although the underlying MultiRowFetcher fetches a batch of rows at once, it is constrained by the RowReceiver API whose Push method only allows pushing one row at a time. This results in the TableReader looping over the batch of rows and sending one row at a time to the RowReceiver (which could be local or remote communication). The TableReader also waits until this row is read by a downstream processor, which uses the equally constrained RowSource.Next() to receive the next available row.

There are two main points of work to change the distsql infrastructure to allow row batching after performance investigation has been done: 1) Change the RowReceiver and RowSource APIs and implementations to allow pushing/pulling of batches of rows. 2) Change the processors to be able to operate on batches of rows as opposed to one row at a time.
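
To make point 1 concrete, here is a minimal sketch of what a batched push could look like next to the row-at-a-time shape. `Row`, `BatchReceiver`, `PushBatch` and `countingReceiver` are hypothetical names made up for illustration, and `RowReceiver` is heavily simplified compared to the real interface.

```go
package main

import "fmt"

// Row is a stand-in for a distsql row (assumption: real rows hold encoded datums).
type Row []int

// RowReceiver is a simplified version of the row-at-a-time push interface.
type RowReceiver interface {
	Push(row Row)
}

// BatchReceiver is a hypothetical batched counterpart, not an existing API.
type BatchReceiver interface {
	PushBatch(rows []Row)
}

// countingReceiver records how many calls it received and how many rows it saw.
type countingReceiver struct {
	calls, rows int
}

func (c *countingReceiver) Push(row Row) {
	c.calls++
	c.rows++
}

func (c *countingReceiver) PushBatch(rows []Row) {
	c.calls++
	c.rows += len(rows)
}

// pushRowByRow is what a reader has to do with the current API:
// one Push call per row, even though the rows were fetched as a batch.
func pushRowByRow(fetched []Row, out RowReceiver) {
	for _, r := range fetched {
		out.Push(r)
	}
}

// pushBatched hands the whole fetched batch over in a single call.
func pushBatched(fetched []Row, out BatchReceiver) {
	out.PushBatch(fetched)
}

func main() {
	fetched := []Row{{1}, {2}, {3}, {4}}
	a, b := &countingReceiver{}, &countingReceiver{}
	pushRowByRow(fetched, a)
	pushBatched(fetched, b)
	fmt.Println("row-at-a-time calls:", a.calls, "rows:", a.rows)
	fmt.Println("batched calls:", b.calls, "rows:", b.rows)
}
```

The same number of rows crosses the interface in both cases; only the number of calls (and therefore the per-call overhead) differs.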

rjnn commented 6 years ago
  1. We need a shim processor/router that unbatches rows for processors that do not support row batches.
  2. Initially we want to rely on fixed sized arrays of rows as our batch format, but in the long term we probably want to columnarize the batches, so that we pass fixed size arrays of columns as our batch format.
  3. We want to investigate the cost of having our batches be always decoded, instead of having the extra layer of indirection in passing encoded datums and calling EnsureDecoded on every datum.
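
A rough sketch of points 1 and 2, under assumptions: `unbatcher` is a hypothetical shim that accepts batches and forwards them row by row to a processor that only understands single rows, and `toColumns` shows what turning a fixed-size array of rows into arrays of columns might look like. None of these names exist in the codebase, and `Row` is simplified to a slice of ints.

```go
package main

import "fmt"

// Row is a simplified stand-in for a distsql row.
type Row []int

// RowReceiver is a simplified row-at-a-time consumer.
type RowReceiver interface {
	Push(row Row)
}

// unbatcher is a hypothetical shim: it accepts batches and forwards
// each row individually to a receiver that does not support batches.
type unbatcher struct {
	dst RowReceiver
}

func (u unbatcher) PushBatch(rows []Row) {
	for _, r := range rows {
		u.dst.Push(r)
	}
}

// collector is a toy row-at-a-time processor that just stores its input.
type collector struct {
	rows []Row
}

func (c *collector) Push(row Row) {
	c.rows = append(c.rows, row)
}

// toColumns converts a row-major batch into a column-major one.
// It assumes every row has the same width as the first row.
func toColumns(rows []Row) [][]int {
	if len(rows) == 0 {
		return nil
	}
	cols := make([][]int, len(rows[0]))
	for _, r := range rows {
		for i, v := range r {
			cols[i] = append(cols[i], v)
		}
	}
	return cols
}

func main() {
	legacy := &collector{}
	shim := unbatcher{dst: legacy}
	shim.PushBatch([]Row{{1, 10}, {2, 20}})
	shim.PushBatch([]Row{{3, 30}})
	fmt.Println("rows seen by legacy processor:", len(legacy.rows))
	fmt.Println("columnar form:", toColumns(legacy.rows))
}
```
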
petermattis commented 6 years ago

One thing I'm fuzzy on: while it is straightforward to define the interface to RowSource.Next() and RowReceiver.Push() to take multiple rows, it is not clear to me how processors internally decide to form batches. For example, let's imagine we're implementing #20550 and make mergeJoiner implement the RowSource interface. When mergeJoiner.Next() is called it needs to read a batch from the left and right inputs and then compute the output rows. That output might be empty, forcing it to get more rows from the left or right input. Let's say it generates one output row at this point. Does it return from the call to Next(), or does it try to fetch more rows from the inputs? Returning from Next() with a single row has the upside that it reduces latency. Trying to batch more rows might allow more efficient processing.

  1. Always decoding row datums is likely a pessimization for some processors. For example, count(*) never decodes any datums.
rjnn commented 6 years ago

Always decoding row datums is likely a pessimization for some processors. For example, count(*) never decodes any datums.

We have a notion of neededColumns. If our batch stores a representation that decodes all the needed columns and simply elides the unneeded columns, I think that would eliminate most of the downsides.
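
As a toy illustration of the idea (not the real datum encoding), the sketch below treats each encoded field as a decimal string and only decodes the columns flagged as needed. `decodeNeeded` and its signature are assumptions made for this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// decodeNeeded decodes only the columns flagged in needed and leaves the
// others at their zero value. It returns the decoded batch and the number
// of decode operations performed. Encoded fields are decimal strings here,
// which is a stand-in for the real encodings.
func decodeNeeded(encoded [][]string, needed []bool) ([][]int, int, error) {
	decodes := 0
	out := make([][]int, len(encoded))
	for i, row := range encoded {
		out[i] = make([]int, len(row))
		for j, field := range row {
			if j >= len(needed) || !needed[j] {
				continue // unneeded column: skip the decode entirely
			}
			v, err := strconv.Atoi(field)
			if err != nil {
				return nil, decodes, err
			}
			out[i][j] = v
			decodes++
		}
	}
	return out, decodes, nil
}

func main() {
	batch := [][]string{{"1", "2", "3"}, {"4", "5", "6"}}

	// A query that needs only the first column.
	_, n, err := decodeNeeded(batch, []bool{true, false, false})
	fmt.Println("decodes with one needed column:", n, err)

	// A count(*)-style query needs no columns, so nothing is decoded.
	_, n, err = decodeNeeded(batch, []bool{false, false, false})
	fmt.Println("decodes with no needed columns:", n, err)
}
```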

Where we currently elide decodes is when we have equality comparisons between two datums that are encoded under the same encoding - we can directly compare the encoded forms. However, it's not clear to me how often this fast path gets used, particularly as our current code re-encodes using a default value encoding if the encodings are different, which makes performance worse.

Benchmarking is needed.

rjnn commented 6 years ago

Does it return from the call to Next(), or does it try to fetch more rows from the inputs? Returning from Next() with a single row has the upside that it reduces latency. Trying to batch more rows might allow more efficient processing.

I'd say this can only really be answered with benchmarking. To lay my biases out in the open, once processors are cache-aligned and vectorized, waiting to accumulate a full batch is probably faster for memory bandwidth reasons.

petermattis commented 6 years ago

I'd say this can only really be answered with benchmarking. To lay my biases out in the open, once processors are cache-aligned and vectorized, waiting to accumulate a full batch is probably faster for memory bandwidth reasons.

I agree with the need to benchmark/experiment. While accumulating a full batch will be better for memory bandwidth, it might decrease concurrency. These things can be tricky to predict.

My instinct is to first move forward with eliding RowChannel where possible (#20550) since I think the work is relatively straightforward. It is interesting that both @RaduBerinde and @andreimatei think batching should be tackled first. They are both very smart and more knowledgeable about distsql, so I'm wondering what I'm missing here.

RaduBerinde commented 6 years ago

One thing I'm fuzzy on: while it is straightforward to define the interface to RowSource.Next() and RowReceiver.Push() to take multiple rows, it is not clear to me how processors internally decide to form batches.

An insight here is that it's not critical for all batches to have a bunch of rows; the important thing is that each batch should be associated with enough "work" (e.g. a bunch of rows having been processed at some point). So if a processor received a batch of rows but can produce only one output row, it's fine to just send that output row.

andreimatei commented 6 years ago

One thing I'm fuzzy on: while it is straightforward to define the interface to RowSource.Next() and RowReceiver.Push() to take multiple rows, it is not clear to me how processors internally decide to form batches.

Ideally, a processor would batch things up until trying to batch more would cause it (or the whole flow underneath) to block. Batching less than that (for better latency) seems not worth it to me; batching more would be expensive. Batches it produces cannot be empty (that'd be nonsensical), so it'd also wait until it has produced at least one row. If all the processors run synchronously wrt each other, then a first approximation of that would be the TableReaders doing something custom, the network protocol being made aware of batches somehow, and everybody else reading one batch at a time from one of its inputs and producing a batch with whatever that resulted in. For example, in the mergeJoiner case, a call to Next() could mean that Next() is called on whatever input doesn't have anything buffered, and that batch would be joined with whatever is currently buffered from the other one (so, we're assuming that a call to Next() on the other input would always block). Beyond that, I think we need to expand the RowSource interface with non-blocking reads.
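
A minimal sketch of "batch until more would block, but never return an empty batch", assuming the input is a plain Go channel. `nextBatch` and the `max` cap are hypothetical and only illustrate the non-blocking read idea.

```go
package main

import "fmt"

// Row is a simplified stand-in for a distsql row.
type Row []int

// nextBatch blocks until at least one row is available, then keeps adding
// rows for as long as they can be received without blocking, up to max rows.
// It returns ok=false only when the input is closed and fully drained.
func nextBatch(in <-chan Row, max int) ([]Row, bool) {
	first, ok := <-in // blocking read: a batch is never empty
	if !ok {
		return nil, false
	}
	batch := []Row{first}
	for len(batch) < max {
		select {
		case r, ok := <-in:
			if !ok {
				return batch, true // input closed: return what we have
			}
			batch = append(batch, r)
		default:
			return batch, true // reading more would block: stop batching
		}
	}
	return batch, true
}

func main() {
	in := make(chan Row, 8)
	for i := 0; i < 5; i++ {
		in <- Row{i}
	}
	close(in)
	for {
		batch, ok := nextBatch(in, 3)
		if !ok {
			break
		}
		fmt.Println("batch of", len(batch), "rows")
	}
}
```

With a slow producer this degrades to one row per batch, which is the worst case mentioned above: the same behaviour as today.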

My instinct is to first move forward with eliding RowChannel where possible (#20550) since I think the work is relatively straightforward. It is interesting that both @RaduBerinde and @andreimatei think batching should be tackled first. They are both very smart and more knowledgeable about distsql, so I'm wondering what I'm missing here.

Well, my opinion is that we need to do batching anyway because it would result in tighter loops in all processors, which has got to be good for something. And with batching, the benchmark numbers informing "eliding RowChannels" would probably look different. I'm definitely also a fan in general of getting rid of those channels when advantageous, but it's just that I didn't have time to think about this recently and I'm not immediately sure what we want. Like, we may want some things to run in parallel within a machine (e.g. multiple TableReaders wrt each other, or multiple components of a flow that's not connected on a machine - the components only connect on a different machine). And then the question becomes who chooses which processors are scheduled concurrently and which are "fused". So basically here I would like to see a design written down. It's possible that this is not very hard and you or someone else has all the answers, but I just didn't think about it very much.

petermattis commented 6 years ago

@RaduBerinde says:

An insight here is that it's not critical for all batches to have a bunch of rows; the important thing is that each batch should be associated with enough "work" (e.g. a bunch of rows having been processed at some point). So if a processor received a batch of rows but can produce only one output row, it's fine to just send that output row.

Ok, that heuristic definitely simplifies things and gives a concrete starting point.

@andreimatei says:

Well, my opinion is that we need to do batching anyway because it would result in tighter loops in all processors, which has got to be good for something. And with batching, the benchmark numbers informing "eliding RowChannels" would probably look different.

"Tighter loops" within the processors is a bit speculative. I agree that we'll eventually be able to do something there, but I think it is unlikely we'll find significant wins in the near term. What is likely is that batching will reduce the overhead of inter-processor communication which will reduce the need for eliding RowChannels. My contention here is that eliding RowChannels will be a bigger win, because batching can devolve into the current row-at-a-time under query-specific circumstances.

I'm definitely also a fan in general of getting rid of those channels when advantageous, but it's just that I didn't have time to think about this recently and I'm not immediately sure what we want. Like, we may want some things to run in parallel within a machine (e.g. multiple TableReaders wrt each other, or multiple components of a flow that's not connected on a machine - the components only connect on a different machine). And then the question becomes who chooses which processors are scheduled concurrently and which are "fused". So basically here I would like to see a design written down. It's possible that this is not very hard and you or someone else has all the answers, but I just didn't think about it very much.

I'm not seeing a big downside to always "fusing" processors (i.e. eliding RowChannels) when possible. Yeah, I think there are cases where we'd benefit from concurrency on a single node, but I think there are many more cases where the current approach is a pessimization. As for the question of who chooses which processors are scheduled concurrently and which are "fused", I think that would be decided by distsql physical planning which will eventually be incorporated into normal sql planning.
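
To illustrate what "fusing" means here, a minimal sketch under assumptions: the downstream consumer either pulls directly from its input via a simplified `RowSource.Next()`, or pulls through a goroutine plus channel hop that stands in for a RowChannel. `sliceSource`, `chanSource`, `viaChannel` and `countRows` are hypothetical names.

```go
package main

import "fmt"

// Row is a simplified stand-in for a distsql row.
type Row []int

// RowSource is a simplified pull interface; ok=false means exhausted.
type RowSource interface {
	Next() (Row, bool)
}

// sliceSource is a toy upstream processor that yields rows from a slice.
type sliceSource struct {
	rows []Row
	pos  int
}

func (s *sliceSource) Next() (Row, bool) {
	if s.pos >= len(s.rows) {
		return nil, false
	}
	r := s.rows[s.pos]
	s.pos++
	return r, true
}

// chanSource adapts a channel to RowSource, modelling a RowChannel hop.
type chanSource struct {
	ch <-chan Row
}

func (c chanSource) Next() (Row, bool) {
	r, ok := <-c.ch
	return r, ok
}

// viaChannel runs src in its own goroutine and exposes its output
// through a channel, i.e. the non-fused arrangement.
func viaChannel(src RowSource) RowSource {
	ch := make(chan Row)
	go func() {
		defer close(ch)
		for {
			r, ok := src.Next()
			if !ok {
				return
			}
			ch <- r
		}
	}()
	return chanSource{ch: ch}
}

// countRows is a stand-in for a count(*) aggregator pulling from its input.
func countRows(src RowSource) int {
	n := 0
	for {
		if _, ok := src.Next(); !ok {
			return n
		}
		n++
	}
}

func main() {
	rows := []Row{{1}, {2}, {3}}
	fused := countRows(&sliceSource{rows: rows})
	channeled := countRows(viaChannel(&sliceSource{rows: rows}))
	fmt.Println("fused:", fused, "via channel:", channeled)
}
```

Both arrangements produce the same result; the fused one simply avoids the goroutine handoff and channel synchronization per row, at the cost of running the two stages sequentially.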

We really need some Go-level benchmarks of the various distsql processors and example flows. Those benchmarks along with some experimentation will make the tradeoff between concurrency and "fusing" much clearer. #20584 is one data point. We need more.

asubiotto commented 6 years ago

How I see it is that we currently have a system that performs at a certain level. Adding row batching is only going to increase performance. We amortize the communication overhead (trending towards eliding RowChannels and remote communication) and unlock potential gains by allowing processors to operate on batches of rows at a time. Additionally, it's a pretty straightforward change (this is subjective, but I think basic infrastructure changes and use in COUNT(*) can be achieved by the end of next week). We won't ever wait to create batches of rows (as andrei says), so worst-case performance is what we already do.

The biggest performance impact that row channel elision is going to offer us will be addressed by row batching since row channel overhead will be amortized. Elision is not going to help with any remote communication or processor efficiency, and it comes at the cost of changes to the planner to figure out when we want synchronous or asynchronous scheduling.

Both of these things will be implemented at some point or another but row batching is, in my opinion, lower-hanging and more impactful fruit.

What concrete examples are you thinking about for benchmarks? I'm pushing up a draft branch with basic row batching (in rowchannel and tablereader) and an example (relatively meaningless) benchmark.

rjnn commented 6 years ago

I'd like to see batched vs unbatched benchmarks for COUNT(*) and SUM queries. Then we can quickly adapt the benchmark from #20584 to do a 4 way benchmark. If most of the gain in RowChannel elision is captured by batching, we should be able to see that.
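
A rough sketch of what the batched vs unbatched half of such a comparison could look like at the Go level, using a plain channel as a stand-in for a RowChannel. `countUnbatched`, `countBatched`, the row count and the batch size of 64 are arbitrary assumptions; this does not touch any real distsql code and its numbers say nothing about the real system.

```go
package main

import (
	"fmt"
	"testing"
)

// Row is a simplified stand-in for a distsql row.
type Row []int

// countUnbatched sends n rows one at a time over a channel and counts them.
func countUnbatched(n int) int {
	ch := make(chan Row)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- Row{i}
		}
	}()
	count := 0
	for range ch {
		count++
	}
	return count
}

// countBatched sends the same rows in slices of up to batchSize rows.
func countBatched(n, batchSize int) int {
	ch := make(chan []Row)
	go func() {
		defer close(ch)
		batch := make([]Row, 0, batchSize)
		for i := 0; i < n; i++ {
			batch = append(batch, Row{i})
			if len(batch) == batchSize {
				ch <- batch
				batch = make([]Row, 0, batchSize)
			}
		}
		if len(batch) > 0 {
			ch <- batch // flush the final partial batch
		}
	}()
	count := 0
	for b := range ch {
		count += len(b)
	}
	return count
}

func main() {
	const rows = 10000
	unbatched := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			countUnbatched(rows)
		}
	})
	batched := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			countBatched(rows, 64)
		}
	})
	fmt.Println("unbatched:", unbatched)
	fmt.Println("batched:  ", batched)
}
```

Adding a fused (no channel) variant of each would give the 4 way comparison.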

petermattis commented 6 years ago

@asubiotto #20584 is a data point showing the benefit of eliding RowChannel for a count(*) query. I will be very surprised if row batching provides more benefit, though I suppose it is theoretically possible due to concurrency.

What concrete examples are you thinking about for benchmarks? I'm pushing up a draft branch with basic row batching (in rowchannel and tablereader) and an example (relatively meaningless) benchmark.

We need benchmarks for each of the processors. See BenchmarkHashJoiner for an example. We also need benchmarks for a few common examples of local flows. I don't know what those local flows would look like, but if they are representative it will make it easier to contrast RowChannel elision with row batching. The benchmarks should probably be added in isolated PRs.

Both of these things will be implemented at some point or another but row batching is, in my opinion, lower-hanging and more impactful fruit.

Ok. I'm still not seeing it, but I like to be proven wrong.

asubiotto commented 6 years ago

Cross posting from #20621, might be better to continue the conversation here:

Thanks for taking a look at this. It's definitely useful to have these benchmarks. I'll add these in separate PRs and see if I can do them in a less manual manner.

The big thing that stands out to me is the benefit that row channel elision gives us regarding allocations. We can try to approximate that with row batching by, as you say, pooling allocations and reusing batches once consumed but it is nice to have that by default.

I want to go back to the question of what we should focus on before the feature freeze. We definitely get a 50% performance improvement in this benchmark by using row batching. Row channel elision can provide better performance by removing unnecessary allocations, but I'm wondering how general that improvement can be since this only applies when we would synchronously plan processors on the same node. Consider the example in this benchmark. Although realistic to a certain extent, a user running a COUNT(*) will probably have at least one other node involved in the query. The aggregator at the end will have to wait for the slowest node to return the last results (unless we implement online aggregation), so although local results will flow in much faster, remote flows will dominate latency. Row batching, on the other hand, will improve these cases.

Additionally, row channel elision is going to be a huge change. It fundamentally changes how we think about processors/communication in distsql and requires some careful thinking when planning. I definitely don't see this change making it into 2.0. Row batching is a much less invasive change that we need anyway, gives us significant performance improvements and needs a lot less work to provide us with significant results (I'm thinking COUNT(*)).

I would like to make a decision by the end of the day since the feature freeze is coming up pretty soon.

petermattis commented 6 years ago

I want to go back to the question of what we should focus on before the feature freeze.

Why can't we do both row batching and row channel elision by feature freeze?

rjnn commented 6 years ago

I believe we can do both. I can get to elision after mvccscan improvements.

asubiotto commented 6 years ago

I've done some investigation and believe that the performance benefits of adding row batching communication between all processors have decreased significantly since this issue was created and it's not worth pursuing at this time. This change had two motivations:

1) Amortize RowChannel overhead

2) Increase processor efficiency by operating on batches of rows

However, row channel elision (https://github.com/cockroachdb/cockroach/issues/20550) has proven to provide the same benefits with regard to row channel overhead. Increased processor efficiency was a vague idea that, when investigated further, didn't prove to be easily achievable in the short term (if at all, until we figure out a better row/batch representation: https://github.com/cockroachdb/cockroach/issues/21523).

So the only reason we would have to pursue this change further is if we can get some wins with regards to row channel overhead that we can't achieve easily with row channel elision. Looking at various TPCH queries, there are various components used in a distsql plan that currently make use of RowChannels:

1) Processors (https://github.com/cockroachdb/cockroach/issues/21220)

2) Routers

3) Unordered synchronizers (implemented as a RowChannel for single input and MultiplexedRowChannel for multiple inputs).

4) outboxes and inbound streams

With the current code, we could batch the rows sent over the MultiplexedRowChannel by the inbound stream to amortize the overhead, since the inbound streams read batches sent over the network https://github.com/cockroachdb/cockroach/blob/b01eaea5168e5498c4ddd0291cdcc0295d02ac97/pkg/sql/distsqlrun/data.pb.go#L501. This would then be unbatched by a downstream row accumulator.

So it seems that row channel elision through implementing RowSource takes care of nearly all the cases of RowChannel usage. We could easily add a batching change to the MultiplexedRowChannel but that seems like the only change related to batching that will have an impact.

cc @jordanlewis

jordanlewis commented 6 years ago

Thanks for digging into this, and great analysis. I think that you're likely spot-on that there are bigger fish to fry than row batching right now.

Could you include some of the TPCH queries and their generated plans that create these kinds of undesirable RowChannels?

Perhaps that could inform us further as to which of the items you mention are high priority to fix first.

petermattis commented 6 years ago

@asubiotto Thanks for the write-up, though I agree with @jordanlewis that there is some additional legwork to do here. In addition to looking at the TPCH queries, I think we should also look at the distsql logic tests as doing so could identify simple queries that touch each of the remaining uses of RowChannel.

Routers

All routers implement RowReceiver; changing them to implement RowSource shouldn't be too hard.

We have issues filed about some of the other uses of RowChannel (e.g. https://github.com/cockroachdb/cockroach/issues/20553 and https://github.com/cockroachdb/cockroach/issues/20568), can you file one about changing routers to implement RowSource? Be sure to include some high-level guidance on what would be involved.

asubiotto commented 6 years ago

[Image: screen shot 2018-02-05 at 12 26 36 pm]

The above is TPCH query 3. Currently, only direct connections from a TableReader to a downstream processor (here the left side of a HashJoiner and two JoinReaders) have RowChannels elided. The biggest impact for this query will be the work already being done in https://github.com/cockroachdb/cockroach/issues/21220, specifically work on the JoinReader, HashJoiner, and Sorter.

[Image: screen shot 2018-02-05 at 12 27 50 pm]

This is TPCH query 7 where we start to see an interesting pattern, and one that comes up quite often (as we can see in following queries) where a hashRouter routes rows to a HashJoiner (local + remote). The nice thing is that up to the first uses of hashRouters, all RowChannels are elided (including in some TableReader-JoinReader pair ups similar to the last query), but we have two main, related, items of work that would speed up this query:

1) hashRouter to local HashJoiner (wherever you see an unordered oval with an incoming stream from a hashRouter on the same node): The hashRouter pushes rows to a RowChannel that the downstream HashJoiner pulls from.

2) hashRouter to remote HashJoiner: This seems to be a crippler considering the likely large amount of rows (TableReaders without filters) that are sent over the wire. This case can be further decomposed into three problem areas:

a) hashRouter to outbox. This is any time you see an arrow cross over from a hashRouter to another node. There is a RowChannel between the two components.

b) outbox batching. We're sending a maximum of 16 rows over the network: https://github.com/cockroachdb/cockroach/issues/22385.

c) ProcessInboundStream on the receiving node which pushes to a RowChannel read by a HashJoiner. These are the unordered ovals that receive rows from another node. Note that these are sometimes MultiplexedRowChannels which are a bit more complicated, as described in a previous comment.

It is important to note that work on processors will affect this query minimally. The only effect that is not currently present that will be in the future is Sorter's implementation of RowSource. This will remove the RowChannel between the final Aggregator and Sorter on each of the participating nodes.

Also note that the hashRouter foils any RowChannel elision that would happen otherwise (e.g. Aggregator->Aggregator). Any remote communication between nodes from processor to ordered synchronizer (e.g. Sorter on node 3 to orderedSynchronizer on node 1) is subject to the same limitations as described with the hashRouter's remote communication.
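
For readers unfamiliar with the pattern, here is a toy sketch of hash routing: each row is assigned to one of several output streams based on a hash of its join key, so rows with equal keys always meet at the same downstream joiner. `routeByHash`, the use of FNV and the int-slice `Row` are assumptions for illustration and not the actual hashRouter implementation.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Row is a simplified stand-in for a distsql row.
type Row []int

// routeByHash assigns each row to one of `outputs` streams based on a hash
// of the value in column keyCol. Rows with equal keys land in the same stream.
func routeByHash(rows []Row, keyCol, outputs int) [][]Row {
	out := make([][]Row, outputs)
	for _, r := range rows {
		h := fnv.New32a()
		fmt.Fprintf(h, "%d", r[keyCol])
		i := int(h.Sum32() % uint32(outputs))
		out[i] = append(out[i], r)
	}
	return out
}

func main() {
	rows := []Row{{1, 100}, {2, 200}, {1, 300}, {3, 400}, {2, 500}}
	for i, stream := range routeByHash(rows, 0, 3) {
		fmt.Printf("output %d: %v\n", i, stream)
	}
}
```

In the plans discussed here, each of those output streams currently ends in a RowChannel, whether the consumer is a local HashJoiner or an outbox to another node.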

[Image: screen shot 2018-02-05 at 12 29 22 pm]

TPCH query 8 exhibits the same performance limitations as query 7. As does query 9 (there isn't much point to displaying the plan).

A different sort of query was query 19:

[Image: screen shot 2018-02-05 at 12 30 28 pm]

There are no hashRouters here but we can see a common limitation with TableReaders that communicate remotely to a HashJoiner (same remote communication limitations as discussed above).

Next, instead of looking at logictest queries I looked at querybench queries, which is a group of queries we wanted to focus our performance efforts on (https://github.com/cockroachdb/cockroach/issues/14288).

I'm not looking at simple aggregations which send minimal rows over the network and are pretty well targeted by the work to have processors implement RowSource. Let's start with a distributed sort:

[Image: screen shot 2018-02-05 at 2 49 37 pm]

On each node, there is currently a RowChannel between processors; the work being done on the Sorter will be a significant improvement. There is the same remote communication limitation as discussed in all other queries. Another point of focus is the orderedSync. It has two types of inputs:

1) ProcessInboundStream with a RowChannel: Since this is an ordered synchronizer and it needs to read from all inputs before producing the next row, we don't run into the issues described with MultiplexedRowChannel. The RowChannel between the ProcessInboundStream and the orderedSync can be gotten rid of by making the inbound stream implement RowSource. This falls under the remote communication limitations that a lot of the other queries face.

2) Sorter with a RowChannel: This will be solved once Sorter implements RowSource.
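
A minimal sketch of why an ordered synchronizer fits the pull model: it must hold the current head row of every input before it can decide which row to emit next, so it can simply call `Next()` on each input. The `orderedSync` below is a toy that orders on the first column only; its fields and the simplified `RowSource` are assumptions, not the real implementation.

```go
package main

import "fmt"

// Row is a simplified stand-in for a distsql row.
type Row []int

// RowSource is a simplified pull interface; ok=false means exhausted.
type RowSource interface {
	Next() (Row, bool)
}

// sliceSource yields rows from a slice that is already sorted.
type sliceSource struct {
	rows []Row
	pos  int
}

func (s *sliceSource) Next() (Row, bool) {
	if s.pos >= len(s.rows) {
		return nil, false
	}
	r := s.rows[s.pos]
	s.pos++
	return r, true
}

// orderedSync merges sorted inputs into one sorted stream.
type orderedSync struct {
	inputs []RowSource
	heads  []Row  // current head row of each input
	live   []bool // whether the input still has a head row
	primed bool
}

func (o *orderedSync) Next() (Row, bool) {
	if !o.primed {
		// Read one row from every input before producing anything.
		o.heads = make([]Row, len(o.inputs))
		o.live = make([]bool, len(o.inputs))
		for i, in := range o.inputs {
			o.heads[i], o.live[i] = in.Next()
		}
		o.primed = true
	}
	best := -1
	for i := range o.inputs {
		if o.live[i] && (best < 0 || o.heads[i][0] < o.heads[best][0]) {
			best = i
		}
	}
	if best < 0 {
		return nil, false // all inputs exhausted
	}
	row := o.heads[best]
	// Refill only the input whose head was just consumed.
	o.heads[best], o.live[best] = o.inputs[best].Next()
	return row, true
}

func main() {
	left := &sliceSource{rows: []Row{{1}, {4}, {7}}}
	right := &sliceSource{rows: []Row{{2}, {3}, {9}}}
	merged := &orderedSync{inputs: []RowSource{left, right}}
	for {
		r, ok := merged.Next()
		if !ok {
			break
		}
		fmt.Println(r[0])
	}
}
```

If the inbound stream and the Sorter both exposed this kind of pull interface, the synchronizer could read from them directly with no channel in between.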

[Image: screen shot 2018-02-05 at 12 51 34 pm]

It's the same kind of story with distinct (but with fewer rows). The communication to the unordered synchronizer is the protagonist. Distinct implementing RowSource will get rid of some overhead.

The other queries exhibit the same limitations and the hash join underlines again the effect the hashRouter has on RowChannel elision.

Based on this I think it's clear that:

1) The hashRouter is blocking a lot of easy gains (local communication) in joins.

2) There is a more involved overhead related to remote communication that all queries exhibit. Work to be done here includes:

a) Elide the upstream RowChannel from the outbox.

b) Investigate batching over the network (https://github.com/cockroachdb/cockroach/issues/22385).

c) Model the inbound stream as a RowSource to be able to elide the downstream RowChannel (https://github.com/cockroachdb/cockroach/issues/20553).

These are in what I believe is ascending order of ease/impact given how important and widely used joins are.

petermattis commented 6 years ago

Thanks, @asubiotto.

The hashRouter is blocking a lot of easy gains (local communication) in joins.

Let's get a separate issue filed about this. I haven't grokked the needs of routers yet. Did you have a concrete idea for how to "invert" them in order to allow data to be pulled through them? This seems to tie in directly to the next item as it appears that hash routers are connected to outboxes in the diagrams you provided. (Is that ever not the case?)

a) Elide the upstream RowChannel from the outbox.

This is #20568, though it is possible that isn't the right approach.

asubiotto commented 6 years ago

@petermattis I thought I had a concrete idea but it turns out I didn't think about it hard enough. Inverting routers doesn't seem to be a solution here, especially since we can currently have multiple concurrent consumers. The pattern seems to be mostly outboxes, which we could push to directly, plus a local HashJoiner. I'll create an issue about this and we can discuss further there.

asubiotto commented 5 years ago

Work on the vectorized execution engine, where operators work on batches of columns at once, has rendered distsql row batching unnecessary.