delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

feat: improve merge performance by using predicate non-partition columns min/max for prefiltering #2513

Open JonasDev1 opened 1 month ago

JonasDev1 commented 1 month ago

Description

This PR improves merge performance by adding min/max filters to the early filter, reducing the number of files scanned from the target table by using the table statistics. I have extended the early filter for this purpose. This filter is responsible for pre-filtering the target table. Previously, the early filter consisted only of partition columns, filtering on all unique values from the source. Now the non-partition columns are also used: the min/max values are aggregated from the source and added to the early filter as BETWEEN expressions.

It is also automatically part of the conflict detection based on the predicate.

I added a property extended_early_filter to make this advanced filtering optional. I'm not sure whether this is important; maybe we could replace the bool with an enum. What do you think?

Example:

Merge into table t with partition date

Predicate: source.date = target.date and source.timestamp = target.timestamp and source.id = target.id and frob > 42

Early filter before: date = '2024-05-14' and frob > 42

Early filter now: date = '2024-05-14' and timestamp BETWEEN '…15:00' AND '…15:05' and id BETWEEN 'A' AND 'B' and frob > 42
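The construction above can be sketched as follows. This is a hypothetical, simplified Python illustration of the idea, not the delta-rs implementation (which operates on DataFusion expressions and file statistics); the helper name and the string-based predicate are assumptions for clarity.

```python
# Hypothetical sketch: derive an extended early filter from source rows.
# Partition columns -> IN-list of unique values; other join columns ->
# BETWEEN min/max; any static predicate is appended as-is.

def build_early_filter(source_rows, partition_cols, join_cols, static_pred=None):
    clauses = []
    for col in partition_cols:
        # Unique partition values from the source, as before this PR.
        values = sorted({row[col] for row in source_rows})
        quoted = ", ".join(f"'{v}'" for v in values)
        clauses.append(f"{col} IN ({quoted})")
    for col in join_cols:
        # New: aggregate min/max over the source and emit a BETWEEN clause.
        values = [row[col] for row in source_rows]
        clauses.append(f"{col} BETWEEN '{min(values)}' AND '{max(values)}'")
    if static_pred:
        clauses.append(static_pred)
    return " AND ".join(clauses)

rows = [
    {"date": "2024-05-14", "timestamp": "15:00", "id": "A"},
    {"date": "2024-05-14", "timestamp": "15:05", "id": "B"},
]
print(build_early_filter(rows, ["date"], ["timestamp", "id"], "frob > 42"))
# date IN ('2024-05-14') AND timestamp BETWEEN '15:00' AND '15:05'
#   AND id BETWEEN 'A' AND 'B' AND frob > 42
```

Files whose column statistics fall entirely outside these BETWEEN ranges can be skipped before the join.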

github-actions[bot] commented 1 month ago

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

JonasDev1 commented 1 month ago

2411

ion-elgreco commented 1 month ago

@JonasDev1 why did you make the advanced filtering optional?

If this provides better performance across the board, we should enable it always (so also for python bindings)

JonasDev1 commented 1 month ago

My concern was that merges on columns containing NULLs would not work with this, but I think they would not work without the advanced filtering either, since equality with NULL is not defined in SQL.

Spark has an extra null-safe operator <=> for this, which is not yet available in DataFusion.
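The difference between standard SQL equality and Spark's null-safe operator can be simulated in a few lines of Python, with None standing in for SQL NULL. This is a toy illustration of the semantics, not tied to any engine's API.

```python
# SQL three-valued logic vs Spark's null-safe <=>, simulated in Python.

def sql_eq(a, b):
    """Standard SQL '=': comparing NULL with anything yields NULL (unknown)."""
    if a is None or b is None:
        return None
    return a == b

def null_safe_eq(a, b):
    """Spark's '<=>': NULL <=> NULL is TRUE; NULL <=> x is FALSE."""
    if a is None and b is None:
        return True
    if a is None or b is None:
        return False
    return a == b

print(sql_eq(None, None))        # None -- such rows never match in a join
print(null_safe_eq(None, None))  # True -- rows with NULL keys still match
```

Because a join condition only keeps rows where the predicate is TRUE, the `None` (unknown) result from `sql_eq` means NULL-keyed rows never match under a plain `=` predicate.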

JonasDev1 commented 1 month ago

What about the review?

I can of course also remove the flag again

ion-elgreco commented 4 weeks ago

> My concern was that merges on columns containing NULLs would not work with this, but I think they would not work without the advanced filtering either, since equality with NULL is not defined in SQL.

> Spark has an extra null-safe operator <=> for this, which is not yet available in DataFusion.

My main issue is that it might or might not work depending on the contents of the data. I think that's a bit tricky, because a person needs to be aware of the contents of the data they are trying to write.

@JonasDev1 can you maybe raise an issue about this upstream in DataFusion, so we get this first?

thomasfrederikhoeck commented 2 weeks ago

@JonasDev1 do you know if the issue was raised in DataFusion? I would love to see this feature, as it could really improve large-table performance in cases where partitioning is not meaningful due to high cardinality, so it is very nice that you have initiated it.

JonasDev1 commented 2 weeks ago

@ion-elgreco I have tested merging with NULL values in the merge predicate, both with and without the extended filtering. In both cases the merge doesn't work as expected and leads to duplicate rows. This pull request therefore introduces no change here, and the behaviour is similar to Spark.

The null-safe operator would be a nice extension, but it is not tied to this pull request.
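The duplicate-row behaviour described above can be reproduced with a toy merge loop (illustration only, not delta-rs code): under SQL equality a NULL key never matches another NULL key, so a "when matched update, when not matched insert" merge re-inserts the row instead of updating it.

```python
# Toy MERGE: WHEN MATCHED THEN UPDATE, WHEN NOT MATCHED THEN INSERT.
# None stands in for SQL NULL; the match test mimics SQL '=' semantics,
# where NULL = NULL is unknown and therefore never a match.

def merge(target, source, key):
    for s in source:
        matched = False
        for t in target:
            k_t, k_s = t[key], s[key]
            # SQL equality: only TRUE when both sides are non-NULL and equal.
            if k_t is not None and k_s is not None and k_t == k_s:
                t.update(s)          # WHEN MATCHED THEN UPDATE
                matched = True
        if not matched:
            target.append(dict(s))   # WHEN NOT MATCHED THEN INSERT

target = [{"id": None, "v": 1}]
merge(target, [{"id": None, "v": 2}], key="id")
print(len(target))  # 2 -- the NULL-keyed row was duplicated, not updated
```

Since this happens with or without the extended early filter, the prefiltering in this PR does not change the outcome for NULL-keyed rows.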