apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] hudi upsert data Caused by: org.apache.hadoop.fs.PathIsNotEmptyDirectoryException #9029

Open zyclove opened 1 year ago

zyclove commented 1 year ago

This is my upsert code. The write fails with the "Directory is not empty" error below, but when I check the temp dir it is empty.

(screenshot attached)

private void upsertAllAction(Dataset<Row> jsonDataSet, long maxUseMemory, String tempPath) {
        int dataKeepTime = 3 * 24 * 60 / config.getTriggerTime();
        jsonDataSet.write()
                .format("org.apache.hudi")
                .option(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name())
                .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
                .option(DataSourceWriteOptions.TABLE_TYPE().key(), HoodieTableType.MERGE_ON_READ.name())
                .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), config.getIdName())
                .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), Constants.DT)
                .option(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true)
                .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), Constants.UPDATE_TIME)
                .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), false)
                .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 200)
                .option(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), 200)
                .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                .option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), true)
                .option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), true)
                .option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true)
                .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), true)
                .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true)
                .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.toString())
                .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), true)
                .option(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName())
                .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                .option(HoodieCleanConfig.AUTO_CLEAN.key(), true)
                .option(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE.key(), true)
                .option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                .option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime * 3)
                .option(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB.key(), 500 * 1024)
                .option(HoodieCleanConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name())
                .option(HoodieCleanConfig.CLEANER_HOURS_RETAINED.key(), 72)
                .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 128 * 1024 * 1024)
                .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 256 * 1024 * 1024)
                .option(HoodieCompactionConfig.INLINE_COMPACT.key(), true)
                .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 0)
                .option(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), tempPath)
                .option(HoodieMetadataConfig.ENABLE.key(), true)
                .option(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP.key(), dataKeepTime + 1)
                .option(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP.key(), dataKeepTime + 2)
                .option(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.key(), dataKeepTime)
                .option(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), maxUseMemory)
                .option(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION.key(), maxUseMemory)
                .option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, Constants.UPDATE_TIME)
                .option(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, Constants.UPDATE_TIME)
                .option(HoodieTableConfig.NAME.key(), config.getName())
                .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName())
                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.SIMPLE.name())
                .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH().key(), true)
                .option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 20)
                .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.SIMPLE.name())
                .option(HoodieLayoutConfig.LAYOUT_TYPE.key(), HoodieStorageLayout.LayoutType.DEFAULT.name())
                .option(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key(), SparkBucketIndexPartitioner.class.getName())
                .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
                .option(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name())
                .option("hoodie.write.lock.provider", HiveMetastoreBasedLockProvider.class.getName())
                .option("hoodie.write.lock.hivemetastore.database", "bi_ods_real")
                .option("hoodie.write.lock.hivemetastore.table", getTableName())
                .mode(SaveMode.Append)
                .save(config.getSinkPath());
    }
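
For context, a minimal hedged sketch of how a writer like this appears to be wired up, based on the ForeachBatchSink / HuDiMergeFunction frames in the stack trace below; the streaming source, topic, and trigger interval here are assumptions, not details taken from this issue:

import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;

public class StreamingUpsertDriverSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("hudi-upsert-sketch").getOrCreate();

        // Hypothetical streaming source; the real job's source is not shown in the issue.
        Dataset<Row> source = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092")
                .option("subscribe", "some_topic")
                .load();

        // The stack trace shows foreachBatch invoking HuDiMergeFunction.call, which in turn
        // calls upsertAllAction; a typed lambda stands in for that function object here.
        VoidFunction2<Dataset<Row>, Long> mergeFunction = (batch, batchId) -> {
            // upsertAllAction(batch, maxUseMemory, tempPath) would run for each micro-batch.
        };

        source.writeStream()
                .foreachBatch(mergeFunction)
                .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES)) // assumed interval
                .start()
                .awaitTermination();
    }
}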

Expected behavior


Environment Description

Additional context


Stacktrace


23/06/21 07:52:04 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
23/06/21 07:52:04 WARN BaseTableMetadata: Metadata record for dt=202306 encountered some files to be deleted which was not added before. Ignoring the spurious deletes as the `_hoodie.metadata.ignore.spurious.deletes` config is set to true
23/06/21 07:52:06 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
23/06/21 07:52:13 ERROR HoodieTimelineArchiver: Failed to archive commits, .commit file: 20220429184323.compaction.requested
org.apache.hudi.exception.HoodieIOException: `s3a://xxdata/hudi/tables/ods_api_test_case/.hoodie/.temp/20220429184323': Directory is not empty
        at org.apache.hudi.common.fs.FSUtils.deleteDir(FSUtils.java:750)
        at org.apache.hudi.table.marker.DirectWriteMarkers.deleteMarkerDir(DirectWriteMarkers.java:82)
        at org.apache.hudi.client.HoodieTimelineArchiver.deleteAnyLeftOverMarkers(HoodieTimelineArchiver.java:680)
        at org.apache.hudi.client.HoodieTimelineArchiver.archive(HoodieTimelineArchiver.java:654)
        at org.apache.hudi.client.HoodieTimelineArchiver.archiveIfRequired(HoodieTimelineArchiver.java:175)
        at org.apache.hudi.client.BaseHoodieTableServiceClient.archive(BaseHoodieTableServiceClient.java:580)
        at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:783)
        at org.apache.hudi.client.BaseHoodieWriteClient.autoArchiveOnCommit(BaseHoodieWriteClient.java:573)
        at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:539)
        at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:245)
        at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:102)
        at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:954)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:381)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at com.tuya.HuDiMergeFunction.upsertAllAction(HuDiMergeFunction.java:234)
        at com.tuya.HuDiMergeFunction.call(HuDiMergeFunction.java:115)
        at com.tuya.HuDiMergeFunction.call(HuDiMergeFunction.java:46)
        at org.apache.spark.sql.streaming.DataStreamWriter.$anonfun$foreachBatch$1(DataStreamWriter.scala:489)
        at org.apache.spark.sql.streaming.DataStreamWriter.$anonfun$foreachBatch$1$adapted(DataStreamWriter.scala:489)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `s3a://xxx/hudi/tables/ods_api_test_case/.hoodie/.temp/20220429184323': Directory is not empty
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1806)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1751)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$delete$14(HoodieWrapperFileSystem.java:357)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.delete(HoodieWrapperFileSystem.java:356)
        at org.apache.hudi.common.fs.FSUtils.deleteDir(FSUtils.java:745)
        ... 84 more
zyclove commented 1 year ago

(screenshot attached)

So, is this the cause? Do you have a solution?

ad1happy2go commented 1 year ago

@zyclove Do you know what created this directory? Does cleaning this dir and resuming the job work?

zyclove commented 1 year ago

It looks like it was created automatically by S3; manual cleanup works.
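
For anyone hitting the same thing, a minimal cleanup sketch, assuming the leftover directory from the exception message is the one to remove. The recursive flag is the important part: S3A only raises PathIsNotEmptyDirectoryException, as in the Caused-by above, when a non-recursive delete hits a directory that still has children.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeftoverMarkerDirCleanup {
    public static void main(String[] args) throws Exception {
        // Hypothetical path; substitute the .hoodie/.temp/<instant> directory reported in the error.
        Path markerDir = new Path("s3a://xxdata/hudi/tables/ods_api_test_case/.hoodie/.temp/20220429184323");

        FileSystem fs = markerDir.getFileSystem(new Configuration());
        if (fs.exists(markerDir)) {
            // recursive=true removes the directory and any marker files still under it;
            // a non-recursive delete of a non-empty directory fails on s3a.
            fs.delete(markerDir, true);
        }
    }
}

The same cleanup can also be done with the AWS CLI (aws s3 rm --recursive), but the FileSystem API goes through the same s3a configuration and credentials the job already uses.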

ad1happy2go commented 1 year ago

@zyclove Was it a one-off case, or are you getting this issue again?

ad1happy2go commented 1 year ago

@zyclove Were you able to resolve this issue, or are you hitting it again? If resolved, can you share the resolution to help the community?

blackcheckren commented 11 months ago

I also encountered this problem, but the corresponding directory on S3 could not be removed even after dozens of manual attempts: the log shows that a folder with the same name is automatically recreated after each deletion. I do not know why this happens. Is the HTTP request sent by the marker mechanism being repeated many times? (WeChat screenshots attached)
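
Since the failing frame is DirectWriteMarkers.deleteMarkerDir and the writer above pins hoodie.write.markers.type to DIRECT, one hedged thing to try is timeline-server-based markers, which batch marker updates through the embedded timeline server instead of writing one marker object per data file, leaving much less to list and delete on S3. This is a suggestion derived from the config shown in this thread, not a fix confirmed here:

import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class TimelineServerMarkersSketch {
    // Same write as upsertAllAction above, with only the marker type changed.
    // Whether this avoids the leftover .hoodie/.temp directory is an assumption.
    static void writeWithTimelineServerMarkers(Dataset<Row> jsonDataSet, String sinkPath) {
        jsonDataSet.write()
                .format("org.apache.hudi")
                // ... all other options from upsertAllAction stay the same ...
                .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.TIMELINE_SERVER_BASED.toString())
                .mode(SaveMode.Append)
                .save(sinkPath);
    }
}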