facebookincubator / velox

A C++ vectorized database acceleration library aimed at optimizing query engines and data processing systems.
https://velox-lib.io/
Apache License 2.0

Support reading Iceberg equality delete files (Design) #8748

Open yingsu00 opened 5 months ago

yingsu00 commented 5 months ago

Description

In https://github.com/facebookincubator/velox/pull/7847 we introduced IcebergSplitReader and support for reading positional delete files. In this doc we discuss the implementation of reading equality delete files.

Iceberg Equality Deletes Overview

A general introduction to equality delete files can be found at https://iceberg.apache.org/spec/#equality-delete-files. Some key takeaways:

  1. An equality delete file can contain multiple fields (which could be sub-fields), and the values for the fields in the same row are in an AND relationship. E.g. the following equality delete file
equality_ids=[1, 3]
1: id  | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Grizzly

means:

    - A row is deleted if (id = 3 AND name = 'Grizzly') is true. Or
    - A row is selected if (id <> 3 OR name <> 'Grizzly') is true
  2. The equality delete field value could be NULL, which means a row is deleted if that field is NULL.

    equality_ids=[2]
    1: id  | 2: category | 3: name
    -------|-------------|---------
    3     | NULL        | Grizzly

    The expression specifies:

    - A row is deleted if category IS NULL. Or
    - A row is selected if category IS NOT NULL
  3. An equality delete file could contain multiple rows.

    equality_ids=[1, 3]
    1: id | 2: category | 3: name
    -------|-------------|---------
    3      |    NULL     | Grizzly
    5      |    Bear     | Polar

    means:

    - A row is deleted if (id = 3 AND name = 'Grizzly') OR (id = 5 AND name = 'Polar')  is true. Or
    - A row is selected if (id <> 3 OR name <> 'Grizzly') AND (id <> 5 OR name <> 'Polar')  is true
  4. A split can contain multiple equality or positional delete files, and a row is deleted if any row expression in these delete files is true. E.g. a split may come with 3 delete files:

Equality delete file 1

equality_ids=[1, 3]
 1: id | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Grizzly

Equality delete file 2

equality_ids=[3]
1: id | 2: category | 3: name
-------|-------------|---------
 1     | NULL        | Polar 

Positional delete file 1

    100
    101

means

    - a row is deleted iff (id = 3 AND name = 'Grizzly') OR (name = 'Polar') OR (row_in_file = 100) OR (row_in_file = 101)
    - a row is selected iff (id <> 3 OR name <> 'Grizzly') AND (name <> 'Polar') AND (row_in_file <> 100) AND (row_in_file <> 101)
  5. A split can contain many equality and positional delete files; a sketch of compiling these semantics into a single predicate follows below.
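To make these semantics concrete, here is a minimal sketch with a hypothetical helper (not Velox code; values are rendered as quoted strings for brevity) that compiles the rows of one equality delete file into the corresponding "selected" predicate by applying De Morgan's law per row:

```cpp
#include <optional>
#include <string>
#include <vector>

// Hypothetical helper for illustration only. Each delete row (an AND of
// equalities) becomes an OR of negated comparisons, and the rows are
// combined with AND.
std::string toSelectedPredicate(
    const std::vector<std::string>& fieldNames,
    const std::vector<std::vector<std::optional<std::string>>>& deleteRows) {
  std::string predicate;
  for (const auto& row : deleteRows) {
    std::string disjunction;
    for (size_t i = 0; i < fieldNames.size(); ++i) {
      if (!disjunction.empty()) {
        disjunction += " OR ";
      }
      // A NULL delete value deletes rows where the field IS NULL, so its
      // complement is IS NOT NULL.
      disjunction += row[i].has_value()
          ? fieldNames[i] + " <> '" + *row[i] + "'"
          : fieldNames[i] + " IS NOT NULL";
    }
    if (!predicate.empty()) {
      predicate += " AND ";
    }
    predicate += "(" + disjunction + ")";
  }
  return predicate;
}

// toSelectedPredicate({"id", "name"}, {{"3", "Grizzly"}, {"5", "Polar"}})
// yields: (id <> '3' OR name <> 'Grizzly') AND (id <> '5' OR name <> 'Polar')
```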

Design considerations

Build Hash Tables or Filters/FilterFunctions?

The equality delete files can be interpreted as logical expressions and become remaining filters that are evaluated after all rows in a batch are read out into the result vectors. Alternatively, they can be used to construct a number of hash tables that are probed after all rows are read into the result vectors. Suppose the equality delete file contains the following information:

equality_ids=[2, 3]
 1: id | 2: category | 3: name
-------|-------------|---------
 1     | Bear        | Grizzly
 3     | Bear        | Brown

It means

- a row is deleted iff (category = 'Bear' AND name = 'Grizzly') OR (category = 'Bear' AND name = 'Brown')
- a row is selected iff (category <> 'Bear' OR name <> 'Grizzly') AND (category <> 'Bear' OR name <> 'Brown')

To build the hash tables, we would need to concatenate the hash values of columns 2 and 3, so the hash table would contain two hash values, for 'Bear##Grizzly' and 'Bear##Brown'. In the matching phase, the hash values of columns 2 and 3 for all rows in the output RowVectors would be calculated and concatenated before probing the hash table. If there is no match, the row was definitely not deleted; if there is a match, the row needs to be compared with the original delete values to confirm it was really deleted. Only when all the values are the same shall the row be confirmed as removed. Note that the final comparison is necessary, because there is still a very small possibility of hash collisions, especially when many columns are involved.
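For illustration, a minimal sketch of this probe-then-verify scheme, with hypothetical types and std::hash standing in for a production hash function:

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>

// One row of the equality delete file on (category, name).
struct DeleteRow {
  std::string category;
  std::string name;
};

// Concatenate the per-column hashes; a real implementation would use a
// stronger mixing function.
uint64_t combinedHash(const std::string& category, const std::string& name) {
  return std::hash<std::string>{}(category) * 31 +
      std::hash<std::string>{}(name);
}

bool isDeleted(
    const std::string& category,
    const std::string& name,
    const std::unordered_multimap<uint64_t, DeleteRow>& deleteTable) {
  auto [begin, end] = deleteTable.equal_range(combinedHash(category, name));
  for (auto it = begin; it != end; ++it) {
    // The final value comparison is required: equal hashes do not imply
    // equal rows, so a hash match alone cannot confirm the delete.
    if (it->second.category == category && it->second.name == name) {
      return true;
    }
  }
  return false;
}
```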

Note that creating hash tables on single columns is not correct without additional processing. For example, suppose the base file is as follows:

1: id | 2: category | 3: name
-------|-------------|---------
 1     | Bear        | Grizzly
 2     | Bear        | Brown
 3     | Bear        | Polar
 4     | Dog         | Brown

If we build one hash table on the second column "category" that contains {'Bear'}, and another hash table on "name" that contains {'Grizzly', 'Brown'}, then probing the category hash table to exclude rows with category = 'Bear' would incorrectly remove row 3, and probing the name hash table to exclude 'Grizzly' and 'Brown' would incorrectly remove row 4. Taking the logical AND or OR of the two probe results is also incorrect.

Now let's take one step back and build the hash tables on single values. So we have hash table A on "category" with the single value 'Bear', hash table B on "name" with the value 'Grizzly', and hash table C on "name" with the value 'Brown'. A row would then pass if (category <> 'Bear' OR name <> 'Grizzly') AND (category <> 'Bear' OR name <> 'Brown'), computed by probing hash table A twice and hash tables B and C once each, then combining the results with logical ORs and ANDs. However, this is no different from just comparing the values, and no hash tables are actually needed.

The other way is to compile these delete values into logical expressions that can be executed as remaining filter functions, or even domain filters that can be pushed down to the base file reader. This can be more efficient than the hash table approach. Firstly, filter pushdown can eliminate a lot of decoding costs. Secondly, computing and concatenating the hash values for all rows is very expensive; in fact, it is much slower than performing simple comparisons on single column values, which can be done efficiently with SIMD operations, while hash value computation cannot. And lastly, the values need to be compared anyway, even in the hash table approach.

We should also note that if we convert the deletes into logical expressions as remaining filters, the existing Velox expression evaluation implementation can automatically choose the best evaluation strategy, e.g. whether to build hash tables or to do efficient SIMD logical comparisons, when it sees fit. This is much more flexible than building fixed hash tables in the connector DataSources, and in many cases the ExpressionEvaluator can choose a more efficient way to evaluate the equivalent expressions. Today it is already very easy to construct the logical expressions, and for equality deletes to work, no additional code is needed beyond constructing the expressions: the existing Velox data sources and readers can already handle them, so the implementation would be fairly simple. Plus, we can additionally improve the existing filter function / expression evaluation implementations, which can potentially benefit other components of the driver pipeline in the future. So we propose to take the remaining filter path and convert the equality delete files into filters and filter functions.

Where and How to open the equality delete files

Query engines like Presto or Spark usually have a central coordinator, where the distributed plan and splits are created. The splits are then sent to the workers and executed there using the Velox stack. A query may issue many splits for each table, and each of them may include many (possibly hundreds of) delete files. We have the choice of opening the delete files in the coordinator and/or in the workers. There are some basic design considerations here:

Native workers running Velox need to have the ability to read both equality delete and positional delete files.

Based on these considerations, I think we need to implement reading equality deletes in Velox. However, this doesn't mean we cannot open some of the equality delete files on the coordinator for optimization purposes; that optimization just should not be mandatory for the engines built on top of Velox.

Performance Considerations

We want to push down the filters as much, and as deep as possible.

By pushing the filters down to the reader or even decoder level, we can efficiently avoid the cost of decoding skipped rows, and even save some decompression costs. These savings could be huge if the selectivity rate is very small. Note that some of the equality delete files and all positional delete files can be converted to TupleDomain filters or initial row numbers that can be pushed to the readers. To achieve this, we need to extract the parts that can be pushed down, and guarantee the remaining parts are evaluated or tested correctly.

We want to avoid opening the delete files as much as possible

A split may include hundreds of delete files, and a worker could receive many splits with the same set of delete files. Ideally, each delete file should be opened only once on a worker, because 1) opening files is expensive, and 2) expression compilation, or building hash tables to probe later, is also not cheap. There are a couple of ways to achieve this.

We want to reduce the amount of expression evaluation as much as possible.

We have shown that the equality delete files can be interpreted as conjunctive logical expressions. However, logical expression evaluation is also expensive when the expression contains many terms. Velox can already extract common sub-expressions and flatten adjacent logical expressions, but more general logical expression simplification is not yet implemented. Nonetheless, there are some ways to simplify the expressions for some simple Iceberg cases. We will discuss them later.

Design

EqualityDeleteFileReader

We will introduce the EqualityDeleteFileReader class; each reader is responsible for opening one equality delete file. The content is read in batches, and for each batch, the logical expressions are built and merged with the existing remainingFilter in the HiveDataSource.

The equality delete file schema is not fixed and is only known at query run time. The equality ids and the base file schema are used together to get the output row type and build the ScanSpec for the equality delete file. The equality ids are the same as the ids in the Velox TypeWithId, so we can directly use dwio::common::typeutils::buildSelectedType() to get the delete file schema. Note that such an id is not necessarily from a primitive type column, but could also be from a sub-field of a complex type column. For example, deleting from an ARRAY[INTEGER] column c where c[i]=5 can also be expressed as an equality delete file. The field ids for this column are

0: root
1: ARRAY
2: INTEGER

Therefore the equality id for this predicate is 2, and the content of the equality delete file is the value 5.
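A minimal sketch of the id-based selection, using a simplified stand-in type instead of the actual TypeWithId and dwio::common::typeutils::buildSelectedType() APIs:

```cpp
#include <cstdint>
#include <memory>
#include <unordered_set>
#include <vector>

// Simplified stand-in for dwio::common::TypeWithId, for illustration only.
struct TypeNode {
  uint32_t id; // field id in pre-order, matching the Iceberg equality id
  std::vector<std::shared_ptr<TypeNode>> children;
};

// Keep a node if its own id is an equality id or if any descendant is kept,
// so the path down to a selected sub-field of a complex type is preserved.
std::shared_ptr<TypeNode> selectByIds(
    const std::shared_ptr<TypeNode>& node,
    const std::unordered_set<uint32_t>& equalityIds) {
  std::vector<std::shared_ptr<TypeNode>> keptChildren;
  for (const auto& child : node->children) {
    if (auto kept = selectByIds(child, equalityIds)) {
      keptChildren.push_back(kept);
    }
  }
  if (keptChildren.empty() && equalityIds.count(node->id) == 0) {
    return nullptr; // neither this node nor any sub-field is referenced
  }
  auto result = std::make_shared<TypeNode>(*node);
  // If the node itself is selected, keep its whole subtree; otherwise keep
  // only the children on the paths to selected ids.
  if (equalityIds.count(node->id) == 0) {
    result->children = keptChildren;
  }
  return result;
}
```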

Once we have read the delete values, we can build the ExprSet and add it to the existing remainingFilterExprSet_ in HiveDataSource. The expressionEvaluator_ in HiveDataSource will then evaluate it after all relevant vectors are loaded. There are two ways to add the newly created ExprSet to the existing remainingFilterExprSet_:

  1. By conjoining it with the Expr in remainingFilterExprSet_
  2. By adding the Expr to the array in remainingFilterExprSet_

Note that the current HiveDataSource assumes remainingFilterExprSet_ has only one Expr, and the owned SimpleExpressionEvaluator only evaluates the first Expr in an ExprSet. There are a couple of facts we discovered:

  1. SimpleExpressionEvaluator is only used in TableScan and HiveConnector
  2. The remainingFilterExprSet_ would always be a special kind of ExprSet in that it only contains logical expressions.

While I think SimpleExpressionEvaluator should indeed evaluate all Exprs in the passed-in ExprSet, we can alternatively create a new LogicalExpressionEvaluator, to which we can add special logical expression evaluation improvements in the future. Adding the new Expr to remainingFilterExprSet_ as an array element then seems the cleanest and simplest way.

Extraction of domain filters

When the equality delete file has only one field, we can extract it as a domain filter. Such a filter can be pushed down to the readers and decoders, where the performance savings happen. In this case we create a NOT IN filter for it. This is done in connector::hive::iceberg::FilterUtil::createNotInFilter(), which in turn calls the utility functions in common/Filter.h/cpp. The values are de-duplicated and nulls are treated separately. Velox can optimize the result into different kinds of filters, e.g. a range filter when there is only one value.
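As a sketch, the single-field bigint case could look like the following, assuming common::createNegatedBigintValues from velox/type/Filter.h; the actual FilterUtil::createNotInFilter may differ:

```cpp
#include <memory>
#include <optional>
#include <set>
#include <vector>

#include "velox/type/Filter.h"

using namespace facebook::velox;

// Sketch only: builds a NOT IN filter from the delete values of a single
// bigint field. The values are de-duplicated and NULL is handled separately.
std::unique_ptr<common::Filter> createNotInFilter(
    const std::vector<std::optional<int64_t>>& deleteValues) {
  std::set<int64_t> distinct; // de-duplicate
  bool nullDeleted = false;
  for (const auto& value : deleteValues) {
    if (value.has_value()) {
      distinct.insert(*value);
    } else {
      nullDeleted = true; // a NULL delete value deletes rows that are NULL
    }
  }
  std::vector<int64_t> values(distinct.begin(), distinct.end());
  // Rows with a NULL field survive unless NULL itself was deleted. Velox
  // picks an efficient representation (range, bitmask, or hash set) inside.
  return common::createNegatedBigintValues(
      values, /*nullAllowed=*/!nullDeleted);
}
```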

Note that we need to verify the field is not a sub-field, since Velox currently doesn't support pushing down filters to sub-fields. This restriction will be removed once Velox supports sub-field filter pushdowns.

An equality delete file with multiple fields cannot be pushed down as domain filters at this moment, regardless of whether it contains a single row or multiple rows. E.g. the delete file below can be interpreted as id <> 3 || name <> 'Grizzly'. Currently Velox does not support pushing down disjunctions, but we may do so in the future.

equality_ids=[1, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Grizzly

Domain Filter Merge

A split may come with multiple equality delete files, and some of them may have the same schema. If they all have the same single field, the extracted domain filters will be deduped and merged with the existing one. E.g.

Equality delete file 1

equality_ids=[2]
2: category 
---------------
    mouse

Equality delete file 2

equality_ids=[2]
2: category 
---------------
    bear
    mouse

The domain filter built from these two files will be category NOT IN {'bear', 'mouse'}. This uses the mergeWith API in the Filter class.
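A sketch of this merge, assuming Filter::mergeWith and createNegatedBigintValues as above, and using hypothetical integer ids in place of the category strings:

```cpp
#include <memory>

#include "velox/type/Filter.h"

using namespace facebook::velox;

// Hypothetical integer ids standing in for the category strings.
constexpr int64_t kBear = 1;
constexpr int64_t kMouse = 2;

std::unique_ptr<common::Filter> mergeCategoryFilters() {
  // NOT IN ('mouse') from delete file 1.
  auto notMouse =
      common::createNegatedBigintValues({kMouse}, /*nullAllowed=*/true);
  // NOT IN ('bear', 'mouse') from delete file 2.
  auto notBearOrMouse =
      common::createNegatedBigintValues({kBear, kMouse}, /*nullAllowed=*/true);
  // mergeWith keeps only values both filters accept, i.e. the NOT IN over
  // the union of the delete values: NOT IN ('bear', 'mouse').
  return notMouse->mergeWith(notBearOrMouse.get());
}
```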

Remaining Filter Function Merge

If the equality delete files have the same schema but it is not a single field, each file is compiled into its own Expr. For example,

Equality delete file 1

equality_ids=[1, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 3     | NULL        | Winnie

Equality delete file 2

equality_ids=[1, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 4     | NULL        | Micky
 3     | NULL        | Winnie

This will create 2 Exprs in the final ExprSet:

 - (id <> 3 || name <> 'Winnie')
 - (id <> 3 || name <> 'Winnie') && (id <> 4 || name <> 'Micky')

Today Velox supports common sub-expression recognition in the ExpressionEvaluator, and such an expression is evaluated only once. In this example, the result of (id <> 3 || name <> 'Winnie') would be cached internally and does not need to be evaluated twice.

Logical Expression Simplification

As far as I understand, Velox can do logical expression flattening, but cannot yet automatically simplify logical expressions. For example, the expression a AND (b AND (c AND d)) would be flattened into AND(a,b,c,d), but a AND (a OR b) cannot be automatically simplified to a; therefore to evaluate a AND (a OR b), both a and b will be evaluated, and one AND and one OR operation need to be performed. While we hope to improve logical expression simplification in the future, we can still do some simple improvements for Iceberg now.

An Iceberg split can come with multiple equality delete files whose schemas may overlap. For example,

Equality delete file 1

equality_ids=[1, 2, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 1     | mouse       | Micky
 2     | mouse       | Minnie
 3     | bear        | Winnie
 4     | bear        | Betty

Equality delete file 2

equality_ids=[2]
2: category 
---------------
   mouse

Equality delete file 3

equality_ids=[2, 3]
2: category | 3: name
------------|---------
 bear       | Winnie

We see that equality delete file 2 is on the category column and would remove all tuples whose value is mouse. This means the first two rows in equality delete file 1 are already covered and don't need to be read or compiled. Similarly, the single row in file 3 covers row 3 in file 1, so row 3 in file 1 doesn't need to be read or compiled either. The simplified delete files are as follows:

equality_ids=[1, 2, 3]
1: id | 2: category | 3: name
-------|-------------|---------
 4     | bear        | Betty

and

equality_ids=[2]
2: category 
---------------
   mouse

and

equality_ids=[2, 3]
2: category  | 3: name
----------------|-------------
   bear           | Winnie

With this simplification, the resulting expression is simpler and the evaluation cost is reduced.

When a delete file has only one field, the domain filter built from it can be used as a filter when reading other equality delete files whose fields include that one. In the above example, category <> 'mouse' can be pushed to file 1, whose rows 1 and 2 would be filtered out. This not only helps the final expression evaluation, but also improves the read performance when reading file 1.

If the delete file has more than one field, the situation is more complex. In the above example, file 3 would be compiled to category <> 'bear' OR name <> 'Winnie', but it cannot be pushed to file 1 or the base file directly because it is a disjunctive expression, and so far Velox only supports domain filters in conjunctive expressions. So for now we will only use single-field equality delete files to do the simplifications: we go over the equality ids from all equality delete files and pick the single-field ones to read first, then push the resulting filters to the other equality delete file readers, as sketched below.
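A sketch of this ordering, with hypothetical names:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical descriptor of one equality delete file in a split.
struct DeleteFileInfo {
  std::vector<int32_t> equalityIds;
  // ... file path, size, etc.
};

void readEqualityDeletes(std::vector<DeleteFileInfo>& deleteFiles) {
  // Move the single-field delete files to the front, preserving order.
  std::stable_partition(
      deleteFiles.begin(), deleteFiles.end(), [](const DeleteFileInfo& file) {
        return file.equalityIds.size() == 1;
      });
  for (const auto& file : deleteFiles) {
    if (file.equalityIds.size() == 1) {
      // Build a NOT IN domain filter and add it to the ScanSpecs of the base
      // file reader and of the remaining multi-field delete file readers.
    } else {
      // Read with the already-extracted domain filters applied, then compile
      // the surviving delete rows into a remaining filter expression.
    }
  }
}
```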

In the future, we could even implement disjunctive expression pushdown. For example, category <> 'bear' OR name <> 'Winnie' could be pushed to the SelectiveColumnReaders, with the category and name columns forming a ColumnGroup. This would save the cost of having to read all values out before applying the filter function as a remaining filter, and the selectivity vector could be reused among them. Moreover, the reduction of rows from applying this filter directly on the ColumnGroup would benefit the reading of other columns later.

Expression Caching

We know that a unique HiveDataSource object is created for each TableScan operator, and the splits received by a HiveDataSource instance belong to the same query and the same table. Additionally, Iceberg splits must be reading the same snapshot of an Iceberg table. When the HiveDataSource receives a new Iceberg split with some equality delete files, it creates a new IcebergSplitReader, which opens the delete files. If the equality delete files can be interpreted as domain filters or filter functions, the scanSpec_ and remainingFilterExprSet_ in HiveDataSource may need to be updated.

Currently, the Iceberg library selects the qualified data and delete files based on partitions and snapshot ids or transaction sequence numbers. For a single transaction, the snapshot is fixed, and all delete files from the same partition go with the base data files when the splits are enumerated. So we can assume for now that all splits received from the same partition are the same for a single HiveDataSource. However, the delete files for different partitions could differ, and the splits from multiple partitions could arrive out of order. If we updated the scanSpec_ and remainingFilterExprSet_ for a previous partition, we need to restore them to the original before applying the current set of delete files. As a first implementation, we will make a copy of these objects in the IcebergSplitReader and restore them when the IcebergSplitReader is destroyed.

In some users' workloads the deletions are quite frequent, and the number of delete files coming with a split for a subsequent SELECT query can be large. For all splits in a partition, the delete files may be the same, and we don't want to repeatedly read such equality delete files for every split a HiveDataSource handles. One way of overcoming this is to build an expression cache. There are two levels of caching ideas:

  1. A hash table in HiveDataSource
  2. A process wide cache for all Iceberg scans.

In 1, the key of the hash table is <partition, snapshotId> and the values are the compiled filters and expressions. In 2, the key of the cache is <table, partition, snapshotId> and the values are the compiled filters and expressions. To avoid excessive contention, we can divide the cache into multiple levels. The implementation will be adjusted with more experiments and observations of customer workloads in the future.
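A sketch of the option 1 cache, with all names hypothetical:

```cpp
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>

// Compiled domain filters and remaining filter expressions for one set of
// delete files; contents omitted here.
struct CompiledDeletes;

struct DeleteFilterCacheKey {
  std::string partition;
  int64_t snapshotId;

  bool operator==(const DeleteFilterCacheKey& other) const {
    return partition == other.partition && snapshotId == other.snapshotId;
  }
};

struct DeleteFilterCacheKeyHasher {
  size_t operator()(const DeleteFilterCacheKey& key) const {
    return std::hash<std::string>{}(key.partition) ^
        (std::hash<int64_t>{}(key.snapshotId) << 1);
  }
};

// Option 1: owned by the HiveDataSource. For option 2 the key would also
// include the table, and the map would be process-wide behind a lock or
// sharded into multiple levels to reduce contention.
using DeleteFilterCache = std::unordered_map<
    DeleteFilterCacheKey,
    std::shared_ptr<CompiledDeletes>,
    DeleteFilterCacheKeyHasher>;
```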

If the Iceberg library changes its TableScan or FileScan in the future and can additionally prune the delete files for each individual base data file, we will need to change the cache keys and add the information for the base file.

We will work on caching in the future when we understand the workloads better.

LogicalExpressionEvaluator Improvements

Currently the remaining filter is evaluated in HiveDataSource::evaluateRemainingFilter()

vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) {
  …
  // Evaluates the remaining filter as a general expression over the batch.
  expressionEvaluator_->evaluate(
      remainingFilterExprSet_.get(), filterRows_, *rowVector, filterResult_);
  // Converts the boolean result vector into the set of passing rows.
  auto res = exec::processFilterResults(
      filterResult_, filterRows_, filterEvalCtx_, pool_);
  return res;
}

This code evaluates remainingFilterExprSet_ as a general expression instead of as a special logical expression: it puts the results of the Exprs in remainingFilterExprSet_ into bool FlatVectors, and processFilterResults() then performs logical AND/OR on these vectors. This incurs additional memory copies. Moreover, it contains special handling of nulls, while the logical expressions would NOT produce NULLs at all, so this part of the cost can be saved as well. Our newly introduced LogicalExpressionEvaluator will have its own evaluate() implementation that is more performant for logical expressions.
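As an illustration of the possible saving, a LogicalExpressionEvaluator could combine the per-Expr boolean results as raw bitmaps, 64 rows per word and with no null handling, instead of materializing bool FlatVectors; a hypothetical sketch:

```cpp
#include <cstdint>
#include <vector>

// AND together the result bitmaps of all Exprs in the ExprSet, 64 rows per
// word. Logical expressions here never produce NULLs, so no null buffers or
// extra copies are needed.
void andExprResults(
    const std::vector<const uint64_t*>& exprResultBits,
    uint64_t* finalResultBits,
    int32_t numWords) {
  for (int32_t i = 0; i < numWords; ++i) {
    uint64_t word = ~0ULL;
    for (const uint64_t* bits : exprResultBits) {
      word &= bits[i];
    }
    finalResultBits[i] = word;
  }
}
```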

Testing

Prestissimo End To End Tests

In addition to unit tests, we will have Prestissimo end-to-end tests.

Microbenchmarks

We will build microbenchmarks in Velox and Presto that cover both types of delete files.

yingsu00 commented 5 months ago

cc @tdcmeehan @nmahadevuni

Yuhta commented 5 months ago
  1. Agree that this must be done in the worker/Velox, not in the coordinator.
  2. We need to support multi-column ID pushdown, so building a hash table with multi-key support is needed. The remaining filter method only works for a single-column key. I can work on adding a common interface for a multi-key hash table in mutation pushdown.
tdcmeehan commented 5 months ago

Quick note on equality delete files in intention and practice:

The underlying motivation for having a separate equality delete file type, in addition to position deletes, is that they are used when the producer of the delete cannot do a table scan, or it would be cost-prohibitive. There is a tradeoff associated with using equality deletes: in exchange for not having to do a table scan to find the position of the delete, you must do extra work during merge-on-read.

This is why batch and interactive systems such as Spark, Trino and Presto generate only positional delete files, whereas streaming systems like Flink and Iceberg tools may generate equality delete files, especially for change data capture (CDC) use cases. This means that in practice, one does not typically encounter equality delete files except when one is using Iceberg for CDC, and this is still considered an emergent feature of Iceberg.

Yuhta commented 5 months ago

@tdcmeehan Thanks for the explanation. So from the point of view of an execution engine that can be used in both batch and streaming cases, equality deletion is used more frequently in row-oriented formats with keys, e.g. Avro? And Avro only has a single-column key, so the priority of supporting multi-column key deletion is not high.

However, I think as a general execution engine we need to get it right (at least at the design level) in the first place, so that if someone comes with a row-oriented format with multi-column keys, we can cope with it without major changes. In this sense I still think investment in proper support of multi-column keys is worth doing.

Another point is that in a big company, sometimes the producer of the data is not aware of the consumers. It's not uncommon that streaming data is stored in a data warehouse and later read by a different tool. So on the read side we need to be as flexible as possible.

tdcmeehan commented 5 months ago

@Yuhta I don't think it's necessarily correlated with the underlying file format; it's really whether or not the system generating the deletes wants to, or is capable of, performing a table scan. This is because positional deletes require the position, which requires a table scan to determine the row that is to be deleted. I think this applies equally to any file format.

To be clear, I believe it is beneficial and useful for Velox to support reading equality deletes as it is consistent with Velox's mission to be a pervasive execution layer in query engines. Given Iceberg's popularity and momentum, it's important for Velox's Iceberg integration to have full support for both equality and position delete files. I am simply adding some context on when one typically sees equality delete files in real world settings.

yingsu00 commented 5 months ago

We need to support multi-column ID pushdown, so building a hash table with multi-key support is needed. The remaining filter method only works for a single-column key. I can work on adding a common interface for a multi-key hash table in mutation pushdown.

@Yuhta The remaining filter approach already supports multi-column expressions, e.g. a <> 1 OR b <> 1. My draft PR https://github.com/facebookincubator/velox/pull/8728 already works for that case, and there is a test for multi-column delete values.

And as I explained in the "Build Hash Tables or Filters/FilterFunctions?" section, I believe the remaining filter approach is more advantageous than building additional mutation hash tables in all dimensions, including performance, implementation simplicity, and code cleanliness. I don't think you need to add any new interface for a multi-key hash table if you just go with the remaining filter approach.

Also, correct me if I understood this wrong: I think the "disjunctive (OR) predicate pushdown" I mentioned is different from your "multi-column ID pushdown". The essential point is to push the OR predicates down to the ColumnReaders and decoders as domain filters, so that the benefit of filter pushdown is honored. E.g. in a predicate like (a = 1 OR b = 1) AND (c = 1), today we can only push down c = 1, but my idea is that in the future we can also push (a = 1 OR b = 1) down to the ColumnReader level. Of course this needs fundamental changes in the current reader implementations. It is an idea that emerged while I was working on Iceberg, and I don't think any other engine has it. I want to try it in the future, but not the near future. Whereas the "multi-column ID pushdown" you mentioned seems to happen AFTER all data is read, since you mentioned you wanted to build hash tables in the mutation object, and thus it won't have the benefit of filter pushdown.

yingsu00 commented 5 months ago

Thanks @tdcmeehan for the background introduction. While not being able to do a scan WAS the reason some engines produce equality delete files, the performance benefit will be another major reason users want to use equality deletes in the future. I believe equality deletes WILL out-perform positional deletes after we implement them in Prestissimo, because 1) we save a big scan and semi join in the delete query, and 2) we can push some of the equality deletes down as domain filters to the Velox column readers, plus some other optimizations that are specific to equality delete files. Given that many engines' scans are not as efficient, making the "merge" happen in the Velox/Prestissimo scan will have better performance. Also, dynamic filter support is limited nowadays, but for equality deletes we can definitely push down domain filters without worrying about the data size, etc. So we will use equality deletes in the TPCDS publication and implement equality deletes in Prestissimo as the next step.

Yuhta commented 5 months ago

@yingsu00 For a single-column key we can push it down to the decoders, but I don't think you can do the same for multi-column keys due to the correlation between columns. Putting a huge list of ORs in the remaining filter would just destroy the performance. So

  1. For a single-column key, we merge a filter into the corresponding column, which will be pushed down to the decoder level
  2. For multi-column keys, we need to build the hash table and filter after we read all the key columns (or can you show me how you would push down an OR filter on multiple columns?)
yingsu00 commented 4 months ago

@yingsu00 For a single-column key we can push it down to the decoders, but I don't think you can do the same for multi-column keys due to the correlation between columns. Putting a huge list of ORs in the remaining filter would just destroy the performance. So

  1. For a single-column key, we merge a filter into the corresponding column, which will be pushed down to the decoder level
  2. For multi-column keys, we need to build the hash table and filter after we read all the key columns (or can you show me how you would push down an OR filter on multiple columns?)

@Yuhta Thanks for your questions. This is a very preliminary idea now. It is essentially to push the expression evaluation down into the ColumnReaders for some special expressions, like logical expressions over non-overlapping columns. E.g. for the expression (b=1 OR c=1) AND a=1, right now we have a domain filter a=1 which is pushed down to the ColumnReader, and all rows passing a=1 are decoded and extracted into a b vector and a c vector; then we either build a hash table directly or use the existing expression evaluation framework to evaluate (b=1 OR c=1). Note that this part is done relatively less efficiently now, since the expression evaluation is aimed at all general cases, and calculating hash values for multiple columns is expensive. Also, all rows, even those that don't satisfy (b=1 OR c=1), have to be decompressed, decoded, and copied out in order to evaluate (b=1 OR c=1).

Now, if we can create a ColumnReader to read a first (as is done today), and a GroupColumnReader that contains the ColumnReaders for b and c, then after a is read, the GroupColumnReader's read() function would work as follows:

  1. The inner ColumnReader for b would produce a bitmap for b=1, and the inner ColumnReader for c would produce a bitmap for c=1. Note that it doesn't extract or copy the data out at this moment; if we push the filter down to the encoded data, we don't even need to decode the data now.
  2. The GroupColumnReader can directly OR the two bitmaps and produce the row set passing (b=1 OR c=1), as sketched below. Unlike the hash table or general expression evaluation, this doesn't need to allocate new memory and can be done really fast.
  3. If b or c are not required to be extracted, we are done. Otherwise, extract the data with the ORed bitmap. This may need to decode more data than step 1, but the data is now all in memory, and extracting it at this point may still be a lot faster than reading everything out and filtering later.
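A sketch of step 2, with hypothetical names:

```cpp
#include <cstdint>

// OR the per-column filter bitmaps (one bit per row) to get the rows passing
// (b=1 OR c=1). No hash table and no new vector allocation is needed; this
// is one pass over the bitmap words.
void orFilterBitmaps(
    const uint64_t* bPassing, // rows where b == 1
    const uint64_t* cPassing, // rows where c == 1
    uint64_t* passing,
    int32_t numWords) {
  for (int32_t i = 0; i < numWords; ++i) {
    passing[i] = bPassing[i] | cPassing[i];
  }
}
```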

This has multiple benefits.

So I think it would benefit performance generally, especially after we push the filters to ENCODED data. Even if we don't push the filters to ENCODED data, it may still be faster in a lot of cases.

You said "putting a huge list of ORs in the remaining filter would just destroy the performance". I think you meant the case where the number of values in the expression is much smaller than the number of rows to be read, such that the bitmap may be larger than the hash table. But remember, you have to extract and copy all rows first, which is mostly larger than the bitmap itself. For each batch we read at most 10,000 rows, and the bitmap for it would be around 1KB per column. Actually, the RowSet itself for each ColumnReader nowadays may be larger than that, and even your hash table may be larger than several KBs. And most importantly, calculating hash values on all relevant columns for all rows may be much more expensive.

But I agree that in some cases building a hash table or evaluating the remaining filter afterwards may be faster, e.g. when the equality delete file contains very many columns. So I think the execution strategy should be self-adapting at run time, and the criteria and policy shall be determined by extensive performance testing.

Anyway, this idea is preliminary and needs a lot of refinement and generalization. I may try prototyping something next year, but not in this Iceberg work. Your feedback and discussion are very welcome!

Yuhta commented 4 months ago

The inner ColumnReader for b would produce a bitmap for b=1, and the inner ColumnReader for c would produce a bitmap for c=1. Note that it doesn't extract or copy the data out at this moment; if we push the filter down to the encoded data, we don't even need to decode the data now.

At this point you have already gone through all the key data and paid almost the full price of reading them (decompressing and decoding; the only saving is that you do not copy the data out but use 1 bit, which is not a lot of difference really, unless you keep these bitmaps also encoded, which would be further complex and spend more time when we do the OR later). The saving from selective reading comes mainly from skipping; in this case you cannot skip reading c according to the values you read in b, so it will not be much different from using a hash table, and the framework would be much more complex. The saving we are aiming for is mainly on payload data, so pushing down to key column readers does not seem worth doing except for a single column.

Also, I don't see how we can use this to speed up mutation. The method does not work well with a tree of logical expressions. How do you push down (b = 1 AND c = 1) OR (b = 2 AND c = 2)?

For the first implementation I would suggest we do the single-column case only, since that is the thing everyone agrees on, and it covers most of the real-world use cases.

yingsu00 commented 4 months ago

At this point you have already gone through all the key data and paid almost the full price of reading them (decompressing and decoding,

Not quite. The GroupColumnReader does need to decompress the whole ColumnChunk, but it does NOT necessarily need to decode all data, if we can evaluate the filter on ENCODED data.

you cannot skip reading c according to the values you read in b, so it will not be much different from using a hash table

I think there will be a difference, but I don't have time to try it now, so we can forget it. Whether to try it or not also depends on the portion of the whole query time spent in the remaining filter or the hash table mutation. If it's already fast, there is no need to do it. We will send a PR for IcebergReadBenchmark that covers the equality delete case. Then we will have more insight into where the time is spent.

Also, I don't see how we can use this to speed up mutation. The method does not work well with a tree of logical expressions. How do you push down (b = 1 OR c = 1) AND (b = 2 OR c = 2)?

The naive way is to have two GroupColumnReaders, one for (b = 1 OR c = 1) and the other for (b = 2 OR c = 2). We would have to read b and c twice, but we can skip some rows for (b = 2 OR c = 2), and we can also use filters on ENCODED data to avoid decoding all the data.

Then the improved version is to

  1. Get all distinct single columns and the predicates on them. In this example: b=1, b=2, c=1, c=2. Note that we don't need to care how they are joined together.
  2. While reading a column, apply all relevant filters on the encoded data. In this example, we can apply b=1 and b=2 on b. The comparisons can be done with b loaded in registers, so we don't need to load b twice. Each filter then produces a SelectivityVector.
  3. Perform the logical computations to get the final SelectivityVector, and extract the values if necessary.

In this approach, each row uses 4 comparisons and 3 logical ops, as in the sketch below.
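A sketch of that loop over flat column arrays (hypothetical code):

```cpp
#include <cstdint>

// Evaluates (b = 1 OR c = 1) AND (b = 2 OR c = 2) for each row: 4
// comparisons and 3 bitwise logical ops per row, no branches and no hashing,
// so the compiler can auto-vectorize the loop with SIMD.
void evalPredicate(
    const int64_t* b,
    const int64_t* c,
    uint8_t* pass,
    int32_t numRows) {
  for (int32_t i = 0; i < numRows; ++i) {
    pass[i] = static_cast<uint8_t>(
        ((b[i] == 1) | (c[i] == 1)) & ((b[i] == 2) | (c[i] == 2)));
  }
}
```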

This approach can also be applied to the LogicalExpressionEvaluator. The current general ExprSet evaluation can recognize common sub-expressions. If the expression is (b = 1 OR c = 1) AND (b = 1 OR c = 2), then b=1 is common and will only be executed once. But for (b = 1 OR c = 1) AND (b = 2 OR c = 2) there is no common sub-expression, and both b and c would be read twice.

I agree this approach has big limitations: it only works for logical expressions, and it doesn't work for complex functions or predicates involving multiple columns like x+y > 1. But for Iceberg it's good.

Now let's consider the hash table approach for (b = 1 OR c = 1) AND (b = 2 OR c = 2). For this we would need to build 4 hash tables, for b = 1, c = 1, b = 2, c = 2, and the probe results would need to be ANDed and ORed to get the final result. To build the hash tables, you need to apply the hash function on these 4 values (actually 2 distinct ones, 1 and 2, but you don't know they're distinct). And to verify whether a row satisfies the predicate, you need to hash 2 values and do 4 probes. You could say that if b=1 matches then you don't need to probe b=2, but this if-check is a perf killer. Alternatively, we can convert it to a disjunction of conjunctions: (b=1 AND c=2) OR (c=1 AND b=2), and then you need one hash table on b##c that contains the two values 1##2 and 2##1. This looks better: building the hash table requires 4 hashes and some bit shifts, and probing requires 2 hashes and some bit shifts for every row. If there is a match, you need to compare again to see whether the values really match. Let's be generous and not count the hash table build. Then for each row, it needs 2 hashes, 1 probe, and 2 comparisons. That may look faster than 4 comparisons and 3 logical ops, but hashing is many times slower than comparison, and probing is also costly, while with the above approach, or the improved expression evaluation, the 4 comparisons and 3 logical ops can be done in a very simple loop over simple arrays. So for this simple case the hash table will be slower.

But when there are many, many values, e.g. (b = 1 OR c = 1) AND (b = 2 OR c = 2) AND (b = 3 OR c = 3) ... AND (b = 1,000,000 OR c = 2,000,000), I would expect the hash table approach to be faster.

For the first implementation I would suggest we do the single-column case only, since that is the thing everyone agrees on, and it covers most of the real-world use cases.

Yes, this is exactly what was done in PR https://github.com/facebookincubator/velox/pull/8728. It only pushes down single-column filters, and the rest are evaluated as remaining filters. All the other optimizations are not included in that PR. Your review is much appreciated; I'll ping you when the tests pass.

Actually, this particular optimization (pushing down disjunctions) is the last thing I may want to try, since it requires lots of code change and thus carries bigger risk. The other optimizations mentioned, e.g. logical expression simplification and caching, will be tried first, if necessary. So we agree on this.

Yuhta commented 4 months ago

if we can evaluate the filter on ENCODED data.

That's the only way to achieve it. However, I am not sure the gain is worth it; it might still be slower than just copying the key columns out and probing a hash table.

(b = 1 OR c = 1) AND (b = 1 OR c = 2)

Sorry, I made a mistake; the relevant expression should be (b = 1 AND c = 1) OR (b = 2 AND c = 2). In this case we can use a single hash table supporting 2-column keys, but with the remaining filter approach we can only get a half-working filter on each column, and the filtering expression evaluation is still not avoidable.