facebookincubator / velox

A composable and fully extensible C++ execution engine library for data management systems.
https://velox-lib.io/
Apache License 2.0
3.5k stars 1.15k forks

Optimize operator's performance when vector has low selectivity #7801

Open shenh062326 opened 11 months ago

shenh062326 commented 11 months ago

Description

When we benchmarked Velox, its HashAggregation was much faster than Presto's in most cases, but we found one case where it was slower. In the HashAggregation of TPC-H 100G Q3, the join and filter conditions ahead of the HashAggregation operator filter out most of the data (about 600 million -> 3 million). Because the Velox filter does not materialize new Vectors, the Vectors passed to HashAggregation have very low selectivity; sometimes a Vector has only 1 or 2 records. More precisely, the DictionaryVector exposes only one or two records, while the underlying Vector it wraps is very large. In this case, the vectorization optimizations in HashAggregation cannot take effect, and performance drops sharply, to the point of being slower than Presto.

To address this problem, we propose an optimization. When the selectivity of the input Vector to HashAggregation is very low (for example, 0.5%), the HashAggregation operator copies the Vector data into a new reusable Vector instead of processing it immediately. Once the amount of data in the new Vector exceeds a certain threshold, the new Vector is processed. This adds the cost of an extra data copy, but it lets HashAggregation take advantage of its vectorization optimizations. In our tests, HashAggregation performance improved by more than 6x when the selectivity was very low.
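The buffering scheme described above can be sketched roughly as follows. This is a minimal illustration in plain C++, not Velox code: `SparseBatch`, `SparseBatchCompactor`, and both thresholds are hypothetical names, and a `std::vector` plus a list of selected row indices stands in for a DictionaryVector wrapping a large base Vector.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for a dictionary-wrapped vector: `base` holds every
// row of the underlying vector, `selected` lists the rows that survived the
// upstream filter or join.
struct SparseBatch {
  std::vector<int64_t> base;
  std::vector<std::size_t> selected;

  double selectivity() const {
    return base.empty() ? 1.0
                        : static_cast<double>(selected.size()) /
            static_cast<double>(base.size());
  }
};

// Copies the live rows of low-selectivity batches into a pending buffer and
// releases them as one dense batch once enough rows have accumulated.
class SparseBatchCompactor {
 public:
  SparseBatchCompactor(double minSelectivity, std::size_t minRows)
      : minSelectivity_(minSelectivity), minRows_(minRows) {}

  // Returns a dense batch to process now, or std::nullopt if the input was
  // only buffered. For simplicity this sketch also copies dense-enough
  // batches; a real engine would likely pass those through without a copy.
  std::optional<std::vector<int64_t>> add(const SparseBatch& batch) {
    const bool sparse = batch.selectivity() < minSelectivity_;
    for (std::size_t row : batch.selected) {
      pending_.push_back(batch.base[row]);
    }
    if (sparse && pending_.size() < minRows_) {
      return std::nullopt;
    }
    return flush();
  }

  // Hands back whatever is buffered; call once more at end of input.
  std::vector<int64_t> flush() {
    std::vector<int64_t> out;
    out.swap(pending_);
    return out;
  }

 private:
  double minSelectivity_;
  std::size_t minRows_;
  std::vector<int64_t> pending_;
};
```

With `SparseBatchCompactor c(0.005, 1024)`, a caller would feed each input batch to `c.add(...)`, process the result only when one is returned, and call `c.flush()` after the last input so that buffered rows are not lost.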

In addition, this idea is not only effective for HashAggregation; in theory it also benefits other operators, such as join and window. We will therefore implement it as a general optimization: an Operator can add a method that determines whether sparse-vector optimization should be turned on. If an operator turns the optimization on, its sparse input Vectors are compacted.

We have also seen a similar optimization in the Photon paper: "Photon compacts column batches before probing hash tables in order to increase memory parallelism. This feature is adaptive because Photon tracks the sparsity of column batches at runtime. To illustrate its benefits, we ran TPC-DS query 24 on a 16-node cluster with and without adaptive join compaction. Figure 9 shows the results. Overall, compaction produces a 1.5× improvement compared to probing the sparse batches directly."

Yuhta commented 11 months ago

@mbasmanova This is similar to what you did for exchange in #7404 (slightly different in that we were accumulating pages instead of vectors there). Shall we add some logic in the driver to apply this to all non-source operators?

Another idea is to accumulate this in source operators only (i.e. TableScan and Exchange). Exchange is done with #7404; we can add something in TableScan to make the batch size larger as well.

mbasmanova commented 11 months ago

This makes a lot of sense. Presto has this optimization:

/**
 * This class is intended to be used right after the PageProcessor to ensure
 * that the size of the pages returned by FilterAndProject and ScanFilterAndProject
 * is big enough so it does not introduce considerable synchronization overhead.
 * <p>
 * As long as the input page contains more than {@link MergingPageOutput#minRowCount} rows
 * or is bigger than {@link MergingPageOutput#minPageSizeInBytes} it is returned as is without
 * additional memory copy.
 * <p>
 * The page data that has been buffered so far before receiving a "big" page is being flushed
 * before transferring a "big" page.
 * <p>
 * Although it is still possible that the {@link MergingPageOutput} may return a tiny page,
 * this situation is considered to be rare due to the assumption that filter selectivity may not
 * vary a lot based on the particular input page.
 * <p>
 * Considering the CPU time required to process(filter, project) a full (~1MB) page returned by a
 * connector, the CPU cost of memory copying (< 50kb, < 1024 rows) is supposed to be negligible.
 */
@NotThreadSafe
public class MergingPageOutput

It would be nice to come up with something similar in Velox. Then, operators that "filter" data (Filter, Join, etc.) can use this to produce output of a reasonable size.
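As a rough idea of what a Velox-side counterpart could look like, here is a small C++ sketch of the behavior the Presto comment describes: large pages pass through without a copy, small pages are buffered, and the buffer is flushed before a large page is forwarded so that row order is preserved. `Page`, `MergingOutput`, and its thresholds are hypothetical names chosen for illustration; this is neither Presto's implementation nor an existing Velox API.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical single-column page: one int64_t value per row.
using Page = std::vector<int64_t>;

class MergingOutput {
 public:
  MergingOutput(std::size_t minRowCount, std::size_t minBytes)
      : minRowCount_(minRowCount), minBytes_(minBytes) {}

  void addInput(Page page) {
    if (isBig(page)) {
      // Flush buffered rows first to keep the original row order, then
      // forward the big page as is, without copying its rows.
      flushBuffer();
      output_.push_back(std::move(page));
      return;
    }
    // Small page: pay for a copy into the buffer.
    buffer_.insert(buffer_.end(), page.begin(), page.end());
    if (isBig(buffer_)) {
      flushBuffer();
    }
  }

  // Call at end of input; may emit one undersized page.
  void finish() { flushBuffer(); }

  bool hasOutput() const { return !output_.empty(); }

  Page next() {
    Page page = std::move(output_.front());
    output_.pop_front();
    return page;
  }

 private:
  bool isBig(const Page& page) const {
    return page.size() >= minRowCount_ ||
        page.size() * sizeof(int64_t) >= minBytes_;
  }

  void flushBuffer() {
    if (buffer_.empty()) {
      return;
    }
    output_.push_back(std::move(buffer_));
    buffer_.clear();  // Leave the moved-from buffer in a known empty state.
  }

  std::size_t minRowCount_;
  std::size_t minBytes_;
  Page buffer_;
  std::deque<Page> output_;
};
```

An operator would call `addInput` for each batch it produces, drain `next()` while `hasOutput()` is true, and call `finish()` when its input is exhausted.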

Yuhta commented 11 months ago

@mbasmanova So we need to do it for all the filtering operators. Shall we add it in the driver run logic or add it to the separate operators?

mbasmanova commented 11 months ago

I think it is better for this to be part of individual operators so that CPU and memory usage is attributed properly.

Yuhta commented 11 months ago

I can start adding something to TableScan to implement this then

So the idea is to accumulate the output of operator instead of input (except the Exchange we have already done). The operators that can filter:

  1. TableScan
  2. FilterProject
  3. HashProbe (inner)
  4. MergeJoin (inner)

shenh062326 commented 11 months ago

It seems better to accumulate the output of operators instead of the input, since fewer operators are involved. Is there a detailed development plan? I can help if needed.

Yuhta commented 11 months ago

Things we need are:

  1. A utility to merge multiple vectors into one vector, potentially reusing components from the input (e.g. string buffers). The interface is simple (RowVectorPtr merge(folly::Range<const RowVectorPtr*>)), but the implementation can be complex. For example, what is the strategy for merging different encodings? We may need to merge and deduplicate dictionary values if we want to preserve dictionary encoding. We can start with a flattening copy, but we will need to iterate on the implementation later against real-world queries.
  2. Decide on a mechanism for when to apply this merging. The ideal mechanism should be adaptive, so we need to read the timing from the sink operator to decide whether the merging should happen or not. Add metrics to track how often the optimization kicks in.
  3. Use that mechanism in the 4 operators above.

I think (2) above needs more discussion. How do we measure if we need to apply this optimization in an adaptive way? Do we measure the ingestion time of sink operator? Or do we measure the output throughput?
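To make the "start with a flattening copy" option in (1) concrete, here is a minimal sketch. It does not use Velox types: `EncodedColumn` is a hypothetical stand-in for a column that is either flat or dictionary-encoded, and `mergeFlatten` is an illustrative name, not the proposed `merge` signature. The output is always flat, so dictionary encoding is deliberately dropped rather than merged and deduplicated.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical column: flat when `isDictionary` is false (row i is
// values[i]), dictionary-encoded otherwise (row i is values[indices[i]]).
struct EncodedColumn {
  std::vector<std::string> values;
  std::vector<std::size_t> indices;
  bool isDictionary;

  std::size_t size() const {
    return isDictionary ? indices.size() : values.size();
  }

  const std::string& at(std::size_t row) const {
    return isDictionary ? values[indices[row]] : values[row];
  }
};

// Flattening copy: decodes every input row and appends it to one flat
// output column, preserving input order.
std::vector<std::string> mergeFlatten(
    const std::vector<EncodedColumn>& inputs) {
  std::size_t total = 0;
  for (const auto& input : inputs) {
    total += input.size();
  }

  std::vector<std::string> out;
  out.reserve(total);  // One allocation for the merged column.
  for (const auto& input : inputs) {
    for (std::size_t row = 0; row < input.size(); ++row) {
      out.push_back(input.at(row));
    }
  }
  return out;
}
```

The cost of this approach is that repeated dictionary values are copied once per row, which is the trade-off a later encoding-preserving implementation would address.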

shenh062326 commented 11 months ago

  1. We have implemented "merge multiple vectors into one vector". If necessary, we can submit a PR.
  2. "Decide a mechanism to when to apply this merging": Presto's MergingPageOutput.producePositionCountOnlyOutput has implemented the merging mechanism. (1) If the number of input page rows is greater than MAX_BATCH_SIZE, output directly. (2) Add two configurations, filter_and_project_min_output_page_row_count and filter_and_project_min_output_page_size; if the page or the buffered pages exceed the configured values, they are output; otherwise the page is cached. (3) Corresponding metrics need to be added to confirm the cost of this optimization.

shenh062326 commented 11 months ago

In addition, we can also contribute benchmark for this optimization.

mbasmanova commented 11 months ago

@shenh062326

It would be nice to reuse preferred_output_batch_bytes config for deciding whether the output vector is large enough.

We have implemented "merge multiple vectors into one vector". If necessary, we can submit a PR.

This would be nice. We can iterate from there.

shenh062326 commented 11 months ago

@mbasmanova Ok, I will submit a PR first.

Yuhta commented 11 months ago

If the number of input page rows is greater than MAX_BATCH_SIZE, output directly,

This does not sound correct to me. The output of filtering operators is not directly related to the size of input.

I am thinking that, in addition to the configs for the size thresholds, we should add some adaptive feedback control to decide whether this optimization kicks in.
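One possible shape for such a feedback control is sketched below, purely as an illustration. Which signal to measure is still an open question in this thread; the sketch assumes the simplest one, the number of rows in each output batch, smoothed with an exponential moving average, rather than sink-operator timing. `AdaptiveCompactionPolicy`, `alpha`, and `minAvgRows` are hypothetical names.

```cpp
#include <cstddef>

// Tracks a moving average of output batch sizes and turns compaction on
// only while recent batches have been small.
class AdaptiveCompactionPolicy {
 public:
  // alpha: weight of the newest sample, in (0, 1].
  // minAvgRows: compaction is enabled while the average is below this.
  AdaptiveCompactionPolicy(double alpha, double minAvgRows)
      : alpha_(alpha), minAvgRows_(minAvgRows) {}

  // Record the row count of one batch produced by the operator.
  void observe(std::size_t outputRows) {
    const double rows = static_cast<double>(outputRows);
    avgRows_ = hasSample_ ? alpha_ * rows + (1.0 - alpha_) * avgRows_ : rows;
    hasSample_ = true;
  }

  // No decision is made before the first sample arrives.
  bool shouldCompact() const {
    return hasSample_ && avgRows_ < minAvgRows_;
  }

  double averageRows() const { return avgRows_; }

 private:
  double alpha_;
  double minAvgRows_;
  double avgRows_ = 0.0;
  bool hasSample_ = false;
};
```

The static size thresholds would still decide when a buffered batch is large enough to emit; a policy like this would only gate whether buffering is attempted at all, so that operators with consistently large output never pay for the extra copy.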

xumingming commented 11 months ago

I can start adding something to TableScan to implement this then

So the idea is to accumulate the output of operator instead of input (except the Exchange we have already done). The operators that can filter:

  1. TableScan
  2. FilterProject
  3. HashProbe (inner)
  4. MergeJoin (inner)

Hi @Yuhta, I don't understand why TableScan can filter data. Isn't it the source of the data, with no upstream operator?

mbasmanova commented 11 months ago

@xumingming TableScan may have an optional filter.

pedroerp commented 6 months ago

Just getting to this discussion. Are there any updates?

Overall I agree with the point that the batching should be done in the source operator; essentially making sure that operators always produce batches that are of reasonable size. Otherwise we would need to add this batching to the input of every single operator, as we can't really control what is feeding them.

So essentially we need this logic on every single operator that can be cardinality reducing, like filterproject, hash, and merge joins. Where else?

waitinfuture commented 5 months ago

So essentially we need this logic on every single operator that can be cardinality reducing, like filterproject, hash, and merge joins. Where else?

@pedroerp distinct hash agg, hash agg with pregrouped keys, and local partition can also reduce cardinality.

Yuhta commented 5 months ago

There are 2 types of operators that reduce batch size: filtering (FilterProject, joins) and aggregation. The former is more important because the number of batches is not significantly reduced. For aggregation, if we reduce the number of batches at the same time as we reduce the size, it is much less of a problem.

YimingQiao commented 1 day ago

Hi all, @pedroerp @Yuhta

I encountered a similar issue in DuckDB and wanted to share my research paper, Data Chunk Compaction in Vectorized Execution, which has just been accepted at SIGMOD 2025.

In this paper, we highlight the "small chunk problem" in vectorized execution, identifying the filter and hash join as the most problematic operators. We propose two solutions: Learning Compaction for filters and Logical Compaction for hash joins. I think the Logical Compaction is applicable to many vectorized hash join variants.

Additionally, I have open-sourced the (step-by-step) code, which includes problem simulations, micro-benchmarks, and end-to-end experiments.

I hope this work contributes meaningfully to the database community.

A limitation of this paper is that it doesn’t address the distributed database case, as I’m not yet deeply familiar with distributed systems. I wonder if there are additional challenges specific to distributed databases beyond those observed in standalone systems.