apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Inconsistency in Hudi Table Configuration between Initial Insert and Subsequent Merges #10626

Closed: prashant462 closed this issue 3 months ago

prashant462 commented 7 months ago

Issue Summary

When using dbt Spark with Hudi to create a Hudi format table, there is an inconsistency in the Hudi table configuration between the initial insert and subsequent merge operations. The properties provided in the options of the dbt model are correctly fetched and applied during the first run. However, during the second run, when executing the merge operation, Hudi fetches a subset of the properties from the Hudi catalog table, leading to the addition of default properties and changes in configuration.

Steps to Reproduce

Expected Behavior

Hudi should consistently set all specified properties in every run, irrespective of whether it is the initial insert or a subsequent merge operation. The properties passed in the options of the dbt model should be retained and applied consistently across all operations.

Environment Description

Additional context

In the second run, MergeIntoHoodieTableCommand.scala executes InsertIntoHoodieTableCommand.run(). In this case Hudi fetches the props from the Hudi catalog table, where it reads the tableConfigs and the catalog properties. But these are not the complete set of properties that I passed in the first run using dbt options. Because of this, Hudi adds default values for the properties that are missing from the catalog props, and as a result many properties end up changing. Below I have attached some images of the properties fetched in subsequent merge operations.

MicrosoftTeams-image (21) Screenshot 2024-02-05 at 10 00 20 PM

pravin1406 commented 7 months ago

@codope @nsivabalan can you help out here? We are stuck integrating dbt with Hudi in our production use case.

ad1happy2go commented 7 months ago

@pravin1406 Just one question: you are using GLOBAL_SIMPLE but you don't have any partition column defined. Can you post your table properties?

Which configurations are you finding to be missing in MERGE INTO?

ad1happy2go commented 7 months ago

I just noticed that it is using the non-partitioned key generator in your case (see the debug diagram). That may be the reason it is using SIMPLE instead of GLOBAL_SIMPLE, as the latter doesn't make sense for a non-partitioned table.

prashant462 commented 7 months ago

Hello @ad1happy2go, I am printing the Hudi configs in the Hudi code before inserting records. I am attaching the confs we got for the first run and the second run.

DBT model executed

{{ config(
    materialized = 'incremental',
    file_format = 'hudi',
    pre_hook = "SET spark.sql.legacy.allowNonEmptyLocationInCTAS = true",
    location_root = "file:///Users/B0279627/Downloads/Hudi",
    unique_key = "id",
    partition_by = "name",
    incremental_strategy = "merge",
    options = {
        'preCombineField': 'id2',
        'hoodie.index.type': "GLOBAL_SIMPLE",
        'hoodie.datasource.write.partitionpath.field': 'name',
        'hoodie.datasource.hive_sync.partition_fields': 'name',
        'hoodie.datasource.hive_sync.table': 'hudi_test_two',
        'hoodie.datasource.hive_sync.database': 'qultyzn1_prepd',
        'hoodie.simple.index.update.partition.path': 'true',
        'hoodie.keep.min.commits': '145',
        'hoodie.keep.max.commits': '288',
        'hoodie.cleaner.policy': 'KEEP_LATEST_BY_HOURS',
        'hoodie.cleaner.hours.retained': '72',
        'hoodie.cleaner.fileversions.retained': '144',
        'hoodie.cleaner.commits.retained': '144',
        'hoodie.upsert.shuffle.parallelism': '200',
        'hoodie.insert.shuffle.parallelism': '200',
        'hoodie.bulkinsert.shuffle.parallelism': '200',
        'hoodie.delete.shuffle.parallelism': '200',
        'hoodie.parquet.compression.codec': 'zstd',
        'hoodie.datasource.hive_sync.support_timestamp': 'true',
        'hoodie.datasource.write.reconcile.schema': 'true',
        'hoodie.enable.data.skipping': 'true',
        'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
    }
) }}

-- first run select 1 as id, 2 as id2,"test2" as name

-- second run select 1 as id, 2 as id2,"test" as name

Screenshot 2024-02-06 at 10 49 26 PM

second_run_conf.txt first_run_conf.txt
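For anyone comparing two config dumps like the attached files, a small script makes the drift easier to see than reading screenshots. This is only a sketch: it assumes each dump has one `key=value` pair per line, which may not match the actual format of the attachments, and the helper names `parse_props` and `diff_props` are made up for illustration. The sample values are illustrative, based only on the properties this thread says were changing.

```python
def parse_props(text):
    """Parse 'key=value' lines into a dict (assumed dump format)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and anything that is not key=value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def diff_props(first, second):
    """Return (changed, only_in_first, only_in_second) for two prop dicts."""
    changed = {
        key: (first[key], second[key])
        for key in first.keys() & second.keys()
        if first[key] != second[key]
    }
    only_first = sorted(first.keys() - second.keys())
    only_second = sorted(second.keys() - first.keys())
    return changed, only_first, only_second


if __name__ == "__main__":
    # Illustrative values only, not the real contents of the attachments.
    first_run = parse_props(
        "hoodie.index.type=GLOBAL_SIMPLE\n"
        "hoodie.keep.min.commits=145\n"
    )
    second_run = parse_props(
        "hoodie.index.type=SIMPLE\n"
        "hoodie.keep.min.commits=145\n"
    )
    changed, only_first, only_second = diff_props(first_run, second_run)
    for key, (old, new) in sorted(changed.items()):
        print(f"{key}: {old} -> {new}")
    print("only in first run:", only_first)
    print("only in second run:", only_second)
```

To use it on real dumps, read each file with `open(path).read()` and pass the text to `parse_props`.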

pravin1406 commented 7 months ago

https://github.com/apache/hudi/issues/9342 @ad1happy2go We were facing this issue as well. Basically, we know which datasource options we want to use, but we want to use them with the Spark SQL support provided by Hudi. In the second run, one of the properties that was also changing was "hoodie.datasource.write.payload.class". As seen in the issue I mentioned, this was fixed in the 0.13.1 release for InsertInto. But for the MergeInto command, it still overrides PAYLOAD_CLASS_NAME to ExpressionPayload, as that is part of the overriding options in the buildMergeIntoConfig method of the MergeIntoHoodieTableCommand.scala class.

Our original requirement is to UPSERT on a COW/MOR table while using Hudi's DefaultHoodieRecordPayload. On the first run we do CreateTable -> InsertInto. On the second run we do MergeInto. The match condition looks something like this:

 when matched then update set * 
 when not matched then insert *

It would be great if you could explain this in more detail for our understanding. Should we move to version 0.13 or higher, and will that solve the issue? Or should we use InsertInto with some additional insert-into behaviour properties?

ad1happy2go commented 7 months ago

@prashant462 I tried the exact same model and it is working as expected with version 0.14.1.

After first run (select 2 as id, 2 as id2,"test" as name) -

image

After second run (select 2 as id, 2 as id2,"test2" as name)

image

Can you please try with Hudi version 0.14.x?

nsivabalan commented 7 months ago

Yes, before 0.14.0, Hudi expects all write configs to be passed in with every write. From 0.14.0, at least for table properties, Hudi tries to reuse the properties already serialized as table props. This does not apply to write properties, by the way; those are not serialized anywhere.
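The behaviour described above can be pictured as layers of configuration. The sketch below is a plain-Python simulation, not Hudi code: `effective_config` is a hypothetical helper, the split of keys into "table props" and "write options" is an assumption made for illustration, and the default value shown is only the one this thread reports seeing.

```python
def effective_config(defaults, table_props, write_opts, reuse_table_props):
    """Simulate which value wins for each key on a single write.

    defaults          -- library defaults (illustrative values)
    table_props       -- properties persisted with the table
    write_opts        -- options passed with this particular write
    reuse_table_props -- True models the 0.14.0+ behaviour described above
    """
    merged = dict(defaults)
    if reuse_table_props:
        merged.update(table_props)  # persisted props override defaults
    merged.update(write_opts)       # explicit options always win
    return merged


if __name__ == "__main__":
    # Illustrative keys and values; the classification is an assumption.
    defaults = {"hoodie.index.type": "SIMPLE"}
    table_props = {"hoodie.table.precombine.field": "id2"}

    # Second write that passes no options at all.
    old = effective_config(defaults, table_props, {}, reuse_table_props=False)
    new = effective_config(defaults, table_props, {}, reuse_table_props=True)
    print(old)  # precombine field is lost, index type falls back to default
    print(new)  # precombine field is reused, index type still the default

    # Write options are not persisted, so they must be passed every time.
    fixed = effective_config(
        defaults, table_props,
        {"hoodie.index.type": "GLOBAL_SIMPLE"},
        reuse_table_props=True,
    )
    print(fixed["hoodie.index.type"])
```

The practical takeaway under this model is that write-level options such as the index type need to be supplied on every run, whichever version is in use.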

prashant462 commented 7 months ago

@nsivabalan @ad1happy2go I tried with Hudi 0.14.1 and the Hudi configs seem to be working now, thanks. But I am facing another issue with the property below: 'hoodie.simple.index.update.partition.path':'true'.

I am running a dbt model with the config below.

{{ config(
    materialized = 'incremental',
    file_format = 'hudi',
    pre_hook = "SET spark.sql.legacy.allowNonEmptyLocationInCTAS = true",
    location_root = "file:///Users/B0279627/Downloads/Hudi",
    unique_key = "id",
    partition_by = "name",
    incremental_strategy = "merge",
    options = {
        'hoodie.datasource.write.precombine.field': 'id2',
        'hoodie.index.type': "GLOBAL_SIMPLE",
        'hoodie.datasource.write.partitionpath.field': 'name',
        'hoodie.datasource.hive_sync.partition_fields': 'name',
        'hoodie.datasource.hive_sync.table': 'hudi_test_five',
        'hoodie.datasource.hive_sync.database': 'qultyzn1_prepd',
        'hoodie.simple.index.update.partition.path': 'true',
        'hoodie.keep.min.commits': '145',
        'hoodie.keep.max.commits': '288',
        'hoodie.cleaner.policy': 'KEEP_LATEST_BY_HOURS',
        'hoodie.cleaner.hours.retained': '72',
        'hoodie.cleaner.fileversions.retained': '144',
        'hoodie.cleaner.commits.retained': '144',
        'hoodie.upsert.shuffle.parallelism': '200',
        'hoodie.insert.shuffle.parallelism': '200',
        'hoodie.bulkinsert.shuffle.parallelism': '200',
        'hoodie.delete.shuffle.parallelism': '200',
        'hoodie.parquet.compression.codec': 'zstd',
        'hoodie.datasource.hive_sync.support_timestamp': 'true',
        'hoodie.datasource.write.reconcile.schema': 'true',
        'hoodie.enable.data.skipping': 'true',
        'hoodie.spark.sql.insert.into.operation': 'upsert',
        'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
    }
) }}

1st run -- select 1 as id, 4 as id2, "test5" as name

2nd run -- select 1 as id, 2 as id2, "test4" as name

But the partition path is not updating for the record key.

I am attaching the table result.

Screenshot 2024-02-09 at 10 43 11 AM
prashant462 commented 6 months ago

@ad1happy2go @nsivabalan Regarding the partition path not updating: it seems like it is taking preCombineField into consideration. Is that a valid case? Does 'hoodie.simple.index.update.partition.path':'true' honour preCombineField?

I ask because when I changed the 2nd run to -- select 1 as id, 4 (or any greater value) as id2, "test4" as name, it updated the partition.

ad1happy2go commented 6 months ago

@prashant462 Yes, this is expected, as that is exactly what the precombine (ordering) field is for. An upsert is applied only if the incoming precombine value is not lower than the value in the existing data.
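The ordering rule above can be sketched in a few lines of plain Python. This is a simulation for illustration only, not Hudi's implementation: `merge_record` and `upsert` are hypothetical helpers, the table is modelled as a dict keyed globally by record key, and treating an equal ordering value as a win for the incoming record is an assumption based on the observation reported in this thread (an id2 of 4 against an existing 4 did update the partition).

```python
def merge_record(existing, incoming, ordering_field):
    """Pick the record that survives an upsert under the ordering rule."""
    if existing is None:
        return incoming
    # Assumption: ties go to the incoming record, matching the
    # behaviour observed in this thread.
    if incoming[ordering_field] >= existing[ordering_field]:
        return incoming
    return existing


def upsert(table, incoming, key_field="id", ordering_field="id2"):
    """Upsert into a dict keyed by record key (models a global index)."""
    key = incoming[key_field]
    table[key] = merge_record(table.get(key), incoming, ordering_field)
    return table


if __name__ == "__main__":
    table = {}
    # 1st run from the thread.
    upsert(table, {"id": 1, "id2": 4, "name": "test5"})
    # 2nd run with a lower id2: the existing record, and its partition, stay.
    upsert(table, {"id": 1, "id2": 2, "name": "test4"})
    print(table[1]["name"])
    # 2nd run retried with id2 = 4: the record moves to the new partition.
    upsert(table, {"id": 1, "id2": 4, "name": "test4"})
    print(table[1]["name"])
```

Under this model the partition value travels with whichever record wins the ordering comparison, which is why a lower id2 left the row in its original partition.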

ad1happy2go commented 6 months ago

@prashant462 Feel free to close this issue if you are all good, or let us know how we can help further. Thanks a lot.