apache / hudi

[SUPPORT] Incremental read query performance #10334

Open noahtaite opened 8 months ago

noahtaite commented 8 months ago

Describe the problem you faced

I'm running an application that reads from 4 medium-sized (a few hundred GB) Hudi MoR tables, which are compacted weekly.

When I run incremental queries spanning 3 to 5 commits with hoodie.datasource.read.incr.fallback.fulltablescan.enable set to true, I see a 5-hour gap in my Spark History Server: [image]

Incremental read config:

Scan MergeOnReadIncrementalRelation(org.apache.spark.sql.SQLContext@3b457d8a,
Map(
    hoodie.datasource.read.incr.fallback.fulltablescan.enable -> true,
    path -> s3://lake/names.all_hudi,
    hoodie.write.lock.zookeeper.url -> ip-10-80-36-225.ec2.internal,
    hoodie.write.lock.zookeeper.base_path -> /hudi,
    hoodie.metadata.enable -> true,
    hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://ip-10-xx-xx-xxx.ec2.internal:10000,
    hoodie.datasource.read.begin.instanttime -> 20231207170214000,
    hoodie.datasource.query.type -> incremental,
    hoodie.cleaner.policy.failed.writes -> EAGER,
    hoodie.write.lock.zookeeper.port -> 2181,
    hoodie.write.lock.provider -> org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
    hoodie.write.concurrency.mode -> single_writer),
    HoodieTableMetaClient{basePath='s3://lake/names.all_hudi',
    metaPath='s3://lake/names.all_hudi/.hoodie',
    tableType=MERGE_ON_READ},
    None,
    None
)
Job time: 2023/12/12 00:47:50
=> 5 commits for incremental read
number of output rows: 19,926,780
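
For anyone trying to reproduce this, the read above can be sketched roughly as follows in PySpark. This is a minimal sketch, not my exact job: the helper names `incremental_read_options` and `read_incremental` are hypothetical, and `spark` is assumed to be an existing SparkSession with the Hudi bundle on the classpath. Only the option keys come from the plan output above.

```python
from typing import Dict


def incremental_read_options(begin_instant: str, fallback_to_full_scan: bool) -> Dict[str, str]:
    """Build the Hudi read options for an incremental query.

    Hypothetical helper; the option keys are the ones shown in the plan output.
    """
    return {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
        "hoodie.datasource.read.incr.fallback.fulltablescan.enable": str(fallback_to_full_scan).lower(),
        "hoodie.metadata.enable": "true",
    }


def read_incremental(spark, path: str, begin_instant: str, fallback_to_full_scan: bool = False):
    """Return a DataFrame of records committed after `begin_instant`.

    Assumes `spark` is a SparkSession that can load the "hudi" format.
    """
    opts = incremental_read_options(begin_instant, fallback_to_full_scan)
    return spark.read.format("hudi").options(**opts).load(path)
```

Calling `read_incremental(spark, "s3://lake/names.all_hudi", "20231207170214000", fallback_to_full_scan=True)` would correspond to the slow run described above.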

During the gap, my driver kills all 40 of my executor instances and I see the following logs on the driver: log.txt

When I use a snapshot query with a filter on _hoodie_commit_time, the gap shrinks to about 20 minutes: [image]

Snapshot read config:

Scan MergeOnReadSnapshotRelation(org.apache.spark.sql.SQLContext@24a4d185,
Map(
    path -> s3://lake/names.all_hudi,
    hoodie.write.lock.zookeeper.url -> ip-xx-xx-xxx.ec2.internal,
    hoodie.write.lock.zookeeper.base_path -> /hudi,
    hoodie.metadata.enable -> true,
    hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://ip-10-xx-xx-xxx.ec2.internal:10000,
    hoodie.datasource.query.type -> snapshot,
    hoodie.cleaner.policy.failed.writes -> EAGER,
    hoodie.write.lock.zookeeper.port -> 2181,
    hoodie.write.lock.provider -> org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
    hoodie.write.concurrency.mode -> single_writer),
    HoodieTableMetaClient{basePath='s3://lake/names.all_hudi',
    metaPath='s3://lake/names.all_hudi/.hoodie',
    tableType=MERGE_ON_READ},
    List(),
    None,
    None
)
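
The snapshot workaround can be sketched like this. It is a rough sketch under assumptions: the helper names are hypothetical, `spark` is an existing SparkSession with Hudi available, and the filter uses a strict `>` on the assumption that it should mirror an incremental read, which returns records committed after the begin instant.

```python
def commit_time_filter(begin_instant: str) -> str:
    """Build a SQL predicate selecting records committed after `begin_instant`.

    Hypothetical helper. Instant times are compared as strings, which only
    works when all instants are digit strings of the same width.
    """
    if not begin_instant.isdigit():
        raise ValueError(f"not a Hudi instant time: {begin_instant!r}")
    return f"_hoodie_commit_time > '{begin_instant}'"


def read_snapshot_since(spark, path: str, begin_instant: str):
    """Snapshot-read the table, then keep only rows newer than `begin_instant`.

    Assumes `spark` is a SparkSession that can load the "hudi" format.
    """
    return (
        spark.read.format("hudi")
        .option("hoodie.datasource.query.type", "snapshot")
        .load(path)
        .where(commit_time_filter(begin_instant))
    )
```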

When I do an incremental read with hoodie.datasource.read.incr.fallback.fulltablescan.enable set to false, the incremental query gap drops from 5 hours to 5 minutes 😮: [image]

Incremental read config (no full table scan fallback):

Scan MergeOnReadIncrementalRelation(org.apache.spark.sql.SQLContext@dcb6efd,
Map(
    path -> s3://lake/names.all_hudi,
    hoodie.write.lock.zookeeper.url -> ip-10-xx-xx-xxx.ec2.internal,
    hoodie.write.lock.zookeeper.base_path -> /hudi,
    hoodie.metadata.enable -> true,
    hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://ip-10-xx-xx-xxx.ec2.internal:10000,
    hoodie.datasource.read.begin.instanttime -> 20231212232646000,
    hoodie.datasource.query.type -> incremental,
    hoodie.cleaner.policy.failed.writes -> EAGER,
    hoodie.write.lock.zookeeper.port -> 2181,
    hoodie.write.lock.provider -> org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
    hoodie.write.concurrency.mode -> single_writer),
    HoodieTableMetaClient{basePath='s3://lake/names.all_hudi',
    metaPath='s3://lake/names.all_hudi/.hoodie',
    tableType=MERGE_ON_READ},
    None,
    None
)
number of output rows: 12,541,221

This leads me to believe the incremental query was degenerating into a full table scan when it shouldn't... (all my commits are active and uncleaned): https://github.com/apache/hudi/blob/7a6543958368540d221ddc18e0c12b8d526b6859/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala#L142-L151

Wondering if someone can help me understand the following:

  1. Why is the incremental load degenerating into a full table scan when I can run the same job with the flag disabled and not hit File Not Found?
  2. Is the full table scan expected to happen on only one instance (the driver)?
  3. Can I safely leave this flag set to false, and expect to hit FileNotFound only if the cleaner is running?
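
One way to double-check the "all my commits are active" assumption behind question 1 is to compare the begin instant against the completed instants under `.hoodie`. The sketch below assumes the pre-1.0 timeline layout, where a completed instant is a file named `<instant>.<action>`. The helper names are hypothetical, and the file listing is passed in rather than fetched from S3. It does not cover the cleaner case, where data files are removed while the instant itself is still active.

```python
import re
from typing import Iterable, List

# Completed write instants only; requested/inflight files carry an extra suffix.
# Assumption: pre-1.0 timeline naming, e.g. "20231207170214000.deltacommit".
_COMPLETED_WRITE = re.compile(r"^(\d{14,17})\.(commit|deltacommit|replacecommit)$")


def completed_instants(file_names: Iterable[str]) -> List[str]:
    """Return the sorted instant times of completed write actions."""
    instants = []
    for name in file_names:
        match = _COMPLETED_WRITE.match(name)
        if match:
            instants.append(match.group(1))
    return sorted(instants)


def begin_instant_is_active(file_names: Iterable[str], begin_instant: str) -> bool:
    """True if `begin_instant` is not older than the earliest completed instant.

    String comparison assumes all instants have the same width.
    """
    instants = completed_instants(file_names)
    return bool(instants) and instants[0] <= begin_instant
```

If this returns False for the begin instant of a slow run, the start of the range has likely been archived, which would be one legitimate reason for a fallback.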

Expected behavior

Only fall back to a full table scan during an incremental load when necessary (the file-not-found case), not every time.

Environment Description

danny0405 commented 8 months ago

cc @linliu-code, maybe he could look into the Spark incremental read.

noahtaite commented 8 months ago

Thanks @danny0405 for the response. @linliu-code, let me know if you can share some insight or need more details. Thanks.