apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]Spark Job Reading Null Data from Hudi COPY_ON_WRITE Table Due to Inflight Commit During Snapshot Query #11971

Open maabkhan opened 3 weeks ago

maabkhan commented 3 weeks ago

Describe the problem you faced

One of my Spark jobs is reading data from a Hudi COPY_ON_WRITE table using the snapshot query type. The job runs once a day, but the table is updated every hour. When the job reads the table while it is being updated, it loads instants up to an inflight commit, leading to all null data being read.
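
For reference, a minimal sketch of the kind of snapshot read the job performs (the actual job code is not included in this issue; the table path and query type below are taken from the logs further down):

```python
# Minimal sketch of a snapshot read against the COPY_ON_WRITE table (illustrative only;
# the real job reads via the Glue/Hive catalog, see the SQL in the logs below).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hudi-snapshot-read-sketch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)

TABLE_PATH = "s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events"

df = (
    spark.read.format("hudi")
    # "snapshot" is also the default query type for COPY_ON_WRITE tables
    .option("hoodie.datasource.query.type", "snapshot")
    .load(TABLE_PATH)
)
df.show(10, truncate=False)
```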

To Reproduce

Steps to reproduce the behavior:

  1. Create a Hudi COPY_ON_WRITE table.
  2. Schedule a Spark job to read from the table using the snapshot query type.
  3. Update the table every hour.
  4. Run the Spark job while the table is being updated.

Expected behavior

The Spark job should wait for the inflight commit to complete and read data from the latest completed commit, or fall back to an older completed instant (such as the completed clean instant seen in the healthy run's logs), ensuring that no null data is read.

Environment Description

Additional context

The issue seems to be that the Spark job is loading instants up to an inflight commit instead of stopping at the latest completed instant, which results in null data being read. I am looking for a Hudi configuration that ensures the job either waits for inflight commits to complete before reading, or reads from an older completed instant.
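
In the meantime, a rough workaround sketch (an assumption, not a fix confirmed in this thread): pin the read to the last completed commit using Hudi's time-travel option `as.of.instant`, discovering that instant by listing completed `.commit` files under `.hoodie`. The boto3 listing and the helper name below are purely illustrative:

```python
# Rough workaround sketch (assumption, not a confirmed fix): pin the snapshot read to the
# latest *completed* commit via Hudi's time-travel option "as.of.instant". Discovering the
# instant by listing <instant>.commit files under .hoodie with boto3 is illustrative only.
import boto3
from pyspark.sql import SparkSession

BUCKET = "trusted-luna-prod"
TABLE_KEY = "tmevents_hourly/topics/account_balance_events"   # from the logs below
TABLE_PATH = f"s3a://{BUCKET}/{TABLE_KEY}"

def latest_completed_commit(bucket: str, table_key: str) -> str:
    """Return the newest completed commit instant (files named <instant>.commit)."""
    s3 = boto3.client("s3")
    instants = []
    for page in s3.get_paginator("list_objects_v2").paginate(
        Bucket=bucket, Prefix=f"{table_key}/.hoodie/"
    ):
        for obj in page.get("Contents", []):
            name = obj["Key"].rsplit("/", 1)[-1]
            if name.endswith(".commit"):       # completed only; skips .inflight / .requested
                instants.append(name[: -len(".commit")])
    if not instants:
        raise RuntimeError("no completed commits found on the timeline")
    return max(instants)                        # instants are fixed-width timestamps

spark = SparkSession.builder.getOrCreate()      # reuse the job's session
as_of = latest_completed_commit(BUCKET, TABLE_KEY)
df = (
    spark.read.format("hudi")
    .option("as.of.instant", as_of)             # read table state as of this completed instant
    .load(TABLE_PATH)
)
```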

Also, the same job runs fine when the table being read is not getting updated. The logs from the job that runs fine state: `Loaded instants upto : Option{val=[20240917195827185__clean__COMPLETED__20240917195837000]}`, whereas the logs from the job with the issue state: `Loaded instants upto : Option{val=[==>20240918193949292__commit__INFLIGHT__20240918194221000]}`.

Stacktrace Logs of Job with Issue -


```
2024-09-18T19:44:14.082728611Z 24/09/18 19:44:14 INFO TablePathUtils: Getting table path from path : s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events
2024-09-18T19:44:14.167799106Z 24/09/18 19:44:14 INFO DefaultSource: Obtained hudi table path: s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events
2024-09-18T19:44:14.211559950Z 24/09/18 19:44:14 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events
2024-09-18T19:44:14.255635781Z 24/09/18 19:44:14 INFO HoodieTableConfig: Loading table properties from s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events/.hoodie/hoodie.properties
2024-09-18T19:44:14.335476879Z 24/09/18 19:44:14 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events
2024-09-18T19:44:14.350218569Z 24/09/18 19:44:14 INFO DefaultSource: Is bootstrapped table => false, tableType is: COPY_ON_WRITE, queryType is: snapshot
2024-09-18T19:44:14.464630233Z 24/09/18 19:44:14 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240918193949292__commit__INFLIGHT__20240918194221000]}
```

danny0405 commented 3 weeks ago

The snapshot reader or incremental reader should always read only completed commits; that is what we call read-write snapshot isolation, which is a basic semantic of a table format, so I guess the issue comes from something else.

HoodieActiveTimeline: Loaded instants upto

The full timeline always loads all the instants, but it does not mean the reader would read the inflights.
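
To double-check what the timeline actually contains from a reader's point of view, one option is the `show_commits` SQL procedure; a small sketch, assuming the Hudi Spark SQL extension this job already configures and that the procedure accepts the catalog-qualified table name from the logs below:

```python
# Diagnostic sketch: list recent completed commits via Hudi's `show_commits` SQL procedure
# (needs org.apache.spark.sql.hudi.HoodieSparkSessionExtension, which this job configures).
# The catalog-qualified table name is taken from the SQL shown in the logs below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()      # reuse the job's session
spark.sql(
    "CALL show_commits(table => 'lake_tmevents_hourly.account_balance_events', limit => 10)"
).show(truncate=False)
```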

maabkhan commented 3 weeks ago

@danny0405 In this scenario the reader is reading the inflight commit only. The same job with the same data source works fine when no write is happening on that table, whereas when the table being read is also getting updated, it just loads the inflight commit and moves on to other tasks. Nowhere in the logs of the job with the issue has it loaded a completed instant. For the job that runs fine, where no concurrent write is happening on the source table, we get something like this in the logs: `2024-09-17T20:00:07.425377129Z 24/09/17 20:00:07 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20240917195827185__clean__COMPLETED__20240917195837000]}`, which is not there in the logs of the job with the issue.

Also, the result of the job with the issue on completion shows that the target contains null data with the correct schema, whereas when I re-run the same job while no concurrent write is happening on the source table, the correct data is loaded into the target.

Full logs of the job with the issue, related to the referred table:


```
2024-09-18T19:44:03.501578493Z  End date: '2024-09-18 18:30:00'
2024-09-18T19:44:03.502547582Z 2024-09-18 19:44:03,502 KF_APP.local.luna_etl.utils.sql_utils(120) INFO: Full load SQL for table with no created col: 
2024-09-18T19:44:03.502561841Z             SELECT *
2024-09-18T19:44:03.502566391Z             FROM lake_tmevents_hourly.account_balance_events;
2024-09-18T19:44:03.502570121Z             
2024-09-18T19:44:03.521776686Z 24/09/18 19:44:03 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
2024-09-18T19:44:03.526379251Z 24/09/18 19:44:03 INFO SharedState: Warehouse path is 'file:/opt/spark/work-dir/spark-warehouse'.
2024-09-18T19:44:06.056057981Z 24/09/18 19:44:06 INFO HiveConf: Found configuration file null
2024-09-18T19:44:06.071544306Z 24/09/18 19:44:06 INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3.9 using Spark classes.
2024-09-18T19:44:06.676999008Z 24/09/18 19:44:06 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.9) is file:/opt/spark/work-dir/spark-warehouse
2024-09-18T19:44:08.381899474Z 24/09/18 19:44:08 INFO AWSGlueClientFactory: Using region from ec2 metadata : ap-south-1
2024-09-18T19:44:09.749804085Z 24/09/18 19:44:09 INFO AWSGlueClientFactory: Using region from ec2 metadata : ap-south-1
2024-09-18T19:44:13.168847379Z 24/09/18 19:44:13 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2024-09-18T19:44:13.226672142Z 24/09/18 19:44:13 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2024-09-18T19:44:13.228102228Z 24/09/18 19:44:13 INFO MetricsSystemImpl: s3a-file-system metrics system started
2024-09-18T19:44:14.047268479Z 24/09/18 19:44:14 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
2024-09-18T19:44:14.074185695Z 24/09/18 19:44:14 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
2024-09-18T19:44:14.081421948Z 24/09/18 19:44:14 INFO DataSourceUtils: Getting table path..
2024-09-18T19:44:14.082728611Z 24/09/18 19:44:14 INFO TablePathUtils: Getting table path from path : s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events
2024-09-18T19:44:14.167799106Z 24/09/18 19:44:14 INFO DefaultSource: Obtained hudi table path: s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events
2024-09-18T19:44:14.211559950Z 24/09/18 19:44:14 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events
2024-09-18T19:44:14.255635781Z 24/09/18 19:44:14 INFO HoodieTableConfig: Loading table properties from s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events/.hoodie/hoodie.properties
2024-09-18T19:44:14.335476879Z 24/09/18 19:44:14 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events
2024-09-18T19:44:14.350218569Z 24/09/18 19:44:14 INFO DefaultSource: Is bootstrapped table => false, tableType is: COPY_ON_WRITE, queryType is: snapshot
2024-09-18T19:44:14.464630233Z 24/09/18 19:44:14 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240918193949292__commit__INFLIGHT__20240918194221000]}
2024-09-18T19:44:14.791957221Z 24/09/18 19:44:14 INFO TableSchemaResolver: Reading schema from s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events/account_address=REVOLVE_LOAN_EMI_PRINCIPAL_BILLED/e5a4ae86-169a-4094-81b3-712545a959b4-0_220-55-5550_20240918183912828.parquet
2024-09-18T19:44:14.998331976Z 24/09/18 19:44:14 INFO S3AInputStream: Switching to Random IO seek policy
2024-09-18T19:44:15.507286054Z 24/09/18 19:44:15 INFO HoodieTableConfig: Loading table properties from s3a://trusted-luna-prod/tmevents_hourly/topics/account_balance_events/.hoodie/hoodie.properties
2024-09-18T19:44:15.596507505Z 24/09/18 19:44:15 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240918193949292__commit__INFLIGHT__20240918194221000]}
2024-09-18T19:44:15.596940823Z 24/09/18 19:44:15 INFO BaseHoodieTableFileIndex: Refresh table account_balance_events, spent: 121 ms
```

danny0405 commented 3 weeks ago

The reader would filter out all those inflight commits before reading; my speculation is that maybe there is something else wrong that causes the input source to have null fields.
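
One rough way to test that speculation (illustrative sketch only, reusing the SparkSession, `TABLE_PATH`, and `as_of` from the earlier sketches in this thread): compare a plain snapshot read with a read pinned to the last completed instant, and check Hudi's metadata columns, which should never be null for real rows.

```python
# Illustrative check for the "null fields" speculation: Hudi's metadata columns
# (_hoodie_record_key, _hoodie_commit_time) should never be null for real rows, so
# compare a plain snapshot read with a read pinned to a completed instant.
# spark, TABLE_PATH and as_of come from the earlier workaround sketch in this thread.
from pyspark.sql import functions as F

for label, reader in [
    ("snapshot", spark.read.format("hudi")),
    ("pinned-to-completed", spark.read.format("hudi").option("as.of.instant", as_of)),
]:
    stats = reader.load(TABLE_PATH).agg(
        F.count("*").alias("rows"),
        F.sum(F.col("_hoodie_record_key").isNull().cast("int")).alias("null_record_keys"),
        F.countDistinct("_hoodie_commit_time").alias("distinct_commits"),
    )
    print(label)
    stats.show(truncate=False)
```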

ad1happy2go commented 3 weeks ago

@maabkhan Can you share the Spark configs please? I tried and the reader was not reading the inflight commit.

maabkhan commented 3 weeks ago

@ad1happy2go Spark configs passed (the rest of the configs take default values):

```
"sparkConf": {
    "spark.local.dir": "/tmp/spark-local-dir-shuffle-f2086f4d",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.sql.caseSensitive": "false",
    "spark.decommission.enabled": "true",
    "spark.sql.adaptive.enabled": "true",
    "spark.eventLog.rolling.enabled": "true",
    "spark.dynamicAllocation.enabled": "true",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.catalogImplementation": "hive",
    "spark.cleaner.periodicGC.interval": "1min",
    "spark.storage.decommission.enabled": "true",
    "spark.dynamicAllocation.maxExecutors": "200",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.kubernetes.allocation.batch.size": "10",
    "spark.kubernetes.driver.requestTimeout": "30000",
    "spark.sql.avro.datetimeRebaseModeInRead": "CORRECTED",
    "spark.dynamicAllocation.initialExecutors": "1",
    "spark.sql.avro.datetimeRebaseModeInWrite": "CORRECTED",
    "spark.sql.execution.arrow.sparkr.enabled": "true",
    "spark.kubernetes.driver.connectionTimeout": "30000",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.sql.parquet.datetimeRebaseModeInRead": "CORRECTED",
    "spark.sql.legacy.pathOptionBehavior.enabled": "true",
    "spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED",
    "spark.storage.decommission.rddBlocks.enabled": "true",
    "spark.dynamicAllocation.executorAllocationRatio": "0.33",
    "spark.dynamicAllocation.shuffleTracking.enabled": "True",
    "spark.storage.decommission.shuffleBlocks.enabled": "true",
    "spark.kubernetes.allocation.driver.readinessTimeout": "120s",
    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout": "60",
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2"
},
"deps": {
    "jars": [
        "https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar",
        "https://repo1.maven.org/maven2/org/apache/hive/hcatalog/hive-hcatalog-core/3.1.3/hive-hcatalog-core-3.1.3.jar"
    ]
}
```

Hudi configs passed (the rest take default values):

```
{
    "className": "org.apache.hudi",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.write.precombine.field": "dms_timestamp",
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.table.name": "users",
    "hoodie.consistency.check.enabled": "false",
    "hoodie.datasource.hive_sync.table": "users",
    "hoodie.datasource.hive_sync.database": "luna_lazypay",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.datasource.write.reconcile.schema": "true",
    "path": "s3a://refined-luna-prod/luna_lazypay/users/",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": "year,month,day",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.partition_fields": "year,month,day",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": 40,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": 1
}
```

These configs are from the job shared above, which is trying to read from a table while it is getting updated. The table my job reads is also a Hudi table, and it gets updated by a similar kind of Spark-Hudi job with some other configs.