apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Use Min, Max, and NumOfNulls from Manifest Files for Spark Column Stats #10791

Open huaxingao opened 1 month ago

huaxingao commented 1 month ago

Feature Request / Improvement

I am adding Spark Column Stats in this pull request. Currently, in this PR, I only populate the NDV values. I plan to follow up with another PR that will retrieve the min, max, and numOfNulls values from the manifest files and use these to populate the Spark Column Stats.

Query engine

Spark

jeesou commented 1 month ago

Hi @huaxingao, so the follow-up PR will only have min, max, and numOfNulls? Will avgLen and maxLen be included later?

And for min, max, and numOfNulls, are we going to use the manifest (.avro) files? Could you please let us know the plan for the remaining statistics (min, max, numOfNulls, avgLen, and maxLen)?

huaxingao commented 1 month ago

@jeesou It seems the manifest file doesn't have avgLen and maxLen, so I will not be able to provide these two for now. Yes, I am thinking of using min, max, and numOfNulls from the manifest files.

jeesou commented 1 month ago

Hi @huaxingao, could you please help us with the timeline for the subsequent PR to add the min, max, and numOfNulls into the puffin file?

guykhazma commented 2 weeks ago

@huaxingao @karuppayya @jeesou @aokolnychyi @alexjo2144 @findepi @manishmalhotrawork I am continuing the mailing-list discussion about whether to collect the statistics at run time here, since my mail doesn't appear in the mailing list for some reason.

I wanted to revisit the discussion about using partition stats for min/max and null counts. It seems we might need to compute the null count at query time in any case. This is because, during manifest scanning, some data files may be filtered out based on query predicates. This could lead to a situation where the number of rows is less than the number of nulls for a partition or table if these counts are collected statically. In such cases, Spark might incorrectly estimate zero rows if an isNotNull predicate is used.

However, min/max values can still be pre-computed at the partition level, as they remain valid as lower and upper bounds even with additional filtering.

Any thoughts? If collecting null counts (and possibly min/max values) on the fly seems reasonable, I can open a PR to implement it.
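To make the concern above concrete, here is a minimal sketch in Python. The `FileStats` type and every number in it are made up for illustration and are not Iceberg's API. It shows a statically collected null count exceeding the row count of the files that survive pruning, while the static min/max still bracket the surviving values.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    """Hypothetical per-data-file stats for one column (illustrative only)."""
    rows: int
    nulls: int
    lo: int  # lower bound of non-null values in the file
    hi: int  # upper bound of non-null values in the file


files = [
    FileStats(rows=1000, nulls=900, lo=1, hi=3),
    FileStats(rows=100, nulls=0, lo=4, hi=9),
    FileStats(rows=50, nulls=5, lo=6, hi=20),
]

# Statistics collected statically over the whole table (or partition).
static_nulls = sum(f.nulls for f in files)
static_min = min(f.lo for f in files)
static_max = max(f.hi for f in files)

# A predicate such as `col > 5` prunes every file whose upper bound is <= 5.
surviving = [f for f in files if f.hi > 5]
scan_rows = sum(f.rows for f in surviving)
scan_nulls = sum(f.nulls for f in surviving)

print(f"static nulls={static_nulls}, rows after pruning={scan_rows}")
print(f"nulls actually present after pruning={scan_nulls}")
print(f"static bounds=[{static_min}, {static_max}]")
```

With these assumed numbers the static null count (905) is larger than the post-pruning row count (150), even though only 5 nulls remain in the scanned files. The static bounds [1, 20] are looser than the surviving files' bounds [4, 20] but still valid.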

huaxingao commented 2 weeks ago

If data files are filtered out by the query predicate, the pushed-down min/max/null counts are no longer accurate. Spark takes filter estimation into consideration when calculating stats for CBO, but I am not sure how accurate it is. Computing these stats on the fly is expensive.

Huaxin

On Tue, Sep 3, 2024 at 5:41 AM Guy Khazma @.***> wrote:

@huaxingao @karuppayya @jeesou @aokolnychyi @alexjo2144 @findepi @manishmalhotrawork Continuing the discussion from the mailing list (https://lists.apache.org/thread/6kyvp5xk5g46325ztvzxx3jn7q99cc1o) about whether to collect the statistics during run time here since my mail doesn't appear in the mailing list for some reason.


guykhazma commented 2 weeks ago

@huaxingao min/max will not stay accurate but will still provide valid lower and upper bounds. The issue I am seeing with null counts is that when Spark gets, for example, a combination of conditions such as:

o_orderdate > 5 and isnotnull(o_orderdate)

it will try to calculate the selectivity of the combined predicate by multiplying the selectivities of the individual predicates. The estimate for isnotnull doesn't take the other predicates into account (see here), and since the row count is based on the filtered size, the null count can end up falsely larger than the row count. That results in 0% selectivity and affects the rest of the estimation.

As for it being expensive, maybe it is worth benchmarking? Otherwise, we would need a solution for the null handling.
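The effect described in this comment can be sketched with a simplified model in Python. This is not Spark's actual estimation code. The function names, the assumed 0.4 selectivity for the range predicate, and the row and null counts are all hypothetical.

```python
def is_not_null_selectivity(null_count: int, row_count: int) -> float:
    """Estimated fraction of rows passing isnotnull(col), clamped to [0, 1]."""
    if row_count == 0:
        return 0.0
    # A null count larger than the row count is treated as "all rows are null".
    null_fraction = min(null_count / row_count, 1.0)
    return 1.0 - null_fraction


def and_selectivity(*selectivities: float) -> float:
    """Combine conjuncts by multiplying, i.e. assuming they are independent."""
    result = 1.0
    for s in selectivities:
        result *= s
    return result


range_sel = 0.4  # assumed selectivity of `o_orderdate > 5`

# Row count reflects pruned files (150) but the null count is static (905).
stale = and_selectivity(range_sel, is_not_null_selectivity(905, 150))

# Same scan with a null count computed from the surviving files only (5).
fresh = and_selectivity(range_sel, is_not_null_selectivity(5, 150))

print(f"stale null count -> selectivity {stale:.3f}")
print(f"fresh null count -> selectivity {fresh:.3f}")
```

Under this model the stale null count drives the combined selectivity to zero, so any estimate built on top of it (join sizes, for instance) starts from zero rows.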

huaxingao commented 2 weeks ago

I think we could introduce a property that allows users to choose whether to calculate the statistics on the fly.

findepi commented 2 weeks ago

> It seems we might need to compute the null count at query time in any case. This is because, during manifest scanning, some data files may be filtered out based on query predicates. This could lead to a situation where the number of rows is less than the number of nulls for a partition or table if these counts are collected statically. In such cases, Spark might incorrectly estimate zero rows if an isNotNull predicate is used.

If we have coarse null count stats and some data files are filtered out, we can derive (scale) the null count by assuming a fixed percentage of nulls. That's what a query engine will do when estimating null counts after a filter.
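A minimal sketch of that scaling, in Python with hypothetical names and numbers: the coarse null count is multiplied by the fraction of rows that survive file pruning, which amounts to assuming the null percentage is the same in every file.

```python
def scale_null_count(table_nulls: int, table_rows: int, scan_rows: int) -> int:
    """Scale a table-level null count down to the rows that remain after pruning.

    Assumes a fixed null percentage across the table (an assumption, not a fact
    about any particular dataset).
    """
    if table_rows == 0:
        return 0
    scaled = round(table_nulls * scan_rows / table_rows)
    # Guard against rounding pushing the estimate above the row count.
    return min(scaled, scan_rows)


# Illustrative numbers: 905 nulls in 1150 rows, 150 rows left after pruning.
estimate = scale_null_count(table_nulls=905, table_rows=1150, scan_rows=150)
print(estimate)
```

By construction the scaled value never exceeds the scanned row count, so the zero-selectivity case cannot arise from it.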

guykhazma commented 2 weeks ago

@findepi I agree that this is a valid option, but it relies on the assumption that nulls are uniformly distributed, which may not always be the case.

A potential compromise could be to introduce a property that lets users decide whether to calculate the statistics on the fly, as @huaxingao suggested. If the property is set to false, it would then fall back to scaling the null count.

This approach could also be split into two separate options: one for collecting the null count on the fly and another for collecting the min/max values on the fly.

What do you think? I can submit a PR for the on-the-fly collection, and we can address scaling the null count once there's consensus on whether to gather stats at the partition level or the table level.
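One way the two switches could fit together is sketched below in Python. `StatsOptions` and both flag names are hypothetical and do not correspond to existing Iceberg or Spark properties. The scaling fallback is the proportional one discussed earlier in the thread.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class StatsOptions:
    # Hypothetical switches; not existing Iceberg or Spark properties.
    null_count_on_the_fly: bool = False
    min_max_on_the_fly: bool = False


def report_null_count(opts: StatsOptions, surviving_nulls: List[int],
                      table_nulls: int, table_rows: int, scan_rows: int) -> int:
    """Exact count from surviving files if enabled, else a scaled estimate."""
    if opts.null_count_on_the_fly:
        return sum(surviving_nulls)
    if table_rows == 0:
        return 0
    return min(round(table_nulls * scan_rows / table_rows), scan_rows)


def report_min_max(opts: StatsOptions, surviving_bounds: List[Tuple[int, int]],
                   static_bounds: Tuple[int, int]) -> Tuple[int, int]:
    """Tight bounds from surviving files if enabled, else the static bounds."""
    if opts.min_max_on_the_fly and surviving_bounds:
        lows = [lo for lo, _ in surviving_bounds]
        highs = [hi for _, hi in surviving_bounds]
        return (min(lows), max(highs))
    return static_bounds


exact = StatsOptions(null_count_on_the_fly=True, min_max_on_the_fly=True)
print(report_null_count(exact, [0, 5], 905, 1150, 150))
print(report_min_max(exact, [(4, 9), (6, 20)], (1, 20)))
```

Keeping the flags separate lets a user pay the run-time cost only for null counts, where static values can be invalid, while leaving min/max precomputed.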