apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Setting hoodie.datasource.insert.dup.policy to drop still upserts the record in 0.14 #10650

Open keerthiskating opened 8 months ago

keerthiskating commented 8 months ago

Describe the problem you faced

If my incoming dataset contains a record that already exists in the Hudi table, Hudi still updates the commit time and treats the record as an update, even after setting 'hoodie.datasource.insert.dup.policy': 'drop'.

To Reproduce

Steps to reproduce the behavior:

recordkey = "id,name"
precombine = "uuid"
method = "upsert"
table_type = "COPY_ON_WRITE"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.insert.dup.policy': 'drop',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.table.cdc.enabled':'true',
    'hoodie.table.cdc.supplemental.logging.mode': 'data_before_after',
}

spark_df = spark.createDataFrame(
    data=[
        (1, "John", 1, False),
        (2, "Doe", 2, False),
    ],
    schema=["id", "name", "val", "_hoodie_is_deleted"])

from pyspark.sql.functions import sha2, concat_ws

# derive a deterministic uuid (the precombine field) from the record key columns
record_key_col_array = recordkey.split(",")
spark_df = spark_df.withColumn("uuid", sha2(concat_ws("||", *record_key_col_array), 256))

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(path)

df = spark. \
      read. \
      format("hudi"). \
      load(path)

df.select(['_hoodie_commit_time', 'id', 'name', 'val']).show()

+-------------------+---+----+---+
|_hoodie_commit_time| id|name|val|
+-------------------+---+----+---+
|  20240211155820562|  1|John|  1|
|  20240211155820562|  2| Doe|  2|
+-------------------+---+----+---+

spark_df = spark.createDataFrame(
    data=[
        (1, "John", 1, False),
    ],
    schema=["id", "name", "val", "_hoodie_is_deleted"])
spark_df = spark_df.withColumn("uuid", sha2(concat_ws("||", *record_key_col_array), 256))

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

# read latest data

df = spark. \
      read. \
      format("hudi"). \
      load(path)

df.select(['_hoodie_commit_time', 'id', 'name', 'val']).show()

+-------------------+---+----+---+
|_hoodie_commit_time| id|name|val|
+-------------------+---+----+---+
|  20240211155914976|  1|John|  1| ---> Commit time was updated even though the record did not change.
|  20240211155820562|  2| Doe|  2|
+-------------------+---+----+---+

# query cdc data

# timestamp of the first commit, taken from the table above
latest_commit_ts = '20240211155820562'

cdc_read_options = {
    'hoodie.datasource.query.incremental.format': 'cdc',
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': latest_commit_ts,
    # 'hoodie.datasource.read.end.instanttime': 20240208210952160,
}

df = spark.read.format("hudi"). \
    options(**cdc_read_options). \
    load(path)

df.show(2,False)

+---+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before                                                                                                                                      |after                                                                                                                                       |
+---+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|u  |20240211155914976|{"id": 1, "name": "John", "val": 1, "_hoodie_is_deleted": false, "uuid": "46ca69f145f50f414b7a8cd59656f4935a5162798f093edc708a1ba21c0e9c26"}|{"id": 1, "name": "John", "val": 1, "_hoodie_is_deleted": false, "uuid": "46ca69f145f50f414b7a8cd59656f4935a5162798f093edc708a1ba21c0e9c26"}|
+---+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+

Expected behavior

Since no records were actually updated, Hudi should not report any updates when performing a CDC query.

Environment Description

ad1happy2go commented 8 months ago

@keerthiskating This setting is only applicable when the operation type is insert.

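For illustration, here is a minimal sketch of how the new policy is intended to be used, i.e. with the insert operation (though note the later report in this thread that even the insert path may not behave as expected in 0.14):

# sketch: the dup policy is documented to apply to insert, not upsert
insert_options = {
    **hudi_options,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.insert.dup.policy': 'drop',  # honored only for insert
}

spark_df.write.format("hudi"). \
    options(**insert_options). \
    mode("append"). \
    save(path)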
keerthiskating commented 8 months ago

> @keerthiskating This setting is only applicable when the operation type is insert.

Any idea how I can achieve this with the upsert operation? I want Hudi to ignore records that already exist in the table and not update those records' commit times.

ad1happy2go commented 8 months ago

@keerthiskating You may need to write your own custom payload for this. We could also contribute this feature to the Hudi codebase.

One example is here: https://gist.github.com/bhasudha/7ea07f2bb9abc5c6eb86dbd914eec4c6
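For context, a custom payload is a Java class (like the one in the linked gist) that Hudi invokes when merging records; from PySpark you only point the writer at it. A minimal sketch, where com.example.DropUnchangedPayload is a hypothetical class name standing in for your own implementation:

custom_payload_options = {
    **hudi_options,
    # hypothetical class name; replace with your compiled payload class,
    # which must be on the Spark driver/executor classpath
    'hoodie.datasource.write.payload.class': 'com.example.DropUnchangedPayload',
}

spark_df.write.format("hudi"). \
    options(**custom_payload_options). \
    mode("append"). \
    save(path)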

keerthiskating commented 8 months ago

@ad1happy2go I do not have the bandwidth to contribute. @codope Any idea whether this will be supported? Do you think this is a valid use case?

jmnatzaganian commented 7 months ago

Despite the initial report being about upsert, I can confirm that the new hoodie.datasource.insert.dup.policy option does not drop duplicates as expected with the insert write operation either. The deprecated configs work as desired. I have a small example, hudi_insert_no_dupes.py, demonstrating the behavior. In the interim, I will be using the deprecated configs as a workaround.
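For reference, a minimal sketch of that workaround, assuming hoodie.datasource.write.insert.drop.duplicates is the deprecated config in question (the comment does not name it explicitly):

legacy_insert_options = {
    **hudi_options,
    'hoodie.datasource.write.operation': 'insert',
    # deprecated pre-0.14 config: filter out incoming records whose keys
    # already exist in the table during insert
    'hoodie.datasource.write.insert.drop.duplicates': 'true',
}

spark_df.write.format("hudi"). \
    options(**legacy_insert_options). \
    mode("append"). \
    save(path)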

jmnatzaganian commented 7 months ago

@keerthiskating - If you do not intend to update records, but merely want to drop duplicates, then you should simply use insert instead of upsert; upsert is designed to update records. If, however, the intention is to upsert when certain fields have changed but drop otherwise, then as @ad1happy2go mentioned you'll need to roll your own logic. Functionally, the data will still be valid with upsert even if you see the changed _hoodie_commit_time field, so you can continue as-is with the understanding that you'll have some extra records. Note that with CDC you can compare the original and new payloads and drop the no-op rows before ingesting into the next system.
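A minimal sketch of that CDC post-filter, assuming the before/after columns are compared as the JSON strings shown in the output above:

# drop CDC rows that are no-op updates (identical before/after payloads)
cdc_df = spark.read.format("hudi"). \
    options(**cdc_read_options). \
    load(path)

real_changes = cdc_df.filter(
    (cdc_df.op != 'u') | (cdc_df.before != cdc_df.after)
)
real_changes.show(truncate=False)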

ad1happy2go commented 7 months ago

Thanks @jmnatzaganian. We were made aware of that recently and are working on a documentation update. For the datasource writer you still need to use the old config; the new config only works for Spark SQL.
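For completeness, a hedged sketch of the Spark SQL path where the new config is said to apply, assuming the table has been registered in the catalog under table_name and '<uuid>' stands in for a real precombine value:

# set the dup policy for the session, then insert through Spark SQL
spark.sql("SET hoodie.datasource.insert.dup.policy = drop")
spark.sql(f"""
    INSERT INTO {table_name}
    VALUES (1, 'John', 1, false, '<uuid>')
""")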