apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] - Partial update of the MOR table after compaction with Hudi Streamer #11348

Open kirillklimenko opened 1 month ago

kirillklimenko commented 1 month ago

Describe the problem you faced

I'm using Hudi Streamer to ingest data from Kafka to Hudi MOR table.

If I produce {"sha512": "hash", "column1": "before", "column2": "before", "update_dt": "2024-01-01T00:00:00.000000"} to the Kafka topic, I get the following result in the Hudi table (after compaction):

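+------+-------+-------+--------------------------+
|sha512|column1|column2|update_dt                 |
+------+-------+-------+--------------------------+
|hash  |before |before |2024-01-01T00:00:00.000000|
+------+-------+-------+--------------------------+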

My goal is to update column2 without affecting column1, by producing {"sha512": "hash", "column2": "after", "update_dt": "2024-02-02T00:00:00.000000"} to the Kafka topic (after compaction).

What I expect as a result (after compaction):

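+------+-------+-------+--------------------------+
|sha512|column1|column2|update_dt                 |
+------+-------+-------+--------------------------+
|hash  |before |after  |2024-02-02T00:00:00.000000|
+------+-------+-------+--------------------------+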

What I have as a result (after compaction):

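+------+-------+-------+--------------------------+
|sha512|column1|column2|update_dt                 |
+------+-------+-------+--------------------------+
|hash  |null   |after  |2024-02-02T00:00:00.000000|
+------+-------+-------+--------------------------+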

Since PartialUpdateAvroPayload is set in my settings below, I expect a partial update, but I suspect it doesn't work because of the "default": null keyword in my schema definitions.
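For reference, the gap between the expected and actual rows above can be modelled in a few lines of plain Python. This is a simplified sketch, not Hudi's PartialUpdateAvroPayload code: it assumes the record with the larger ordering value (updated_dt, as named in the schema files) wins and that its null columns are filled from the other record, and it contrasts that with a plain overwrite. The helper names are hypothetical.

```python
from typing import Any, Dict

Record = Dict[str, Any]


def overwrite_merge(stored: Record, incoming: Record) -> Record:
    # Plain overwrite: the incoming record replaces the stored one as-is,
    # so a column that is null in the incoming record ends up null.
    return dict(incoming)


def partial_merge(stored: Record, incoming: Record, ordering_field: str) -> Record:
    # Simplified partial-update model (hypothetical helper, not Hudi's API):
    # pick the newer record by ordering value, then fill its null columns
    # from the older record.
    if incoming[ordering_field] >= stored[ordering_field]:
        newer, older = incoming, stored
    else:
        newer, older = stored, incoming
    return {
        column: newer.get(column) if newer.get(column) is not None else older.get(column)
        for column in stored.keys() | incoming.keys()
    }


stored = {"sha512": "hash", "column1": "before", "column2": "before",
          "updated_dt": "2024-01-01T00:00:00.000000"}
# column1 is absent from the second Kafka message, so it arrives as null.
incoming = {"sha512": "hash", "column1": None, "column2": "after",
            "updated_dt": "2024-02-02T00:00:00.000000"}

print(partial_merge(stored, incoming, "updated_dt")["column1"])  # before
print(overwrite_merge(stored, incoming)["column1"])              # None
```

Under this model, the "actual" row matches overwrite_merge, while the "expected" row is what a null-skipping partial merge would produce.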

Could you suggest how I can update column2 without affecting column1 with the current Hudi Streamer setup?

My spark-submit command:

spark-submit \
    --jars /usr/lib/hudi/hudi-aws-bundle.jar \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
    --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
    --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
    /usr/lib/hudi/hudi-utilities-bundle.jar \
    --hoodie-conf bootstrap.servers=$KAFKA_BROKERS \
    --op UPSERT \
    --props /app/config/hudi.properties \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
    --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
    --source-ordering-field updated_dt \
    --table-type MERGE_ON_READ \
    --target-base-path s3://$S3_BUCKET/bronze \
    --target-table bronze \
    --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer

My hudi.properties file:

auto.offset.reset=earliest
hoodie.index.type=GLOBAL_BLOOM
hoodie.bloom.index.update.partition.path=false

hoodie.datasource.write.recordkey.field=sha512
hoodie.datasource.write.partitionpath.field=year,month,day
hoodie.datasource.write.precombine.field=updated_dt
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.payload.class=org.apache.hudi.common.model.PartialUpdateAvroPayload

hoodie.streamer.source.kafka.topic=hudi-stream
hoodie.streamer.transformer.sql.file=/app/sql/transformer.sql
hoodie.streamer.schemaprovider.source.schema.file=/ulh-data-streamer/app/schema/source.avsc
hoodie.streamer.schemaprovider.target.schema.file=/ulh-data-streamer/app/schema/target.avsc
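The ordering field is configured twice in this setup: via --source-ordering-field on the spark-submit command and via hoodie.datasource.write.precombine.field in hudi.properties. A small sketch for checking that the two agree and that the field is declared in target.avsc; the helper names are hypothetical, and the properties parser is deliberately simplified (it ignores line continuations and escape sequences).

```python
import json
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    # Minimal key=value parser: skips blank lines and #/! comment lines.
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_ordering_field(props: Dict[str, str], cli_field: str, schema: dict) -> List[str]:
    # Returns human-readable problems; an empty list means the settings agree.
    problems: List[str] = []
    precombine = props.get("hoodie.datasource.write.precombine.field")
    if precombine != cli_field:
        problems.append(f"precombine field {precombine!r} != --source-ordering-field {cli_field!r}")
    if cli_field not in {f["name"] for f in schema["fields"]}:
        problems.append(f"{cli_field!r} is not declared in the target schema")
    return problems


PROPS = parse_properties("""
hoodie.datasource.write.recordkey.field=sha512
hoodie.datasource.write.precombine.field=updated_dt
hoodie.datasource.write.payload.class=org.apache.hudi.common.model.PartialUpdateAvroPayload
""")
TARGET_SCHEMA = json.loads("""
{"type": "record", "name": "target_schema", "fields": [
  {"name": "sha512", "type": "string"},
  {"name": "column1", "type": ["null", "string"], "default": null},
  {"name": "column2", "type": ["null", "string"], "default": null},
  {"name": "updated_dt", "type": "string"},
  {"name": "year", "type": "string"},
  {"name": "month", "type": "string"},
  {"name": "day", "type": "string"}
]}
""")

print(check_ordering_field(PROPS, "updated_dt", TARGET_SCHEMA))  # []
```

Passing "update_dt", the key as spelled in the sample messages above, reports both problems.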

My transformer.sql file:

SELECT
    t.*,
    DATE_FORMAT(t.updated_dt, 'yyyy') AS year,
    DATE_FORMAT(t.updated_dt, 'MM') AS month,
    DATE_FORMAT(t.updated_dt, 'dd') AS day
FROM
    <SRC> t;
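The transformer derives the partition columns from updated_dt, and with hive-style partitioning each column becomes a key=value path segment. A rough Python equivalent (hypothetical helper; it assumes updated_dt always uses the microsecond ISO format shown in this issue) shows that the two sample timestamps map to different partitions. The update is therefore located through the global index (GLOBAL_BLOOM), and with hoodie.bloom.index.update.partition.path=false the record is updated in its original partition rather than moved.

```python
from datetime import datetime


def hive_partition_path(updated_dt: str) -> str:
    # Equivalent of DATE_FORMAT(updated_dt, 'yyyy' / 'MM' / 'dd') in transformer.sql,
    # joined as key=value segments (hive_style_partitioning=true).
    ts = datetime.strptime(updated_dt, "%Y-%m-%dT%H:%M:%S.%f")
    segments = [("year", ts.strftime("%Y")), ("month", ts.strftime("%m")), ("day", ts.strftime("%d"))]
    return "/".join(f"{name}={value}" for name, value in segments)


print(hive_partition_path("2024-01-01T00:00:00.000000"))  # year=2024/month=01/day=01
print(hive_partition_path("2024-02-02T00:00:00.000000"))  # year=2024/month=02/day=02
```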

My source.avsc file:

{
  "type": "record",
  "name": "source_schema",
  "fields": [
    {"name": "sha512", "type": "string"},
    {"name": "column1", "type": ["null", "string"], "default": null},
    {"name": "column2", "type": ["null", "string"], "default": null},
    {"name": "updated_dt", "type": "string"}
  ]
}

My target.avsc file:

{
  "type": "record",
  "name": "target_schema",
  "fields": [
    {"name": "sha512", "type": "string"},
    {"name": "column1", "type": ["null", "string"], "default": null},
    {"name": "column2", "type": ["null", "string"], "default": null},
    {"name": "updated_dt", "type": "string"},
    {"name": "year", "type": "string"},
    {"name": "month", "type": "string"},
    {"name": "day", "type": "string"}
  ]
}
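On the "default": null suspicion: with a ["null", "string"] union and "default": null, a field that is simply absent from the JSON message and a field sent as an explicit null end up as the same value once the record is built against the schema. The sketch below models that default-filling step under this assumption; it is not Hudi's JSON-to-Avro converter, and apply_schema_defaults is a hypothetical name. Under this model, the payload class sees column1 as null either way, which is the case PartialUpdateAvroPayload is expected to leave untouched.

```python
import json
from typing import Any, Dict

SOURCE_SCHEMA = json.loads("""
{"type": "record", "name": "source_schema", "fields": [
  {"name": "sha512", "type": "string"},
  {"name": "column1", "type": ["null", "string"], "default": null},
  {"name": "column2", "type": ["null", "string"], "default": null},
  {"name": "updated_dt", "type": "string"}
]}
""")


def apply_schema_defaults(message: Dict[str, Any], schema: dict) -> Dict[str, Any]:
    # Build a record with one entry per schema field: use the message value when
    # present, otherwise the field's default (JSON null becomes Python None).
    record: Dict[str, Any] = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in message:
            record[name] = message[name]
        elif "default" in field:
            record[name] = field["default"]
        else:
            raise KeyError(f"required field {name!r} is missing and has no default")
    return record


missing = apply_schema_defaults(
    {"sha512": "hash", "column2": "after", "updated_dt": "2024-02-02T00:00:00.000000"},
    SOURCE_SCHEMA)
explicit = apply_schema_defaults(
    {"sha512": "hash", "column1": None, "column2": "after", "updated_dt": "2024-02-02T00:00:00.000000"},
    SOURCE_SCHEMA)
print(missing == explicit)  # True
```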

Environment Description

ad1happy2go commented 1 month ago

@kirillklimenko Normally, PartialUpdateAvroPayload should avoid updating the columns which have null values.

kirillklimenko commented 1 month ago

> @kirillklimenko Normally, PartialUpdateAvroPayload should avoid updating the columns which have null values.

This is what I expect; however, it does not happen, which is strange.

ad1happy2go commented 1 month ago

@kirillklimenko I tried to mimic a similar scenario, but it does skip the columns with null values. Can you come up with a reproducible script?

kirillklimenko commented 1 month ago

@ad1happy2go, here is everything I have in my streamer: hudi-upsert-issue.zip

I'm running an application on AWS EMR 7.1 (Hudi 0.14.1) with AWS Glue sync:

  1. Produced {"sha512": "hash", "column1": "before", "column2": "before", "update_dt": "2024-01-01T00:00:00.000000"} to the Kafka topic
  2. Produced {"sha512": "hash", "column2": "after", "update_dt": "2024-02-02T00:00:00.000000"} to the Kafka topic

Tested with:

  1. AWS Glue
    use default;
    select sha512, column1, column2 from default.bronze_ro; -- hash, before, before
    select sha512, column1, column2 from default.bronze_rt; -- hash, null, after
  2. Spark SQL
    spark.sql("select sha512, column1, column2 from default.bronze_ro").show()  # hash, before, before
    spark.sql("select sha512, column1, column2 from default.bronze_rt").show()  # hash, null, after
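The two query results differ because the views read different data: in a simplified model, bronze_ro reads only the compacted base files, while bronze_rt merges the base files with the newer log records through the payload. A sketch of that model (hypothetical functions; it applies log records in arrival order and ignores the ordering field) reproduces both observed rows when the merge behaves like a full overwrite, and the expected row when null columns are kept from the base record.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Merge = Callable[[Record, Record], Record]


def read_optimized(base: Record, log: List[Record]) -> Record:
    # _ro model: only the base file is read; uncompacted log records are ignored.
    return dict(base)


def real_time(base: Record, log: List[Record], merge: Merge) -> Record:
    # _rt model: start from the base record and fold in each log record in order.
    current = dict(base)
    for update in log:
        current = merge(current, update)
    return current


def overwrite(old: Record, new: Record) -> Record:
    # Full overwrite: null columns in the update replace stored values.
    return dict(new)


def keep_non_null(old: Record, new: Record) -> Record:
    # Partial update: null columns in the update keep the stored value.
    return {k: new.get(k) if new.get(k) is not None else old.get(k) for k in old.keys() | new.keys()}


def columns(row: Record) -> tuple:
    return row["column1"], row["column2"]


base = {"sha512": "hash", "column1": "before", "column2": "before"}
log = [{"sha512": "hash", "column1": None, "column2": "after"}]

print(columns(read_optimized(base, log)))            # ('before', 'before')
print(columns(real_time(base, log, overwrite)))      # (None, 'after')
print(columns(real_time(base, log, keep_non_null)))  # ('before', 'after')
```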

ad1happy2go commented 3 weeks ago

@kirillklimenko We will look into it. Thanks for the details.

soumilshah1995 commented 3 weeks ago

I was reading the thread; would this help? https://soumilshah1995.blogspot.com/2023/05/hands-on-lab-unleashing-efficiency-and.html


+-----------+------------------+--------------+------------------+---------------------------+------+-------------------+
|customer_id|name              |state         |city              |created_at                 |salary|_hoodie_commit_time|
+-----------+------------------+--------------+------------------+---------------------------+------+-------------------+
|1          |Mackenzie Martinez|Maryland      |Lake Nicole       |tdean@example.org          |34853 |20230518073201741  |
|0          |Carolyn Calderon  |South Carolina|West Geraldborough|porterchristina@example.org|69285 |20230518073201741  |
|4          |Jacob Barnes      |Arizona       |Michellehaven     |lewisjennifer@example.net  |48149 |20230518073201741  |
|3          |Elizabeth Jones   |Michigan      |Clarkborough      |maurice87@example.net      |null  |20230518073201741  |
|2          |Sydney Ayala      |Rhode Island  |Port Patrickstad  |kevin20@example.com        |52640 |20230518073201741  |
+-----------+------------------+--------------+------------------+---------------------------+------+-------------------+

NEW RECORDS

+-----------+----+-----+--------+----------+-----+------+
|customer_id|name|state|    city|created_at|email|salary|
+-----------+----+-----+--------+----------+-----+------+
|          1|null|   CT|stamford|      null| null|  null|
+-----------+----+-----+--------+----------+-----+------+

AFTER UPSERT
+-----------+------------------+--------------+------------------+---------------------------+------+-------------------+
|customer_id|name              |state         |city              |created_at                 |salary|_hoodie_commit_time|
+-----------+------------------+--------------+------------------+---------------------------+------+-------------------+
|1          |Mackenzie Martinez|CT            |stamford          |tdean@example.org          |34853 |20230518073305092  |
|0          |Carolyn Calderon  |South Carolina|West Geraldborough|porterchristina@example.org|69285 |20230518073201741  |
|4          |Jacob Barnes      |Arizona       |Michellehaven     |lewisjennifer@example.net  |48149 |20230518073201741  |
|3          |Elizabeth Jones   |Michigan      |Clarkborough      |maurice87@example.net      |null  |20230518073201741  |
|2          |Sydney Ayala      |Rhode Island  |Port Patrickstad  |kevin20@example.com        |52640 |20230518073201741  |
+-----------+------------------+--------------+------------------+---------------------------+------+-------------------+

I have not tried the streamer route, but I assume it should be similar.

kirillklimenko commented 3 weeks ago

> I have not tried the streamer route, but I assume it should be similar.

@soumilshah1995, I can confirm that org.apache.hudi.common.model.PartialUpdateAvroPayload works as expected with Spark Structured Streaming, but doesn't work with Hudi Streamer.

I decided to stop trying to upsert with Hudi Streamer and rewrote everything in Spark.

But I still hope that Hudi Streamer will be fixed, since it is the simplest, low-code way to stream data into Hudi tables.