facebookincubator / velox


Optimization of Left Semi Join and Anti Join #11212

Open liujiayi771 opened 1 month ago

liujiayi771 commented 1 month ago

Description

Background

The semantics of a left semi join are to retain rows from the probe side whose join keys match the build side. In contrast, an anti join retains rows from the probe side whose join keys do not match the build side. Both types of join return only rows from the probe side; the build side is consulted only to check for the presence of matching join keys. As a result, the build side does not need to store rows with duplicate join keys.
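
The standalone sketch below (plain C++ with std::unordered_set, not Velox code) illustrates this point: for semi and anti join, the build side only needs to answer key-membership queries, so duplicate build-side keys contribute nothing.

```cpp
// A minimal sketch (not Velox code) showing why the build side of a left
// semi or anti join only needs the distinct set of join keys: membership is
// all that matters, so duplicate build-side keys add no information.
#include <cstdint>
#include <iostream>
#include <unordered_set>
#include <vector>

int main() {
  // Build side with many duplicate keys; only the distinct keys are kept.
  std::vector<int64_t> buildKeys = {1, 1, 1, 2, 2, 3};
  std::unordered_set<int64_t> buildSet(buildKeys.begin(), buildKeys.end());

  std::vector<int64_t> probeKeys = {1, 3, 4, 5};

  for (auto key : probeKeys) {
    bool match = buildSet.count(key) > 0;
    // A left semi join emits the probe row on a match; an anti join emits it
    // on a non-match. Either way the build side contributes only key presence.
    std::cout << key << (match ? " -> semi join output" : " -> anti join output")
              << std::endl;
  }
  return 0;
}
```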

In the current implementation of Velox's HashBuild, all input rows are stored in the RowContainer of the HashTable during the addInput stage, even though left semi and anti join could ignore rows with duplicate join keys on the build side. This is because the HashTable has not been built yet during addInput, so deduplication cannot be performed at that point. Once addInput completes, the RowContainer holds all input rows. During the finishHashBuild stage, the HashTable is built from the rows in the RowContainer. The HashTable has an allowDuplicates parameter that controls whether it includes rows with duplicate join keys. For left semi and anti join, duplicate rows are ignored at this stage, but they are not discarded and remain in the RowContainer, which can consume a significant amount of memory.
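
The following simplified model of this two-stage flow uses plain STL containers; the Row struct and container names are illustrative stand-ins for Velox's RowContainer and HashTable, not the real classes. It shows how every duplicate row is stored during addInput even though the final table only keeps distinct keys.

```cpp
// A simplified model (not the actual Velox classes) of the current flow:
// addInput appends every row to a row container, and only at build time is
// a table of distinct keys created; duplicate rows stay in the container
// and keep consuming memory even though semi/anti join never reads them.
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

struct Row {
  int64_t key;
  // Dependent columns would live here in a real row container.
};

int main() {
  std::vector<Row> rowContainer; // stand-in for RowContainer
  std::vector<int64_t> input = {1, 1, 1, 2, 2, 3};

  // addInput stage: no hash table exists yet, so every row is stored.
  for (auto key : input) {
    rowContainer.push_back(Row{key});
  }

  // finishHashBuild stage: build a table keyed on distinct values
  // (allowDuplicates == false); later duplicates are simply ignored.
  std::unordered_map<int64_t, const Row*> table;
  for (const auto& row : rowContainer) {
    table.emplace(row.key, &row); // emplace keeps only the first occurrence
  }

  std::cout << "rows stored: " << rowContainer.size()
            << ", distinct keys in table: " << table.size() << std::endl;
  return 0;
}
```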

Taking TPC-DS Q95 as an example: it contains multiple left semi joins, and more than 90% of the build-side rows have duplicate join keys. If the duplicate rows were not stored in the RowContainer, a large amount of memory would be saved.

Design

To deduplicate the build-side rows during the addInput stage, we need to build the HashTable directly in this stage and avoid adding rows with duplicate join keys to the RowContainer. This process is similar to HashAggregation: during the addInput stage, we can use the HashTable::prepareForGroupProbe and groupProbe methods to construct the HashTable directly. Since the HashTable's allowDuplicates parameter is set to false, it will not retain rows with duplicate join keys. When the HashBuild operator runs single-threaded, the hash table obtained this way can be used directly. When it runs multi-threaded, each thread's RowContainer contains no duplicate join keys, but duplicates can still exist across threads, so the hash table still needs to be rebuilt; even so, the optimization reduces the amount of data stored in the RowContainers.
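
The sketch below models the proposed addInput-time deduplication with plain STL containers rather than Velox's HashTable/groupProbe API (an assumption made purely for illustration): each build thread drops duplicates locally, but overlapping keys across threads still require rebuilding the final table.

```cpp
// A sketch under stated assumptions (plain STL, not Velox's HashTable or
// groupProbe) of addInput-time deduplication: each build thread keeps only
// the first row per key, so its row container shrinks, but distinct key
// sets from different threads can still overlap, so the final join table
// must still be rebuilt from the merged containers.
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

struct Row {
  int64_t key;
};

// Per-thread addInput with deduplication: insert a row only if its key is new.
void addInputDedup(const std::vector<int64_t>& batch,
                   std::unordered_map<int64_t, Row>& threadTable) {
  for (auto key : batch) {
    threadTable.emplace(key, Row{key}); // duplicates within this thread are dropped
  }
}

int main() {
  // Two build threads see overlapping keys.
  std::unordered_map<int64_t, Row> t1, t2;
  addInputDedup({1, 1, 2, 2, 3}, t1);
  addInputDedup({2, 3, 3, 4}, t2);

  // finishHashBuild: merge the per-thread tables; cross-thread duplicates
  // (keys 2 and 3 here) are resolved while rebuilding the final table.
  std::unordered_map<int64_t, Row> finalTable = t1;
  for (const auto& [key, row] : t2) {
    finalTable.emplace(key, row);
  }

  std::cout << "per-thread rows: " << t1.size() + t2.size()
            << ", final distinct keys: " << finalTable.size() << std::endl;
  return 0;
}
```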

The above optimization is effective when the join keys in the input data have a high duplication rate. When the duplication rate is very low, building the hash table during addInput is inefficient, because the hash table will be rebuilt at the end anyway. Therefore, making this optimization adaptive is a better choice, very similar to the adaptive early-abandon feature in aggregation. We can track the duplication rate of the data during the HashBuild addInput stage; if it falls below a certain threshold, we stop constructing the hash table in addInput. No extra work is needed after abandoning the optimization; everything proceeds exactly as in the original path. The rows already inserted into the RowContainer are preserved, and the hash table is built after all data has been added, so the correctness of the final semi/anti join hash table is not affected.
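
A minimal sketch of such an adaptive check is shown below; the class, member names, and threshold constants are hypothetical, not Velox's actual configuration.

```cpp
// While deduplicating in addInput, track how many input rows were dropped as
// duplicates; if the observed duplication rate stays below a threshold after
// enough rows, abandon deduplication and fall back to the original
// append-only path. Rows already deduplicated stay where they are.
#include <cstdint>
#include <unordered_set>
#include <vector>

class AdaptiveDedupBuild {
 public:
  void addInput(const std::vector<int64_t>& keys) {
    for (auto key : keys) {
      ++numInput_;
      if (abandoned_) {
        rows_.push_back(key); // original path: store everything
        continue;
      }
      if (distinctKeys_.insert(key).second) {
        rows_.push_back(key); // first time this key is seen
      } else {
        ++numDuplicates_; // dropped as a duplicate
      }
      maybeAbandon();
    }
  }

  bool abandoned() const { return abandoned_; }

 private:
  void maybeAbandon() {
    // Only decide after a reasonable sample (hypothetical constants).
    constexpr uint64_t kMinRows = 10'000;
    constexpr double kMinDuplicationRate = 0.3;
    if (numInput_ >= kMinRows &&
        static_cast<double>(numDuplicates_) / numInput_ < kMinDuplicationRate) {
      // Correctness is unaffected: the final hash table is built from rows_
      // either way, and duplicates stored after this point are handled there.
      abandoned_ = true;
      distinctKeys_.clear();
    }
  }

  std::unordered_set<int64_t> distinctKeys_;
  std::vector<int64_t> rows_; // stand-in for the RowContainer contents
  uint64_t numInput_{0};
  uint64_t numDuplicates_{0};
  bool abandoned_{false};
};

int main() {
  AdaptiveDedupBuild build;
  build.addInput({1, 2, 3, 4, 5});
  return build.abandoned() ? 1 : 0;
}
```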

Some additional explanations

Since the join may construct a dynamicFilter, and the dynamicFilter relies on uniqueValues_, the contents of uniqueValues_ need to be complete. The original addInput process for HashBuild continuously computes the input's valueIds when analyzeKeys_ is true, ensuring that the hasher is analyzed and holds complete min, max, and uniqueValues_ information. With this optimization, the hasher may no longer be analyzed once the hash table is constructed; the analyze method on the HashTable would have to be called explicitly to obtain the correct uniqueValues_ for use in the dynamicFilter.
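
The following simplified illustration (plain STL, hypothetical names) shows the idea: if the row-by-row value analysis is skipped, the min/max/uniqueValues_-style statistics backing the dynamic filter must be recomputed from the keys that actually ended up stored, analogous to calling analyze on the built HashTable.

```cpp
// Recompute key statistics from whatever keys survived deduplication; a
// stand-in for re-running the hasher analysis after the table is built.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

struct KeyStats {
  int64_t min = 0;
  int64_t max = 0;
  std::set<int64_t> uniqueValues; // stand-in for uniqueValues_
};

KeyStats analyzeStoredKeys(const std::vector<int64_t>& storedKeys) {
  KeyStats stats;
  stats.uniqueValues.insert(storedKeys.begin(), storedKeys.end());
  auto [minIt, maxIt] = std::minmax_element(storedKeys.begin(), storedKeys.end());
  stats.min = *minIt;
  stats.max = *maxIt;
  return stats;
}

int main() {
  // Keys that remained after deduplication; duplicates were never analyzed.
  std::vector<int64_t> storedKeys = {3, 1, 7, 5};
  auto stats = analyzeStoredKeys(storedKeys);
  std::cout << "dynamic filter range: [" << stats.min << ", " << stats.max
            << "], distinct keys: " << stats.uniqueValues.size() << std::endl;
  return 0;
}
```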

https://github.com/facebookincubator/velox/pull/7066

liujiayi771 commented 1 month ago

cc @mbasmanova, @aditi-pandit.