apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] 0.12.3 upgrade to 0.14.0 data duplication #10407

Open zyclove opened 9 months ago

zyclove commented 9 months ago

Describe the problem you faced

For a Hudi 0.12.3 table with existing data, we upgraded to 0.14.0. After the upgrade, the data is duplicated: the old data files still exist and have not been rewritten into new data files. What do I need to do now?

(screenshot attached)

To Reproduce

Steps to reproduce the behavior:

  1. Start from an existing Hudi 0.12.3 table.
  2. Upgrade to 0.14.0.
  3. SELECT * FROM hudi_table_changes('bi_ods_real.online_offline_event_batch_rt', 'latest_state', 'earliest') where id ='6c60e6270272ea8499sygv';
  4. The query returns duplicated records; they are not merged.


parisni commented 9 months ago

Can you share with us the hoodie configs involved in your ingestion?

beyond1920 commented 9 months ago

@zyclove The data duplication is caused by records with the same primary key value being written into different file groups. It looks like the first commit used the simple bucket index, because the file group id has a bucket id as a prefix. However, in the second commit the file group id does not have a bucket id prefix, so it seems the simple bucket index did not take effect during that write job.
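
For reference, a minimal sketch of how this can be confirmed from the table itself (assuming a SparkSession named spark and the same config/sink path as the writer code below): group by Hudi's record-key meta column and look for keys that appear under more than one file.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.countDistinct;

// Hypothetical check, not part of the job above: for each Hudi record key,
// count how many distinct file names it appears under in a snapshot read;
// any count > 1 means the same key now lives in more than one file group.
Dataset<Row> snapshot = spark.read().format("org.apache.hudi").load(config.getSinkPath());
snapshot.groupBy(col("_hoodie_record_key"))
        .agg(countDistinct(col("_hoodie_file_name")).alias("file_groups"))
        .filter("file_groups > 1")
        .show(20, false);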

zyclove commented 9 months ago

@parisni @beyond1920 Thank you very much for helping me take a look. The code before and after the upgrade is shown below. Is there any good way to merge the old data files into the new format now?

Before upgrading

 private void upsertAllAction(Dataset<Row> jsonDataSet, long maxUseMemory, String tempPath) {
        int dataKeepTime = 3 * 24 * 60 / config.getTriggerTime();
        jsonDataSet.write()
                .format("org.apache.hudi")
                .option(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name())
                .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
                .option(DataSourceWriteOptions.TABLE_TYPE().key(), HoodieTableType.MERGE_ON_READ.name())
                .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), config.getIdName())
                .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), Constants.DT)
                .option(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true)
                .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), Constants.UPDATE_TIME)
                .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), false)
                .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), config.getUpsertParallelism())
                .option(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), config.getUpsertParallelism())
                .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                .option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), true)
                .option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), true)
//                .option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true)
                .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), true)
                .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true)
                .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.toString())
                .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), true)
                .option(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                .option(HoodieCleanConfig.AUTO_CLEAN.key(), true)
                .option(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE.key(), true)
                .option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                .option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime * 3)
                .option(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB.key(), 500 * 1024)
                .option(HoodieCleanConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name())
                .option(HoodieCleanConfig.CLEANER_HOURS_RETAINED.key(), 72)
                .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 128 * 1024 * 1024)
                .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 256 * 1024 * 1024)
                .option(HoodieCompactionConfig.INLINE_COMPACT.key(), true)
                .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 0)
                .option(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), tempPath)
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .option(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                .option(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime + 2)
                .option(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                .option(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), maxUseMemory)
                .option(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION.key(), maxUseMemory)
                .option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, Constants.UPDATE_TIME)
                .option(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, Constants.UPDATE_TIME)
                .option(HoodieTableConfig.NAME.key(), config.getName())
//                .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName())
                .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName())
                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.SIMPLE.name())
//                .partitionBy(HoodieIndexConfig.INDEX_CLASS_NAME.key(), HoodieSparkConsistentBucketIndex.class.getName())
                .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH().key(), true)
                .option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 20)
                .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.SIMPLE.name())
//                .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())
                .option(HoodieLayoutConfig.LAYOUT_TYPE.key(), HoodieStorageLayout.LayoutType.DEFAULT.name())
                .option(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key(), SparkBucketIndexPartitioner.class.getName())
//                .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
                .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.SINGLE_WRITER.name())
                .option(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name())
//                .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
                .option("hoodie.write.lock.provider", HiveMetastoreBasedLockProvider.class.getName())
                .option("hoodie.write.lock.hivemetastore.database", "bi_ods_real")
                .option("hoodie.write.lock.hivemetastore.table", getTableName())
                .mode(SaveMode.Append)
                .save(config.getSinkPath());
    }


After the upgrade, the code is as follows.

private void upsertAllAction(Dataset<Row> jsonDataSet, long maxUseMemory, String tempPath) {
        int dataKeepTime = 3 * 24 * 60 / config.getTriggerTime();
        jsonDataSet.write()
                .format("org.apache.hudi")
                .option(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name())
                .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
                .option(DataSourceWriteOptions.TABLE_TYPE().key(), HoodieTableType.MERGE_ON_READ.name())
                .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), config.getIdName())
                .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), Constants.DT)
                .option(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true)
                .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), Constants.UPDATE_TIME)
                .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), false)
                .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), config.getUpsertParallelism())
                .option(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), config.getUpsertParallelism())
                .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
//                .option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), true)
//                .option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), true)
//                .option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true)
//                .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), true)
//                .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true)
                .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.toString())
//                .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), true)
                .option(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                .option(HoodieCleanConfig.AUTO_CLEAN.key(), true)
                .option(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE.key(), true)
                .option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                .option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime * 3)
                .option(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB.key(), 500 * 1024)
                .option(HoodieCleanConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name())
                .option(HoodieCleanConfig.CLEANER_HOURS_RETAINED.key(), 72)
                .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 128 * 1024 * 1024)
                .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 256 * 1024 * 1024)
                .option(HoodieCompactionConfig.INLINE_COMPACT.key(), true)
                .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 0)
                .option(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), tempPath)
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                .option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime + 2)
                .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                .option(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), maxUseMemory)
                .option(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION.key(), maxUseMemory)
                .option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, Constants.UPDATE_TIME)
                .option(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, Constants.UPDATE_TIME)
                .option(HoodieTableConfig.NAME.key(), config.getName())
//                .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName())
                .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName())
                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.SIMPLE.name())
//                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name())
//                .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), true)
//                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.RECORD_INDEX.name())
//                .partitionBy(HoodieIndexConfig.INDEX_CLASS_NAME.key(), HoodieSparkConsistentBucketIndex.class.getName())
                .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH().key(), true)
                .option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), config.getBucketSize())
                .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.SIMPLE.name())
//                .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())
                .option(HoodieLayoutConfig.LAYOUT_TYPE.key(), HoodieStorageLayout.LayoutType.DEFAULT.name())
                .option(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key(), SparkBucketIndexPartitioner.class.getName())
//                .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
                .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.SINGLE_WRITER.name())
                .option(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name())
//                .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
//                .option("hoodie.write.lock.provider", HiveMetastoreBasedLockProvider.class.getName())
                .option("hoodie.write.lock.hivemetastore.database", "bi_ods_real")
                .option("hoodie.write.lock.hivemetastore.table", getTableName())
                .option(HoodieLockConfig.ZK_LOCK_KEY.key(), "bi_ods_real." + getTableName())
                .mode(SaveMode.Append)
                .save(config.getSinkPath());
    }
zyclove commented 9 months ago

(screenshot attached)

@beyond1920 @parisni @danny0405 @nsivabalan

In addition, it is very strange that after the upgrade, the generated files start with both 00000018 and 5f1c45ef. Is this abnormal? Is there a critical bug?

beyond1920 commented 9 months ago

@zyclove Please try updating the value of HoodieIndexConfig.INDEX_TYPE to BUCKET. I have upgraded 150+ Spark SQL jobs internally that write to Hudi tables, from version 0.10 to version 0.14, and have not encountered similar issues. I guess this may not be a common issue.
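
For reference, a rough sketch of the option change being suggested (not a verified fix; it assumes the rest of the writer options stay as in upsertAllAction above, and that the bucket count remains identical to the 20 buckets used before the upgrade):

// Sketch only: select the BUCKET index type so that the simple bucket index
// engine configured below actually drives record tagging; all other writer
// options from upsertAllAction are assumed unchanged.
jsonDataSet.write()
        .format("org.apache.hudi")
        // ... all other options unchanged from the method above ...
        .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name())
        .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.SIMPLE.name())
        .option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 20) // must not change between writes
        .mode(SaveMode.Append)
        .save(config.getSinkPath());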

ad1happy2go commented 9 months ago

@zyclove Also, is there any reason why you are setting DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH? This config extracts the partition values from the physical partition path.

zyclove commented 9 months ago

@beyond1920 @ad1happy2go Must I set HoodieIndexConfig.INDEX_TYPE to BUCKET? Previously it was SIMPLE mode.

DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH has always been set to true. Does turning this on have an impact? Will there be any problem if it is set to false now? Is there any way to restore and merge the data files into the new format, without duplicate data, other than rebuilding the table and re-running the data?

zyclove commented 9 months ago

@beyond1920 @ad1happy2go When HoodieIndexConfig.INDEX_TYPE is set directly to BUCKET, the following error is reported.

Caused by: java.lang.Exception: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240102194716196
    at com.tuya.HuDiMergeFunction.call(HuDiMergeFunction.java:117)
    at com.tuya.HuDiMergeFunction.call(HuDiMergeFunction.java:41)
    at org.apache.spark.sql.streaming.DataStreamWriter.$anonfun$foreachBatch$1(DataStreamWriter.scala:489)
    at org.apache.spark.sql.streaming.DataStreamWriter.$anonfun$foreachBatch$1$adapted(DataStreamWriter.scala:489)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
    ... 1 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240102194716196
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:70)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at com.tuya.HuDiMergeFunction.upsertAllAction(HuDiMergeFunction.java:236)
    at com.tuya.HuDiMergeFunction.call(HuDiMergeFunction.java:110)
    ... 28 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 11.0 failed 4 times, most recent failure: Lost task 8.3 in stage 11.0 (TID 7503) (172.30.17.80 executor 15): java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "b1861d1d"
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "b1861d1d"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.lambda$loadBucketIdToFileIdMappingForPartition$0(HoodieSimpleBucketIndex.java:60)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition(HoodieSimpleBucketIndex.java:56)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:94)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:87)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 25 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:200)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:174)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:63)
    ... 69 more
Caused by: java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "b1861d1d"
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "b1861d1d"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.lambda$loadBucketIdToFileIdMappingForPartition$0(HoodieSimpleBucketIndex.java:60)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition(HoodieSimpleBucketIndex.java:56)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:94)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:87)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 25 more
beyond1920 commented 9 months ago

@zyclove My guess is that the previous writer jobs used the simple bucket index, but the latest writer jobs did not. That leads to data duplication, because records with the same primary key value are written into different file groups. The reason for the exception in your last message is that there already exist file groups which do not adhere to the simple bucket index naming rules: their first 8 characters are not a bucket id. Could you show the old file groups from before the upgrade? Do those file group ids contain an 8-character bucket id as the prefix? Or share the logs of the old writer jobs. That would help us verify whether our previous guess is correct.
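
For reference, a minimal sketch (a hypothetical helper, not Hudi's own code) of the check implied by the NumberFormatException above: the simple bucket index parses the first 8 characters of each file group id as a zero-padded bucket number, so a file group id that does not start with 8 digits cannot have been written under the bucket index layout.

// Hypothetical helper: mirrors the Integer.parseInt that
// BucketIdentifier.bucketIdFromFileId applies to the file group id prefix.
static boolean looksLikeBucketIndexFileGroup(String fileGroupId) {
    if (fileGroupId == null || fileGroupId.length() < 8) {
        return false;
    }
    try {
        Integer.parseInt(fileGroupId.substring(0, 8)); // e.g. "00000018" parses fine
        return true;
    } catch (NumberFormatException e) {
        return false; // e.g. "b1861d1d" or "5f1c45ef" fails, as in the stack trace above
    }
}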

zyclove commented 9 months ago
(screenshots attached)
                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.SIMPLE.name())

Why are there two types of logs during the execution of this task? HoodieIndexConfig.INDEX_TYPE is set to HoodieIndex.IndexType.SIMPLE.name(). @beyond1920