apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Data duplication #8981

Open jjtjiang opened 1 year ago

jjtjiang commented 1 year ago

Could you please help look into this problem? Thanks. The Hudi table has duplicated data, and some record keys contain the field name. For data from the same table, some _hoodie_record_key values contain the primary key field name and some do not. For example, some have id:32 and some have just 32.

| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key |
| --- | --- | --- |
| 20230613014158100 | 20230613014158192_3_1768737 | 2026,412,533,770 |
| 20230615022555500 | 20230615022555550_9_9474 | invoice_no:20264,invoice_type:125,proj_no:33770 |

.hoodie/hoodie.properties:

hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.table.precombine.field=cdc_action_time_long
hoodie.table.version=5
hoodie.database.name=
hoodie.datasource.write.hive_style_partitioning=true
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.name=ods_ec_p1ec_refund_ec_hudi
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.populate.meta.fields=true
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.timeline.layout.version=1
hoodie.table.partition.fields=
hoodie.table.recordkey.fields=invoice_no,invoice_type,proj_no
hoodie.table.checksum=338357838
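
For diagnosis, a minimal sketch (not part of the original report) that reads the table back with Spark and checks whether _hoodie_record_key values exist in both formats; tablePath is a placeholder, and the business-key columns follow hoodie.table.recordkey.fields above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
val tablePath = "/path/to/ods_ec_p1ec_refund_ec_hudi"   // placeholder base path

val df = spark.read.format("hudi").load(tablePath)

// Split keys into the two formats seen above: with field names (contains ':') and without.
df.withColumn("has_field_names", col("_hoodie_record_key").contains(":"))
  .groupBy("has_field_names").count().show()

// Business keys that appear under both formats show up here as duplicates.
df.groupBy("invoice_no", "invoice_type", "proj_no").count()
  .filter(col("count") > 1).show(false)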

danny0405 commented 1 year ago

How did you write the table then?

jjtjiang commented 1 year ago

The possible cause is that some Hudi 0.10.2 data is left over in the Hive metadata. I used the same table name earlier on Hudi 0.10, but I have removed that table. The code is as below:

val optionStr =
  """
    |{
    |  "hoodie.metadata.enable": "false",
    |  "hoodie.datasource.compaction.async.enable": "true",
    |  "hoodie.compact.inline": "true",
    |  "hoodie.clean.automatic": "true",
    |  "hoodie.clean.async": "false",
    |  "hoodie.cleaner.commits.retained": "40",
    |  "hoodie.archive.automatic": "true",
    |  "hoodie.archive.async": "false",
    |  "hoodie.keep.max.commits": "50",
    |  "hoodie.keep.min.commits": "45",
    |  "hoodie.datasource.hive_sync.support_timestamp": "false",
    |  "hoodie.datasource.write.hive_style_partitioning": "true",
    |  "hoodie.datasource.hive_sync.mode": "HMS",
    |  "hoodie.finalize.write.parallelism": "6"
    |}
    |""".stripMargin

val options = JSON.parseObject(optionStr, classOf[java.util.Map[String, String]]).asScala

upsertDs.toDF().write.format(hudiOrgPro)
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "")
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), classOf[NonpartitionedKeyGenerator].getName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[NonPartitionedExtractor].getName)
  .option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key(), "false")
  .option(DataSourceWriteOptions.TABLE_TYPE.key(), "")
  .option(HoodieWriteConfig.TBL_NAME.key(), "MERGE_ON_READ")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "invoice_no,invoice_type,proj_no")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "invoice_no,invoice_type,proj_no")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true")
  .option(DataSourceWriteOptions.HIVE_DATABASE.key(), "ods_ca")
  .option(DataSourceWriteOptions.HIVE_TABLE.key(), "ods_ec_p1ec_refund_ec_hudi")
  .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "30")
  .option(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key, "30")
  .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "30")
  .option(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "30")
  .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.BLOOM.name())
  .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true")
  .option(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key(), value = true)
  // merge / compaction memory
  .option(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE.key(), "0.8")
  .option(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key(), "0.8")
  // metadata config
  .option(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "50")
  // compaction
  .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "5")
  .option(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE.key(), "TIMESTAMP_MICROS")
  .options(options)
  // .option(DataSourceWriteOptions.HIVE_SYNC_MODE.key(), "hms")
  .mode(SaveMode.Append)
  .save(path)

jjtjiang commented 1 year ago

  1. For record keys generated by upsert, the format is field1:value1,field2:value2,....
  2. For multiple record key fields generated by bulk insert, the format is value1,value2,... (see the sketch below).
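
A tiny illustrative sketch (not Hudi source code) of how those two formats differ for this table's composite key, and why the same business key then ends up stored twice:

val fields = Seq("invoice_no", "invoice_type", "proj_no")
val values = Seq("20264", "125", "33770")

// Format observed for upsert: field1:value1,field2:value2,...
val upsertKey = fields.zip(values).map { case (f, v) => s"$f:$v" }.mkString(",")
// Format reported for bulk insert in this issue: value1,value2,...
val bulkInsertKey = values.mkString(",")

println(upsertKey)      // invoice_no:20264,invoice_type:125,proj_no:33770
println(bulkInsertKey)  // 20264,125,33770

// Record lookup works on the exact _hoodie_record_key string, so the two
// representations never match and the row is written as two records.
assert(upsertKey != bulkInsertKey)
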
ad1happy2go commented 1 year ago

@jjtjiang Did this upsert run using some older Hudi version? It's a bit weird that Hudi would write _hoodie_record_key as key:value with the latest version.

Did you manually delete the directory before running with the new Hudi version?

There is a high chance that this was an external table, in which case dropping the table alone would not have deleted the Hudi directory. Please confirm whether you just dropped the table without cleaning up the directory.
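
One hedged way to check this (not spelled out in the thread) is to list the table's .hoodie timeline and look for instants that predate the re-created table; basePath is a placeholder:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val basePath = new Path("/path/to/ods_ec_p1ec_refund_ec_hudi")   // placeholder
val fs = FileSystem.get(basePath.toUri, new Configuration())

// Oldest timeline files; commit timestamps older than the table re-creation
// would indicate the directory was never cleaned up.
fs.listStatus(new Path(basePath, ".hoodie"))
  .map(_.getPath.getName)
  .sorted
  .take(10)
  .foreach(println)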

jjtjiang commented 1 year ago

@ad1happy2go Both the bulk insert and the upsert ran using Hudi 0.12. The correct _hoodie_record_key should contain only the values, not the field names, right?

Yes, I had manually deleted the directory before running with the new Hudi version. I use bulk insert to initialize the table and then upsert to sync incremental data. I have checked the file creation times, and the files are newly generated.

The upsert and bulk insert params are the same, except for the param "hoodie.datasource.write.operation".
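
In other words, something along these lines (a sketch with a hypothetical helper, not the actual job code), where only the operation value changes between the initial load and the incremental sync:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: all other options are shared between the two writes.
def writeHudi(df: DataFrame, operation: String, path: String): Unit =
  df.write.format("hudi")
    .option(DataSourceWriteOptions.OPERATION.key(), operation)
    // ... identical key generator, record key, precombine, hive sync options ...
    .mode(SaveMode.Append)
    .save(path)

// writeHudi(initDf, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, path)   // initial load
// writeHudi(incrDf, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, path)        // incremental sync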

ad1happy2go commented 1 year ago

@jjtjiang I was able to reproduce this issue with version 0.12.2. Bulk insert is not writing the record keys properly; since this is a composite key, it must be written as invoice_no:20264,invoice_type:125,proj_no:33770.

This issue was fixed in 0.13.0. Can you use either 0.13.0 or 0.13.1?

Below code can be used to reproduce -

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.NonPartitionedExtractor
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.spark.sql.SaveMode

val path = "file:///tmp/hudi_issue_8981"   // placeholder base path

val dt = spark.sql("""
  select cast(1 as bigint) as invoice_no, cast(1 as bigint) as proj_no, cast(1 as bigint) as invoice_type, 1 as _hudi_last_update, '123' as _hudi_partition
""")

val options = Map[String, String]()
dt.write.format("hudi")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), classOf[NonpartitionedKeyGenerator].getName)
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "")
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[NonPartitionedExtractor].getName)
.option(DataSourceWriteOptions.TABLE_TYPE.key(), "COPY_ON_WRITE")
.option(HoodieWriteConfig.TBL_NAME.key(), "MERGE_ON_READ")
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "invoice_no,proj_no,invoice_type")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "_hudi_last_update")
.mode(SaveMode.Append)
.save(path)

spark.read.format("hudi").load(path).show()
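
As a hedged follow-up (not stated in the thread), after re-running the same bulk insert on 0.13.x the record keys should carry the field names, which can be spot-checked by counting keys without them (reusing spark and path from the snippet above):

import org.apache.spark.sql.functions.{col, not}

val keys = spark.read.format("hudi").load(path).select("_hoodie_record_key")
val missingFieldNames = keys.filter(not(col("_hoodie_record_key").contains("invoice_no:")))
println(s"record keys without field names: ${missingFieldNames.count()}")   // expected 0 on 0.13.x
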
ad1happy2go commented 1 year ago

@jjtjiang Did you get a chance to try it out with a Hudi 0.13.x version? Let us know in case you need any other support on this.

jjtjiang commented 1 year ago

I will try it out with Hudi 0.13.x. Thanks.

aizain commented 1 year ago

I also encountered this in 0.12.3, with a COW table.

mark

ad1happy2go commented 1 year ago

@jjtjiang @aizain Did you get a chance to try with version 0.13.x?