apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Hudi Partial Update not working by using MERGE statement on Hudi External Table #6055

Closed rishabhbandi closed 1 year ago

rishabhbandi commented 2 years ago

Describe the problem you faced

Scenario #1:

1) Created a dataframe (targetDF) and used the statement below to write it to a GCS bucket location (for example, locA): targetDF.write.format("org.apache.hudi").options(hudiWriteConf).mode(SaveMode.Overwrite).save(locA)

2) Then we create an external Hudi table on locA; let's call it ext_hudi_tbl_on_locA.

3) Next we have a dataframe that contains the records with the columns to be updated; let's call it updDf.

4) We create a Spark table on top of updDf in the Spark session; let's call it upd_spark_tbl.

5) Then we run the MERGE statement via spark.sql() on ext_hudi_tbl_on_locA using upd_spark_tbl. The statement finishes without any error, but it does not update any records.

NOTE: we checked that there is no data issue; if we join ext_hudi_tbl_on_locA and upd_spark_tbl, the join works and returns the joined result. (A sketch of these steps follows below.)
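A minimal sketch of these steps; the paths, join keys (a, b), and the updated column (col_to_update) are placeholders, not taken from the actual job:

```scala
import org.apache.spark.sql.SaveMode

// Step 1: write targetDF to the GCS location (placeholder path standing in for locA)
targetDF.write
  .format("org.apache.hudi")
  .options(hudiWriteConf)
  .mode(SaveMode.Overwrite)
  .save("gs://my-bucket/locA")

// Step 2: create an external Hudi table over that location
spark.sql(
  """CREATE TABLE ext_hudi_tbl_on_locA
    |USING hudi
    |LOCATION 'gs://my-bucket/locA'""".stripMargin)

// Steps 3-4: expose the update dataframe as a table in the Spark session
updDf.createOrReplaceTempView("upd_spark_tbl")

// Step 5: merge the updates into the external table
// (join keys and the SET list are illustrative)
spark.sql(
  """MERGE INTO ext_hudi_tbl_on_locA t
    |USING upd_spark_tbl s
    |ON t.a = s.a AND t.b = s.b
    |WHEN MATCHED THEN UPDATE SET t.col_to_update = s.col_to_update""".stripMargin)
```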

Scenario #2:

1) We create a managed Hudi table; let's call it int_hudi_tbl.

2) We insert data from targetDF into the above Hudi table via spark.sql().

3) Next we have a dataframe that contains the records with the columns to be updated; let's call it updDf.

4) We create a Spark table on top of updDf in the Spark session; let's call it upd_spark_tbl.

5) Then we run the MERGE statement via spark.sql() on int_hudi_tbl using upd_spark_tbl. The statement finishes without any error, and this time it does update the data. (A sketch of these steps follows below.)
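For comparison, a minimal sketch of the managed-table flow; the table schema, key/precombine/partition fields, and the updated column are placeholders:

```scala
// Step 1: create a managed Hudi table via Spark SQL (schema is illustrative)
spark.sql(
  """CREATE TABLE int_hudi_tbl (
    |  a STRING, b STRING, c STRING, d STRING, e TIMESTAMP, col_to_update STRING
    |) USING hudi
    |PARTITIONED BY (c, d)
    |TBLPROPERTIES (
    |  type = 'cow',
    |  primaryKey = 'a,b',
    |  preCombineField = 'e'
    |)""".stripMargin)

// Step 2: insert data from targetDF via spark.sql()
targetDF.createOrReplaceTempView("target_src")
spark.sql("INSERT INTO int_hudi_tbl SELECT * FROM target_src")

// Steps 3-4: register the update dataframe
updDf.createOrReplaceTempView("upd_spark_tbl")

// Step 5: merge, same statement shape as in Scenario #1 but against the managed table
spark.sql(
  """MERGE INTO int_hudi_tbl t
    |USING upd_spark_tbl s
    |ON t.a = s.a AND t.b = s.b
    |WHEN MATCHED THEN UPDATE SET t.col_to_update = s.col_to_update""".stripMargin)
```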

CONCLUSION: Scenario #1: no error is thrown and the update does not work. Scenario #2: no error is thrown and the update works.

Please advise why it is not working in Scenario #1.

Environment Description

yihua commented 2 years ago

@rishabhbandi could you provide the Hudi configs you use to write and update the tables?

@YannByron @xiarixiaoyao @XuQianJin-Stars could any of you help check if there is a problem?

rishabhbandi commented 2 years ago

Hudi config:

"hoodie.datasource.write.recordkey.field" = "a,b"
"hoodie.datasource.write.partitionpath.field" = "c,d"
"hoodie.datasource.write.precombine.field" = "e"
"hoodie.datasource.write.operation" = "upsert"
"hoodie.datasource.write.table.type" = "COPY_ON_WRITE"
"hoodie.table.name" = "dbname.tablename"
"hoodie.datasource.write.keygenerator.class" = "org.apache.hudi.keygen.ComplexKeyGenerator"
"hoodie.datasource.write.hive_style_partitioning" = "true"
"hoodie.datasource.hive_sync.support_timestamp" = "true"
"hoodie.cleaner.commits.retained" = 2
"hoodie.datasource.query.type" = "snapshot"

Spark shell:

spark-shell --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar,/edge_data/code/svcordrdats/pipeline-resources/hudi-support-jars/hudi-spark-bundle_2.12-0.11.0.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryoserializer.buffer.max=512m --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalogImplementation=hive'

hassan-ammar commented 2 years ago

@rishabhbandi can you please share the correct config to set the table path? My table is on S3.

I am trying your Scenario #2 (merging via spark.sql() with a managed Hudi table) and getting this error: An error occurred while calling o89.sql. Hoodie table not found in path file:/tmp/spark-warehouse/[table_name]/.hoodie

Also, how do I set the Hudi config properties? For Spark dataframes we can add the Hudi configurations as options, but how do we do the same with spark.sql(...)?

rishabhbandi commented 2 years ago

@hassan-ammar can we have a working session if possible? Please let me know a time that works for you.

hassan-ammar commented 2 years ago

@rishabhbandi we can talk now

hassan-ammar commented 2 years ago

Logging off for today. @rishabhbandi it would be really great if you could share how to set the configs. I have tried the following:

spark = SparkSession.builder.config('hoodie.base.path', 's3://[bucket path]/').config('BASE_PATH.key', 's3://[bucket path]/')

I also tried:

spark.sql("set hoodie.base.path=s3://[bucket path]/[table_name]/")
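For reference, a hedged sketch of one other way to make an existing Hudi table at an S3 path resolvable by name from spark.sql(), rather than setting hoodie.base.path on the session; the table name and bucket path are placeholders, and the same SQL can be issued from PySpark:

```scala
// Assumes the Hudi table (data files plus the .hoodie metadata folder) already exists at the path.
// Registering it with an explicit LOCATION lets spark.sql() resolve it by name.
spark.sql(
  """CREATE TABLE my_hudi_tbl
    |USING hudi
    |LOCATION 's3://my-bucket/path/to/table/'""".stripMargin)
```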

rishabhbandi commented 2 years ago

@hassan-ammar the command below is being used to create the spark shell:

spark-shell --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar,/edge_data/code/svcordrdats/pipeline-resources/hudi-support-jars/hudi-spark-bundle_2.12-0.11.0.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryoserializer.buffer.max=512m --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalogImplementation=hive'

You can save the Hudi config mentioned above in this issue as a hudiConf.conf file and use that conf file in the options method.
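A minimal sketch of that approach, assuming hudiConf.conf is a simple key=value properties file; the file path, bucket path, and target location are placeholders:

```scala
import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.sql.SaveMode

// Load the Hudi write configs from the properties-style conf file into a Map
val props = new Properties()
props.load(new FileInputStream("/path/to/hudiConf.conf"))
val hudiWriteConf: Map[String, String] = props.asScala.toMap

// Pass the whole map to the DataFrame writer as options
targetDF.write
  .format("org.apache.hudi")
  .options(hudiWriteConf)
  .mode(SaveMode.Append)
  .save("s3://my-bucket/path/to/table/")
```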

hassan-ammar commented 2 years ago

For Scenario 1, _hoodie_commit_time is getting updated for the rows that satisfy the merge criteria, but the other column values are not getting updated. For Scenario 2, I am still getting the "Hoodie table not found" error.

I am using AWS Glue along with the Hudi connector for Glue.

xushiyan commented 2 years ago

cc @fengjian428

fengjian428 commented 2 years ago

@voonhous

voonhous commented 2 years ago

@rishabhbandi I don't quite understand the steps between:

1) Created a dataframe (targetDF) and used the statement below to write it to a GCS bucket location (for example, locA):
targetDF.write.format("org.apache.hudi").options(hudiWriteConf).mode(SaveMode.Overwrite).save(locA)

2) Then we create an external Hudi table on locA; let's call it ext_hudi_tbl_on_locA.

and

1) We create a managed Hudi table; let's call it int_hudi_tbl.

2) We insert data from targetDF into the above Hudi table via spark.sql().

Can you please provide a code example instead? Thanks.

nsivabalan commented 2 years ago

@rishabhbandi: can you respond to the clarifications when you get a chance, please?

nsivabalan commented 2 years ago

@rishabhbandi: do you mind sharing a reproducible script? It would help us investigate faster.

nsivabalan commented 2 years ago

@rishabhbandi : gentle ping.

nsivabalan commented 1 year ago

hey @rishabhbandi @hassan-ammar: were you folks able to resolve the issue? Did any fix go into Hudi in this regard? Can you help me understand whether the issue still persists?

rishabhbandi commented 1 year ago

Hi Team, we changed our approach and instead created a separate custom Java class to perform the partial update. Therefore I am closing this issue from my side.