apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/

[SUPPORT] Partial Update with Partition column is not working as expected. #9217

Open ankur334 opened 1 year ago

ankur334 commented 1 year ago

Partial update with a partition column/key is not working as expected.

Let's suppose I currently have the following event/message.

{
  "id": 1,
  "language": "python",
   "created": "2023-07-12",
   "updated": "2023-07-12"
}

primaryKey = id
deDupKey/preCombine = updated
partition = created

I am using UPSERT as the write operation type.

Now I want to apply a partial update when I receive a record from my source system/producer.

The new incoming event is as follows.

{
   "id": 1,
   "language": "scala",
   "updated": "2023-07-13"
}

After a partial update, I expect only the language and updated columns to change. But after applying the partial update, we are getting null in the created column.

The expected result after the merge/partial update should be:

{
  "id": 1,
  "language": "scala",
  "created": "2023-07-12",
  "updated": "2023-07-13"
}

But the actual result is:

{
   "id": 1,
   "language": "scala",
   "created": null,
   "updated": "2023-07-13"
}

This is wrong. Could you please help us here? Are we doing something wrong?

Environment Description

Hudi version: 0.13.1
Spark version: 3.1
Hive version: 3.1
Storage (HDFS/S3/GCS..): GCS
Running on Docker? (yes/no): No, running on Dataproc

Hudi Configs

val hudiConfigs: Map[String, String] = Map(
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.drop.partition.columns" -> "true",
  "hoodie.partition.metafile.use.base.format" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.reconcile.schema" -> "true",
  "hoodie.schema.on.read.enable" -> "true",
  "hoodie.upsert.shuffle.parallelism" -> "1000",
  "hoodie.bloom.index.parallelism" -> "1000",
  "hoodie.index.type" -> "GLOBAL_BLOOM",
  "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.PartialUpdateAvroPayload"
 )

To Reproduce

Steps to reproduce the behaviour:

  1. Initiate the Spark session and pass the Hudi configs mentioned above.
  2. Choose id as the primary key, created as the partition column, and updated as the deDup/preCombine field.
  3. First, insert the record supplying all the columns.
  4. For the partial update, don't pass the created column; applying the schema then turns the created column into null.

ankur334 commented 1 year ago

The created column is not actually null in the table, but when I receive a new event from Kafka, it does not contain the created column. I only get the columns that were updated at the source, plus the primary key and the deDup key, i.e. the updated field.

But before writing to the Hudi table, I apply the schema to my DataFrame, and that schema contains the created field.

Because the schema is applied to the DataFrame, the columns that are not present in the incoming event become null.
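
For illustration, here is a minimal sketch of that step (hypothetical names, matching the example event above, not the actual pipeline):

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
import spark.implicits._

// Full table schema, including created, which the partial event does not carry.
val fullSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("language", StringType),
  StructField("created", StringType),
  StructField("updated", StringType)
))

// Partial event as it arrives from Kafka: no created field.
val partialEvent = Seq("""{"id": 1, "language": "scala", "updated": "2023-07-13"}""")

// Reading with the full schema fills the missing created column with null,
// and that null is what then gets written to the Hudi table.
val incomingDf = spark.read.schema(fullSchema).json(partialEvent.toDS)
incomingDf.show()   // created shows up as null here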

jonvex commented 1 year ago

I was able to reproduce with

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

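// Initial insert: every column, including partitionpath, is present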
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.index.type", "GLOBAL_BLOOM").
  mode(Overwrite).
  save(basePath)

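// Updates: partitionpath is dropped below to simulate a partial event that omits the partition column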
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.drop("partitionpath").write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.index.type", "GLOBAL_BLOOM").
  option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.PartialUpdateAvroPayload").
  option("hoodie.datasource.write.reconcile.schema", "true").
  mode(Append).
  save(basePath)

spark.read.format("hudi").load(basePath).show(false)

jonvex commented 1 year ago

Setting "hoodie.bloom.index.update.partition.path" to "false" should make this work as expected, provided that you don't intend on changing the partition path for records

jonvex commented 1 year ago

@xushiyan should this be the expected behavior?

ad1happy2go commented 1 year ago

@ankur334 Did this solution work? Please let us know.

zhangxiongbiao commented 2 months ago

I also faced a similar problem. When an incoming event has no partition field, org.apache.hudi.sink.partitioner.BucketAssignFunction#processRecord will use the default HIVE_DEFAULT_PARTITION. This will cause the incoming event to be reassigned to the default partition, emitting a delete record for the old partition path and then emitting a new insert record. Consequently, the partition field will be missing after the insert. If you receive an event with a partition field, then PartialUpdateAvroPayload will work correctly.

Therefore, I think you should not process events without a partition field.

The configuration hoodie.bloom.index.update.partition.path will discard events with a different partition, which may not suit the scenario.
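
One way to follow that advice is to guard the write; a minimal sketch, assuming the incoming DataFrame is called incomingDf and the partition field is created as in the original example:

import org.apache.spark.sql.functions.col

// Only write events that actually carry the partition field; route the rest
// elsewhere (for example, enrich them with created via a lookup, or dead-letter them).
val withPartition    = incomingDf.filter(col("created").isNotNull)
val missingPartition = incomingDf.filter(col("created").isNull)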

ad1happy2go commented 2 months ago

@zhangxiongbiao PartialUpdateAvroPayload currently doesn't support records that are missing the partition key, even with the GLOBAL_BLOOM index.