apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] ERROR BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :13 #9119

Closed: zyclove closed this issue 4 months ago

zyclove commented 1 year ago

Describe the problem you faced

Occasionally, a schema null error occurs when writing data. By the way, when will the next version be released?

private void upsertAllAction(Dataset<Row> jsonDataSet, long maxUseMemory, String tempPath) {
        // commits to retain: number of trigger intervals in 3 days (assuming getTriggerTime() is in minutes)
        int dataKeepTime = 3 * 24 * 60 / config.getTriggerTime();
        jsonDataSet.write()
                .format("org.apache.hudi")
                .option(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name())
                .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
                .option(DataSourceWriteOptions.TABLE_TYPE().key(), HoodieTableType.MERGE_ON_READ.name())
                .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), config.getIdName())
                .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), Constants.DT)
                .option(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true)
                .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), Constants.UPDATE_TIME)
                .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), false)
                .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 200)
                .option(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), 200)
                .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                .option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), true)
                .option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), true)
                .option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true)
                .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), true)
                .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true)
                .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.toString())
                .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), true)
                .option(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                .option(HoodieCleanConfig.AUTO_CLEAN.key(), true)
                .option(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE.key(), true)
                .option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                .option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime * 3)
                .option(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB.key(), 500 * 1024)
                .option(HoodieCleanConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name())
                .option(HoodieCleanConfig.CLEANER_HOURS_RETAINED.key(), 72)
                .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 128 * 1024 * 1024)
                .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 256 * 1024 * 1024)
                .option(HoodieCompactionConfig.INLINE_COMPACT.key(), true)
                .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 0)
                .option(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), tempPath)
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .option(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                .option(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime + 2)
                .option(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                .option(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), maxUseMemory)
                .option(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION.key(), maxUseMemory)
                .option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, Constants.UPDATE_TIME)
                .option(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, Constants.UPDATE_TIME)
                .option(HoodieTableConfig.NAME.key(), config.getName())
                .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName())
                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.SIMPLE.name())
                .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH().key(), true)
                .option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 20)
                .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.SIMPLE.name())
                .option(HoodieLayoutConfig.LAYOUT_TYPE.key(), HoodieStorageLayout.LayoutType.DEFAULT.name())
                .option(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key(), SparkBucketIndexPartitioner.class.getName())
                .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
                .option(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name())
                .option("hoodie.write.lock.provider", HiveMetastoreBasedLockProvider.class.getName())
                .option("hoodie.write.lock.hivemetastore.database", "bi_ods_real")
                .option("hoodie.write.lock.hivemetastore.table", getTableName())
                .mode(SaveMode.Append)
                .save(config.getSinkPath());
    }

Stacktrace

23/07/04 06:19:18 WARN InternalKafkaConsumerPool: Pool exceeds its soft max size, cleaning up idle objects...
23/07/04 06:19:19 ERROR BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :13
org.apache.avro.SchemaParseException: Cannot parse <null> schema
        at org.apache.avro.Schema.parse(Schema.java:1633)
        at org.apache.avro.Schema$Parser.parse(Schema.java:1430)
        at org.apache.avro.Schema$Parser.parse(Schema.java:1418)
        at org.apache.hudi.common.util.InternalSchemaCache.getInternalSchemaByVersionId(InternalSchemaCache.java:225)
        at org.apache.hudi.common.util.InternalSchemaCache.getInternalSchemaByVersionId(InternalSchemaCache.java:231)
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.composeSchemaEvolutionTransformer(HoodieMergeHelper.java:183)
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:96)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:372)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:363)
        at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:251)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.avro.SchemaParseException: Cannot parse <null> schema
        at org.apache.avro.Schema.parse(Schema.java:1633)
        at org.apache.avro.Schema$Parser.parse(Schema.java:1430)
        at org.apache.avro.Schema$Parser.parse(Schema.java:1418)
        at org.apache.hudi.common.util.InternalSchemaCache.getInternalSchemaByVersionId(InternalSchemaCache.java:225)
        at org.apache.hudi.common.util.InternalSchemaCache.getInternalSchemaByVersionId(InternalSchemaCache.java:231)
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.composeSchemaEvolutionTransformer(HoodieMergeHelper.java:183)
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:96)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:372)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:363)
        at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
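
The frames above show the failure inside HoodieMergeHelper.composeSchemaEvolutionTransformer, a path that is only taken when schema-on-read evolution is enabled. A minimal workaround sketch, assuming on-read schema evolution can be traded away for this table (the method name below is illustrative, and every option not shown stays exactly as in upsertAllAction):

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Workaround sketch, not a fix for the underlying bug: turning off the
// schema-on-read options keeps the merge from entering
// composeSchemaEvolutionTransformer / InternalSchemaCache at all.
private void upsertWithoutSchemaOnRead(Dataset<Row> jsonDataSet, String sinkPath) {
    jsonDataSet.write()
            .format("org.apache.hudi")
            // ...keep every other option from upsertAllAction unchanged...
            .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false) // hoodie.schema.on.read.enable
            .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), false)        // hoodie.datasource.write.reconcile.schema
            .mode(SaveMode.Append)
            .save(sinkPath);
}
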
danny0405 commented 1 year ago

It would be great if you could move to release 0.12.3; release 0.14.0 may come out about 2 weeks later.

zyclove commented 1 year ago

@danny0405 Do you mean this is a problem with 0.13.1? If so, I will go back to 0.12.3 and try. I took a look at the master branch and many bugs have been fixed; please push out the 0.14 release soon. Looking forward to the next release.

Best regards.

danny0405 commented 1 year ago

Sorry for the instability; we will be more conservative about code review and merging in the future.

ad1happy2go commented 1 year ago

@zyclove Version 0.14 should be coming soon, probably by the end of this month.

zyclove commented 1 year ago

@danny0405 I reverted to 0.12.3 and got the error "org.apache.hudi.exception.HoodieException: cannot find file schema for current commit".

org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: cannot find file schema for current commit 20230105032024
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:111)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
        at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:80)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
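
The cause names a concrete instant, 20230105032024, for which no file schema could be found. A minimal diagnostic sketch, using only stable Hadoop FileSystem calls (the class name and argument handling are illustrative), to check whether that instant's timeline files exist and are non-empty under the table's .hoodie directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Diagnostic sketch: list the timeline files for the instant named in the
// error and print their sizes. A zero-length .commit/.deltacommit file
// would explain commit metadata that carries no schema.
public class TimelineCheck {
    public static void main(String[] args) throws Exception {
        String basePath = args[0];         // the Hudi table path (config.getSinkPath() above)
        String instant = "20230105032024"; // taken from the error message
        Path hoodieDir = new Path(basePath, ".hoodie");
        FileSystem fs = hoodieDir.getFileSystem(new Configuration());
        for (FileStatus st : fs.listStatus(hoodieDir)) {
            String name = st.getPath().getName();
            if (name.startsWith(instant)) {
                System.out.println(name + " -> " + st.getLen() + " bytes");
            }
        }
    }
}
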
ad1happy2go commented 1 year ago

@zyclove Can you confirm whether this issue exists on master too? You can build the jar from master; I can help you with that if needed.

Also, if possible, can you provide reproducible code for this issue?
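
For reference, a skeleton of what such a reproducer could look like; the paths, table name, and record shape below are assumptions for illustration, not details taken from this issue:

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

// Reproducer skeleton: upsert the same key twice into a MOR table with
// schema-on-read and reconcile enabled, then watch the second pass for the
// SchemaParseException. Varying the record schema between rounds (e.g.
// adding a column) may be needed to exercise the evolution code path.
public class HudiNullSchemaRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hudi-null-schema-repro")
                .master("local[2]")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();
        String tablePath = "/tmp/hudi_repro"; // illustrative path
        for (int round = 0; round < 2; round++) {
            Dataset<Row> batch = spark.createDataFrame(
                    Arrays.asList(new Record("k1", round, "2023-07-04")), Record.class);
            batch.write().format("org.apache.hudi")
                    .option("hoodie.table.name", "repro")
                    .option("hoodie.datasource.write.operation", "upsert")
                    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
                    .option("hoodie.datasource.write.recordkey.field", "id")
                    .option("hoodie.datasource.write.partitionpath.field", "dt")
                    .option("hoodie.datasource.write.precombine.field", "ts")
                    .option("hoodie.schema.on.read.enable", "true")
                    .option("hoodie.datasource.write.reconcile.schema", "true")
                    .mode(SaveMode.Append)
                    .save(tablePath);
        }
        spark.stop();
    }

    public static class Record implements java.io.Serializable {
        private String id;
        private long ts;
        private String dt;

        public Record(String id, long ts, String dt) { this.id = id; this.ts = ts; this.dt = dt; }
        public String getId() { return id; }
        public long getTs() { return ts; }
        public String getDt() { return dt; }
    }
}
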

zyclove commented 11 months ago

> @zyclove Version 0.14 should be coming soon, probably by the end of this month.

Will 0.14 be released at the end of this month, or has it been postponed? Have you started beta testing yet? 0.13 has too many problems.

danny0405 commented 11 months ago

We have started testing for 0.14.0 now.

zyclove commented 11 months ago

> We have started testing for 0.14.0 now.

Excuse me, how long until version 0.14 can be officially released?

danny0405 commented 10 months ago

RC1 is out; I guess we will still have an RC2.

zyclove commented 9 months ago

@danny0405 This problem still exists in version 0.14 too. How can it be solved?

ad1happy2go commented 4 months ago

Closing this, as it was fixed via https://github.com/apache/hudi/pull/9984.