apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Update few columns & insert all columns during merge #9368

Closed: praneethh closed this issue 11 months ago

praneethh commented 11 months ago

I have an existing table with these columns; the record key is emp_id, the precombine field is log_ts, and the partition field is log_dt:

emp_id  emp_name  log_ts               load_ts              log_dt
1       neo       2023-08-04 12:00:00  2023-08-04 12:00:00  2023-08-04

The incoming batch looks like this (emp_id 1 is an update, emp_id 2 is a brand-new record):

emp_id  emp_name  log_ts
1       neo       2023-08-05 14:00:00
2       trinity   2023-08-05 14:00:00

My desired output is shown below. For the first record, log_ts and log_dt are updated but load_ts should not change, since it captures when the record was first loaded. The second record should be a plain insert with the current load_ts.

emp_id  emp_name  log_ts               load_ts              log_dt
1       neo       2023-08-05 14:00:00  2023-08-04 12:00:00  2023-08-05
2       trinity   2023-08-05 14:00:00  2023-08-05 15:00:00  2023-08-05

Can PartialUpdateAvroPayload be used to achieve both scenarios (a new insert with all columns, and an upsert that changes only a few columns)? If yes, how should the properties be set to achieve this?

ad1happy2go commented 11 months ago

@praneethh You can use PartialUpdateAvroPayload; you just need to configure the payload class in the table properties if you are using Spark SQL.

Sample code I tried for your use case:

CREATE TABLE test5 USING hudi
TBLPROPERTIES (
  type = 'cow', primaryKey = 'emp_id', preCombineField = 'log_ts',
  'hoodie.table.name' = 'test1', 'hoodie.table.partition.fields' = 'log_dt',
  'hoodie.datasource.write.payload.class' = 'org.apache.hudi.common.model.PartialUpdateAvroPayload'
) AS
(SELECT 1 AS emp_id, 'neo' AS emp_name, current_timestamp AS log_ts, current_timestamp AS load_ts, current_date AS log_dt)
UNION
SELECT 3 AS emp_id, 'neo' AS emp_name, null AS log_ts, null AS load_ts, null AS log_dt;

INSERT INTO test5
(SELECT 1 as emp_id, 'neo' as emp_name, current_timestamp as log_ts, null as load_ts, current_date as log_dt)
UNION
(SELECT 2 as emp_id, 'trinity' as emp_name, current_timestamp as log_ts, current_timestamp as load_ts, current_date as log_dt)
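
If the write goes through the DataFrame API instead of Spark SQL, the same payload class can be set through the writer options. A minimal spark-shell sketch, mirroring the SQL insert above with a hypothetical base path /tmp/hudi/test5 (option keys are the standard Hudi write configs; adjust the path and column types to your setup):

// Hedged sketch, not the exact code from this thread: upsert a partial-update batch
// with PartialUpdateAvroPayload via the DataFrame writer.
import spark.implicits._   // already in scope inside spark-shell

val updates = Seq(
  // load_ts is sent as null for emp_id 1, so the stored load_ts should be kept on merge
  (1, "neo",     "2023-08-05 14:00:00", null.asInstanceOf[String], "2023-08-05"),
  (2, "trinity", "2023-08-05 14:00:00", "2023-08-05 15:00:00",     "2023-08-05")
).toDF("emp_id", "emp_name", "log_ts", "load_ts", "log_dt")

updates.write.format("hudi").
  option("hoodie.table.name", "test5").
  option("hoodie.datasource.write.recordkey.field", "emp_id").
  option("hoodie.datasource.write.precombine.field", "log_ts").
  option("hoodie.datasource.write.partitionpath.field", "log_dt").
  option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.PartialUpdateAvroPayload").
  option("hoodie.datasource.write.operation", "upsert").
  mode("append").
  save("/tmp/hudi/test5")   // hypothetical base path

This relies on PartialUpdateAvroPayload treating incoming nulls as "keep the stored value", which is the same behaviour the SQL example above depends on.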

Also, I think the better way would be to use the MERGE INTO command.

praneethh commented 11 months ago

@ad1happy2go @codope @danny0405 Yes, MERGE INTO would be a better way to implement this scenario, since it is difficult to tell whether an incoming record is a true insert or an update. Also, I couldn't find any documentation on how to use MERGE INTO in Hudi; it looks like it's missing from https://hudi.apache.org/docs/quick-start-guide/#mergeinto

praneethh commented 11 months ago

@ad1happy2go @codope @danny0405 I tried MERGE INTO but am getting the error below:

res3.registerTempTable("incremental_data")

scala> val sqlPartialUpdate =
     |       s"""
     |        | merge into praneeth.hudi_test as target
     |        | using (
     |        |   select * from incremental_data
     |        | ) source
     |        | on  target.emp_id = source.emp_id
     |        | when matched then
     |        |   update set target.log_ts = source.log_ts, target.log_dt = source.log_dt
     |        | when not matched then insert *
     |        """.stripMargin

spark.sql(sqlPartialUpdate)
java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily
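
For reference, this particular message usually comes from Spark's built-in analyzer rather than from Hudi: if the session was started without Hudi's SQL extensions, MERGE INTO never reaches Hudi's command at all. A hedged sketch of the session configuration from the Hudi quick-start (this may or may not be the cause of the error tracked separately below):

// Hedged sketch: create the SparkSession with Hudi's SQL extensions enabled so that
// MERGE INTO is routed to Hudi instead of failing in vanilla Spark. Exact packages
// and versions are environment-specific.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().
  appName("hudi-merge-into").
  config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
  // the spark_catalog override is needed on Spark 3.2+ per the Hudi quick-start
  config("spark.sql.catalog.spark_catalog", "org.apache.hudi.catalog.HoodieCatalog").
  getOrCreate()

In spark-shell the equivalent is passing these as --conf flags at launch, since the extensions must be set before the session is created.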

ad1happy2go commented 11 months ago

@praneethh Looks like you opened https://github.com/apache/hudi/issues/9469 for this error, so we can track it there. Closing this one out.

pravin1406 commented 2 months ago

@codope I wanted to do something similar using MERGE INTO. I want to provide only a certain set of columns in the source table, but due to the overridden options in MergeIntoHudiCommand, I'm unable to set reconcile schema to true. I guess if it were set to true, the merge would have run.

Is there any specific reason to forcefully set reconcile to false?

I'm trying to do this where merge_source has only a few of the target columns:

spark.sql("merge into partial_update_rt as target using merge_source as source on target."+recordkey +" = source."+recordkey+" when matched then update set when not matched then insert ")