apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] MOR hudi 0.14, Bloom Filters are not being used on query time #10511

Open bk-mz opened 5 months ago

bk-mz commented 5 months ago

Describe the problem you faced

We encountered an issue with a MOR table that uses metadata bloom filters and Parquet bloom filters and has column statistics enabled. When we query the data, the system does not seem to use these bloom filters effectively. Instead, every request results in a full partition scan, regardless of the filters applied.

To Reproduce

Steps to reproduce the behavior:

  1. Create a MOR table and write data using both Parquet bloom filters and metadata bloom filters.
  2. Attempt to query the data by applying a filter to one of the columns that participate in bloom filtering. Ensure that the filter narrows down the dataset size, making the bloom filters more likely to be effective.
  3. Observe that the Spark SQL User Interface (UI) displays a full partition scan.
  4. Compare the query latency time for the column with bloom filters (BF) to the latency time for the column without bloom filters (non-BF).

Expected behavior

The expected behavior is that querying the column with bloom filters (BF) should be significantly more efficient than querying the column without bloom filters (non-BF).

Environment Description

Additional context

Table write hudi params:

hoodie.bloom.index.filter.type=DYNAMIC_V0
hoodie.bloom.index.prune.by.ranges=false
hoodie.bloom.index.use.metadata=true
hoodie.clean.async=true
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.compact.inline.max.delta.commits=5
hoodie.datasource.hive_sync.database=db_name
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.partition_fields=year,month,day,hour
hoodie.datasource.hive_sync.table=table_name
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=year,month,day,hour
hoodie.datasource.write.path=s3://s3_path/table
hoodie.datasource.write.precombine.field=date_updated_epoch
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.streaming.checkpoint.identifier=main_writer
hoodie.datasource.write.table.type=MERGE_ON_READ
hoodie.enable.data.skipping=true
hoodie.index.type=BLOOM
hoodie.metadata.enable=true
hoodie.metadata.index.async=true
hoodie.metadata.index.bloom.filter.column.list=id,account_id
hoodie.metadata.index.bloom.filter.enable=true
hoodie.metadata.index.column.stats.column.list=id,account_id
hoodie.metadata.index.column.stats.enable=true
hoodie.metricscompaction.log.blocks.on=true
hoodie.table.name=table_name
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.dynamodb.partition_key=table_name
hoodie.write.lock.dynamodb.region=us-east-1
hoodie.write.lock.dynamodb.table=hudi-lock
hoodie.write.lock.num_retries=30
hoodie.write.lock.provider=org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
hoodie.write.lock.wait_time_ms=30000
hoodie.write.lock.wait_time_ms_between_retry=10000

Hadoop parquet properties:

parquet.avro.write-old-list-structure=false
parquet.bloom.filter.enabled#account_id=true
parquet.bloom.filter.enabled#id=true

If I download a file from S3 and inspect it with the Parquet CLI, it shows that the bloom filter on the column is actually present and working:

parquet bloom-filter fe97585b-8a07-4a74-8445-16b898d1bb2b-0_191-4119-834504_20240116135428462.parquet -c account_id -v account_id1

Row group 0:
--------------------------------------------------------------------------------
value account_id1 NOT exists.

parquet bloom-filter fe97585b-8a07-4a74-8445-16b898d1bb2b-0_191-4119-834504_20240116135428462.parquet -c account_id -v account_id2

Row group 0:
--------------------------------------------------------------------------------
value account_id2 maybe exists.

Read part:

$ spark-sql \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
    --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
    --jars=/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hudi/hudi-aws-bundle.jar \
    --conf spark.executor.cores=8 \
    --conf spark.executor.memory=27G \
    --conf spark.driver.cores=8 \
    --conf spark.driver.memory=27G

spark-sql (default)> select count(1) as cnt from table_with_bfs where year = 2024 and month = 1 and day = 5 and account_id = 'id1';
82
Time taken: 34.962 seconds, Fetched 1 row(s)
spark-sql (default)> select count(1) as cnt from table_no_bfs where year = 2024 and month = 1 and day = 5 and account_id = 'id2';
82
Time taken: 26.463 seconds, Fetched 1 row(s)

In this particular case table_no_bfs does not have any bloom filters for this day, yet for some reason table_with_bfs takes more time than the table w/o BFs.

Number of rows in the table for this partition:

spark-sql (default)> select count(1) as cnt from table_with_bfs where year = 2024 and month = 1 and day = 5;
406188272
Time taken: 22.944 seconds, Fetched 1 row(s)

Spark SQL UI for BF table: image

Amount of parquet files in the partition:

aws s3 ls --recursive s3://s3_path_bf/table/year=2024/month=1/day=5/ | grep parquet | wc -l
     508

Spark SQL UI for Non-BF table: image

Amount of parquet files:

aws s3 ls --recursive s3://s3_path_non_bf/table/year=2024/month=1/day=5/ | grep parquet | wc -l
     526

KnightChess commented 5 months ago

@bk-mz yes, MOR does not support the Parquet native bloom filter. Because log files are merged on read, the native bloom filter in the base file does not reflect the latest data and is not accurate. Only COW tables or MOR read_optimized queries can use it.

And in version 0.14.0, the bloom filter in Hudi is only used on the write path to tag records.
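
To make the point above concrete, here is a minimal sketch of why a filter built from the base file alone cannot safely prune a merge-on-read snapshot query. It is not Hudi code: the `Rec` shape, the sample values, and the object name are hypothetical, and a plain `Set` stands in for the Parquet bloom filter (a real bloom filter can also return false positives, which does not change the argument).

```scala
object StaleBaseFilterDemo {
  // Hypothetical record shape for illustration: record key plus one filterable column.
  final case class Rec(id: Int, accountId: String)

  // Base file contents as of the last compaction.
  val baseFile: Seq[Rec] = Seq(Rec(1, "a1"), Rec(2, "a2"), Rec(3, "a3"))

  // A later update that still sits in a log file: record 3 moves to account "a9".
  val logFile: Seq[Rec] = Seq(Rec(3, "a9"))

  // Stand-in for the Parquet bloom filter: it only knows the values in the base file.
  val baseFilter: Set[String] = baseFile.map(_.accountId).toSet

  // Snapshot view: log records override base records with the same key.
  def snapshot: Seq[Rec] = {
    val merged = baseFile.map(r => r.id -> r).toMap ++ logFile.map(r => r.id -> r)
    merged.values.toSeq.sortBy(_.id)
  }

  // Correct snapshot query: merge first, then filter.
  def queryMerged(account: String): Seq[Rec] =
    snapshot.filter(_.accountId == account)

  // Unsafe shortcut: skip the file slice whenever the base filter says "not present".
  def queryWithBaseFilter(account: String): Seq[Rec] =
    if (baseFilter.contains(account)) queryMerged(account) else Seq.empty

  def main(args: Array[String]): Unit = {
    println(queryMerged("a9"))         // finds the updated record 3
    println(queryWithBaseFilter("a9")) // misses it: the base filter never saw "a9"
  }
}
```

In a read_optimized query only the base file is read, so a filter built from that same file is consistent with what the query returns.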

bk-mz commented 5 months ago

mor read_optimized can use it.

Can I set spark-sql to use read_optimized to test it out?

KnightChess commented 5 months ago

@bk-mz yes, set hoodie.datasource.query.type = read_optimized

bk-mz commented 5 months ago

Okay, so let's compare. For a clean experiment, I created two separate sessions for the queries below.

scala> spark.time({
     |   val df = spark.read
     |     .format("org.apache.hudi")
     |     .option("hoodie.datasource.query.type", "read_optimized")
     |     .load("s3://path/table/")
     |
     |   val count = df.filter(
     |     (df("year") === 2024) &&
     |     (df("month") === 1) &&
     |     (df("day") === 16) &&
     |     (df("account_id") === "id1")
     |   ).count()
     |
     |   println(s"Count: $count")
     | })
Count: 47
Time taken: 30477 ms
scala> spark.time({
     |   val df = spark.read
     |     .format("org.apache.hudi")
     |     .option("hoodie.datasource.query.type", "snapshot")
     |     .load("s3://path/table/")
     |
     |   val count = df.filter(
     |     (df("year") === 2024) &&
     |     (df("month") === 1) &&
     |     (df("day") === 16) &&
     |     (df("account_id") === "id1")
     |   ).count()
     |
     |   println(s"Count: $count")
     | })
24/01/18 10:06:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Count: 47
Time taken: 22594 ms

This is super confusing because it contradicts the logic: read_optimized actually takes more time than snapshot to load the same data.

Can we say for sure that Parquet native bloom filters are simply not effective for Hudi?

KnightChess commented 5 months ago

@bk-mz the operating system cache may also have an impact. Can you provide detailed metrics from the Spark UI?

bk-mz commented 5 months ago

Sure, but anything specific you want to see?

KnightChess commented 5 months ago

@bk-mz you can check the number of output rows of the scan RDD in the SQL tab of the Spark UI.

bk-mz commented 5 months ago

for snapshot: 441,483,112 output rows, query time 28141 ms
for read-optimized: 22,887,045 output rows, query time 26054 ms

read-optimized snapshot

scala> spark.time({
     |   val df = spark.read
     |     .format("org.apache.hudi")
     |     .option("hoodie.datasource.query.type", "read_optimized")
     |     .load("s3://table/")
     |
     |   val count = df.filter(
     |     (df("year") === 2024) &&
     |     (df("month") === 1) &&
     |     (df("day") === 16) &&
     |     (df("account_id") === "id1")
     |   ).count()
     |
     |   println(s"Count: $count")
     | })
24/01/22 09:05:38 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Count: 47
Time taken: 26054 ms
scala> spark.time({
     |   val df = spark.read
     |     .format("org.apache.hudi")
     |     .option("hoodie.datasource.query.type", "snapshot")
     |     .load("s3://table/")
     |
     |   val count = df.filter(
     |     (df("year") === 2024) &&
     |     (df("month") === 1) &&
     |     (df("day") === 16) &&
     |     (df("account_id") === "id1")
     |   ).count()
     |
     |   println(s"Count: $count")
     | })
24/01/22 09:09:03 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Count: 47
Time taken: 28141 ms

Okay, your point stands: the number of output rows is indeed different.

Though, how can we explain the same query times?

KnightChess commented 5 months ago

@bk-mz can you check the time spent at this point?

KnightChess commented 5 months ago

image

KnightChess commented 5 months ago

We can only analyse the scan RDD. A query spends time in many different places, so I think the result is normal.

bk-mz commented 5 months ago

WholeStageCodegen (1) duration for snapshot: total (min, med, max) 13.4 m (79 ms, 1.5 s, 3.4 s).
WholeStageCodegen (1) duration for read-optimized: total (min, med, max) 6.5 m (249 ms, 552 ms, 5.9 s).
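
The figures reported so far can be put side by side with a little arithmetic. The sketch below only restates numbers from this thread (scan output rows, wall-clock query time, and summed WholeStageCodegen duration); the object and value names are made up for illustration.

```scala
object ScanComparison {
  // Figures reported in this thread.
  val snapshotRows = 441483112L
  val readOptimizedRows = 22887045L
  val snapshotWallMs = 28141.0
  val readOptimizedWallMs = 26054.0
  val snapshotTaskMinutes = 13.4     // WholeStageCodegen (1) total, snapshot
  val readOptimizedTaskMinutes = 6.5 // WholeStageCodegen (1) total, read-optimized

  // How many times larger the snapshot figure is than the read-optimized one.
  val rowRatio: Double = snapshotRows.toDouble / readOptimizedRows
  val wallClockRatio: Double = snapshotWallMs / readOptimizedWallMs
  val taskTimeRatio: Double = snapshotTaskMinutes / readOptimizedTaskMinutes

  def main(args: Array[String]): Unit = {
    println(f"scan output rows:  ${rowRatio}%.2f times more for snapshot")
    println(f"summed task time:  ${taskTimeRatio}%.2f times more for snapshot")
    println(f"wall-clock time:   ${wallClockRatio}%.2f times more for snapshot")
  }
}
```

The read-optimized scan emits about 19 times fewer rows and uses about half the summed task time, while wall-clock time differs by only about 8%. The summed duration is spread over many parallel tasks, so a drop there does not have to show up proportionally in end-to-end latency.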

bk-mz commented 5 months ago

@KnightChess Did I understand you correctly, you are claiming that bloom filters actually work correctly?

KnightChess commented 5 months ago

@bk-mz yes, according to the metrics, they work.

bk-mz commented 5 months ago

How can we make sure that the difference is not caused simply by the read-optimized and snapshot paths themselves, leaving any bloom filters or indexes out of the picture?

bk-mz commented 5 months ago

I.e., is it caused by the RO reader just reading different files?

KnightChess commented 5 months ago

A variety of factors can lead to different query times: IO, CPU, disk load...; in Spark, parallelism, the time to scale up executors...; in Hudi, snapshot reads should theoretically be slower than read-optimized reads, and they use different readers to read different files (RO: base files; RT: base + log files). There is another question: will a Parquet file with a bloom filter be read faster than one without? I don't think that is certain; you need to look at its actual effect in production. In a Spark query, a difference of 2 s cannot explain the slowness. What do you think? This is just my limited understanding; maybe others have a better opinion.

bk-mz commented 5 months ago

What do you think about,

TBH a bit of mixed emotions here.

With 0.14 there is practically no way of understanding how indexing or statistics affect queries, apart from the "number of output rows" in Spark SQL, i.e. are they used at all and, if they are, how effectively?

This issue can be closed. From our end, we'll move forward with the assumption that indexing and statistics in Hudi are ineffective, though we'll keep them enabled on our critical fields in case future releases of Hudi implement performance improvements.

ad1happy2go commented 5 months ago

@bk-mz Why do you think "indexing and statistical means in hudi are ineffective" when the number of output rows with bloom is clearly a lot less than the number of output rows without bloom? You can also try column stats indexing in this case. That will optimise your read queries.

bk-mz commented 5 months ago

when number of output rows with bloom is clearly lot less than number of output rows without bloom.

@ad1happy2go

The query performance is the same for both the RO and snapshot cases, which is why I'm making that statement. Just having one number smaller than the other is cryptic.

You can also try column stats indexing also in this case.

As you can see, they are enabled:

hoodie.metadata.index.bloom.filter.column.list=id,account_id
hoodie.metadata.index.bloom.filter.enable=true
hoodie.metadata.index.column.stats.column.list=id,account_id
hoodie.metadata.index.column.stats.enable=true

My concern with Hudi, and in this ticket specifically, is that today Hudi does not let you introspect and figure out whether any statistical or indexing feature is actually improving performance.

We can't tie Hudi configurations to actual results; they are logically not connected, as seen from the queries above.

I.e. I can't say "ok I removed that configuration and my query started to lag", nor vice-versa, and I also can't say "I added that column to the statistics config and my queries are faster now", because there are no metrics or practical evidence anywhere to help understand the cause.

bhasudha commented 5 months ago

Hi @bk-mz. Wanted to add to this thread. Query latency may not be the only metric to measure, as explained in the comments above. Runs with Parquet native bloom filters enabled that still take a similar time could be dominated by a few factors: the need to still open all files to load the Parquet native bloom filter, S3 throttling, etc.

One way I would try testing this is to remove Hudi from the picture and take the same parquet dataset, and run it with and without parquet native bloom filter enabled. You should be able to see the output rows reduced, but the query time may not be that improved due to the need to load each of these files to read the bloom filters.
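
A rough sketch of that experiment for spark-shell follows. It assumes a Spark build whose bundled Parquet version supports bloom filters; the `/tmp` paths, the synthetic data, the NDV value, and the probe value are hypothetical. The `parquet.bloom.filter.enabled#<column>` and `parquet.bloom.filter.expected.ndv#<column>` writer options are the same Parquet properties used earlier in this issue, passed per write.

```scala
// Paste into spark-shell; `spark` is the session the shell provides.
import org.apache.spark.sql.functions.{col, concat, lit}

// Hypothetical local paths; replace with your own locations.
val withBf = "/tmp/bf_test/with_bf"
val withoutBf = "/tmp/bf_test/without_bf"

// Synthetic data: a high-cardinality string column, mirroring account_id in this issue.
val data = spark.range(0L, 10000000L)
  .withColumn("account_id", concat(lit("account_"), col("id").cast("string")))

// Same data written twice: once with a Parquet bloom filter on account_id, once without.
data.write.mode("overwrite")
  .option("parquet.bloom.filter.enabled#account_id", "true")
  .option("parquet.bloom.filter.expected.ndv#account_id", "1000000") // assumed value, tune per data
  .parquet(withBf)
data.write.mode("overwrite").parquet(withoutBf)

// Probe a value that is absent but sorts inside the min/max range of most row groups,
// so that min/max statistics alone cannot rule it out.
def probe(path: String): Long =
  spark.read.parquet(path).filter(col("account_id") === "account_5x").count()

spark.time(probe(withBf))
spark.time(probe(withoutBf))
```

After each run, compare the scan node's "number of output rows" in the SQL tab of the Spark UI, not only the elapsed time.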

Column stats in Hudi's metadata table help to reduce the number of files scanned (unlike Parquet native bloom filters). With data skipping enabled, Hudi uses the column stats stored in the metadata table instead of scanning the metadata in each Parquet file, so Hudi can plan the query better with these stats and the predicates, scanning/reading fewer files when possible (see this blog for more details on data skipping in Hudi). This is particularly helpful on cloud storage, as cloud storage requests have constant overhead and are subject to rate limiting.
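
The idea behind file pruning with column stats can be sketched in a few lines. This is a toy model, not Hudi's implementation: the `FileStats` shape, the file names, and the value ranges are hypothetical.

```scala
object ColumnStatsPruningDemo {
  // Hypothetical per-file statistics for one column, as an index might store them.
  final case class FileStats(file: String, min: String, max: String)

  val stats: Seq[FileStats] = Seq(
    FileStats("file-1.parquet", "acc_000", "acc_199"),
    FileStats("file-2.parquet", "acc_200", "acc_399"),
    FileStats("file-3.parquet", "acc_400", "acc_599"))

  // Keep only the files whose [min, max] range can contain the value;
  // all other files never need to be opened.
  def candidateFiles(value: String): Seq[String] =
    stats.filter(s => s.min <= value && value <= s.max).map(_.file)

  def main(args: Array[String]): Unit = {
    println(candidateFiles("acc_250")) // only the file whose range covers the value
    println(candidateFiles("acc_700")) // no file can contain it
  }
}
```

Pruning of this kind only helps when value ranges differ between files. If the column's values are spread randomly across all files, every file's range covers nearly every value and few files can be skipped.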

You bring valid feedback that we will take and work on - better showcasing the impact of using these indexes so the users can easily spot them. Will update you back on how we are incorporating this shortly.

parisni commented 4 months ago

Hi @bk-mz thanks for the interest in parquet bloom filter. We have an open documentation PR about bloom filters which states:

So bloom would be useful in either case (at the parquet file level) :

  • the column has no duplicates
  • the column number of unique values is more than 40k

I would add to this, that the benefit of bloom is when the predicate can be filtered out by the bloom. If it is the case, then you could also tune the NDV (number of distinct values) to decrease the probability of false positive match.

If your column is not in this case, then parquet bloom would only add overhead, and would slow down a given query.
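
To illustrate why the NDV setting matters, the sketch below uses the classic bloom filter approximation p = (1 - e^(-k*n/m))^k, with m bits, k hash functions, and n distinct values. Parquet uses a split-block bloom filter, whose exact false-positive behaviour differs somewhat, so treat the numbers as indicative only. The object and function names are made up for illustration.

```scala
object BloomFppDemo {
  // Classic approximation of the false-positive rate for m bits, k hashes, n distinct values.
  def falsePositiveRate(bits: Long, hashes: Int, distinct: Long): Double =
    math.pow(1.0 - math.exp(-hashes.toDouble * distinct / bits), hashes)

  // Bits needed to hit a target rate when k is chosen optimally: m = -n * ln(p) / (ln 2)^2.
  def bitsFor(distinct: Long, targetFpp: Double): Long =
    math.ceil(-distinct * math.log(targetFpp) / (math.log(2) * math.log(2))).toLong

  // Optimal number of hash functions: k = (m / n) * ln 2, at least 1.
  def optimalHashes(bits: Long, distinct: Long): Int =
    math.max(1, math.round(bits.toDouble / distinct * math.log(2)).toInt)

  def main(args: Array[String]): Unit = {
    val expectedNdv = 40000L
    val bits = bitsFor(expectedNdv, 0.01)
    val k = optimalHashes(bits, expectedNdv)
    // Filter sized for the expected NDV, then loaded with 1x and 10x that many values.
    println(falsePositiveRate(bits, k, expectedNdv))
    println(falsePositiveRate(bits, k, expectedNdv * 10))
  }
}
```

A filter sized for the expected number of distinct values stays near its target rate, while one holding ten times more values than it was sized for answers "maybe exists" for almost every probe and stops being useful for pruning.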

There are also benchmarks on the Spark side that could be of interest.