apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Custom payload records ordering is different for incremental and snapshot queries #8287

Open 15663671003 opened 1 year ago

15663671003 commented 1 year ago

I implemented a custom payload based on HoodieRecordPayload.java, but I am seeing a problem. With an incremental query, record_time is the value from the incoming payload (incorrect). With a snapshot query, record_time is the old, persisted value (correct). This does not meet my expectation that both query types return the same merged record. Does the record returned by an incremental query go through a different merge than the snapshot query result? Please help me.

 /* Omitted content */

public class CustomPayload extends OverwriteWithLatestAvroPayload {
  /* Omitted content */

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
    if (recordBytes.length == 0) {
      return Option.empty();
    }

    GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
    if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
      return Option.of(currentValue);
    }

    /* custom code: keep the persisted record_time so an update does not overwrite it */
    if (((GenericRecord) currentValue).get("record_time") != null) {
      incomingRecord.put("record_time", ((GenericRecord) currentValue).get("record_time"));
    }

    eventTime = updateEventTime(incomingRecord, properties);
    return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
  }

 /* Omitted content */

  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
                                                IndexedRecord incomingRecord, Properties properties) {

     /* Omitted content */
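    // Update only when the incoming record has a newer ordering value, at least one of the
    // two records is marked valid, and content_md5 has changed (or is not yet set).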

    return ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) < 0
            && (((GenericRecord) currentValue).get("valid").equals(true)
                || ((GenericRecord) incomingRecord).get("valid").equals(true))
            && (((GenericRecord) currentValue).get("content_md5") == null
                || !((GenericRecord) currentValue).get("content_md5")
                        .equals(((GenericRecord) incomingRecord).get("content_md5")));
  }

}

Expected behavior

Snapshot query:

>>> spark.read.format("hudi").load("*****").filter("*********").show(truncate=False)
_hoodie_commit_time    | 20230324152704225
_hoodie_commit_seqno   | 20230324152704225_13_2563860
_hoodie_record_key     | 7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc
_hoodie_partition_path |
_hoodie_file_name      | 00000013-ea30-4f9d-9704-e4f82fceb940-0
valid                  | false
update_time            | 2023-03-24 14:48:09
record_time            | 2023-03-01 21:40:42
content_md5            | df3c2a9f8eaf5b8eec26b363cc67003f
id                     | 7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc

Incremental query:

>>> df = spark.read.format("hudi").options(**{'hoodie.datasource.query.type': "incremental", "hoodie.datasource.read.begin.instanttime": '20230324151944584'}).load("******")
>>> df.filter("*********").show(truncate=False)
_hoodie_commit_time    | 20230324152704225
_hoodie_commit_seqno   | 20230324152704225_13_2563860
_hoodie_record_key     | 7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc
_hoodie_partition_path |
_hoodie_file_name      | 00000013-ea30-4f9d-9704-e4f82fceb940-0
valid                  | false
update_time            | 2023-03-24 14:48:09
record_time            | 2023-03-24 14:48:09
content_md5            | df3c2a9f8eaf5b8eec26b363cc67003f
id                     | 7306da3dd0c41ff504447981c4e850949db69524154c0c5bf85e62758babf3cc

I read from Kafka with Spark Structured Streaming and write to Hudi using foreachBatch. Each record has two time fields, update_time and record_time, which by default carry the same value at ingest. Combined with the custom payload, this determines whether a record should be updated; when it is updated, the record_time of the persisted record overwrites the one in the new payload. I would expect record_time from an incremental query to match the snapshot query, but the result differs from my expectation. Why is this? Please help me. (A minimal sketch of the write pipeline is shown below.)
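For context, a minimal, hypothetical sketch of that write path (not the original job): the Kafka brokers, topic name, event schema, checkpoint location, and table path are placeholders, and only a few of the Hudi options from the full write config posted further down are repeated here.

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, BooleanType

spark = SparkSession.builder.appName("hudi-custom-payload-ingest").getOrCreate()

# Hypothetical shape of the Kafka message value; the real schema is not shown in the issue.
event_schema = StructType([
    StructField("id", StringType()),
    StructField("valid", BooleanType()),
    StructField("content_md5", StringType()),
    StructField("update_time", StringType()),
    StructField("record_time", StringType()),
])

# A subset of the Hudi options from the full write config posted below.
hudi_options = {
    "hoodie.table.name": "hudi_custom_payload_test",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "update_time",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.CustomPayload",
}

def write_batch(batch_df: DataFrame, batch_id: int) -> None:
    # update_time and record_time arrive with the same value; the custom payload is
    # expected to keep the persisted record_time when an existing record is updated.
    (batch_df.write
        .format("hudi")
        .options(**hudi_options)
        .mode("append")
        .save("/tmp/hudi/hudi_custom_payload_test"))    # placeholder table path

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")    # placeholder brokers
    .option("subscribe", "events")                           # placeholder topic
    .load()
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("e"))
    .select("e.*")
    .writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/tmp/checkpoints/hudi_custom_payload_test")
    .start())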


15663671003 commented 1 year ago

I want to maintain an incremental read job: whenever valid = true or update_time != record_time, the record should be picked up (see the sketch below).
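As an illustration of that read pattern, a small sketch assuming a placeholder table path and reusing the begin instant from the queries above:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("hudi-incremental-read").getOrCreate()

# Placeholder table path; the begin instant is the one shown in the incremental query above.
incremental_df = (spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20230324151944584")
    .load("/tmp/hudi/hudi_custom_payload_test"))

# Pick up only records that are valid or whose record_time differs from update_time.
picked = incremental_df.filter(
    F.col("valid") | (F.col("update_time") != F.col("record_time"))
)
picked.show(truncate=False)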

ad1happy2go commented 1 year ago

@15663671003 Can you please post the table properties to help us triage this better?

15663671003 commented 1 year ago

hoodie.properties on HDFS:

#Updated at 2023-01-13T06:27:15.972Z
#Fri Jan 13 14:27:15 CST 2023
hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.table.precombine.field=update_time
hoodie.table.version=5
hoodie.database.name=
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.checksum=46617028
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.name=test.hudi_custom_payload_test
hoodie.compaction.payload.class=org.apache.hudi.common.model.CustomPayload
hoodie.populate.meta.fields=true
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=files
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=id
hoodie.table.partition.fields=

Python job write config:

{
    "hoodie.table.name": hive_table,
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.table.name": hive_table,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "update_time",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.upsert.shuffle.parallelism": 300,
    "hoodie.bulkinsert.shuffle.parallelism": 2000,
    "hoodie.insert.shuffle.parallelism": 2000,
    "hoodie.compact.inline": "false",
    "hoodie.compact.inline.max.delta.commits": 1,
    "hoodie.compact.schedule.inline": "true",
    "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
    "hoodie.cleaner.fileversions.retained": keep_max_file_version,
    "hoodie.keep.min.commits": keep_min_commits,
    "hoodie.keep.max.commits": keep_max_commits,
    "hoodie.parquet.max.file.size": 1024 * 1024 * 100,
    "hoodie.parquet.small.file.limit": 1024 * 1024 * 60,
    "hoodie.parquet.compression.codec": "snappy",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": db,
    "hoodie.datasource.hive_sync.table": table,
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.index.type": "BUCKET",
    "hoodie.bucket.index.num.buckets": 100,
    "hoodie.index.bucket.engine": "SIMPLE",  # CONSISTENT_HASHING
    "hoodie.storage.layout.partitioner.class": "org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.CustomPayload",
}

@ad1happy2go

ad1happy2go commented 1 year ago

@15663671003 Sorry for the delay in triaging this one. I tried to reproduce and debug it, but couldn't because the code is incomplete. Can you share the entire code so I can triage it better?