apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]Hudi Failed to read MARKERS file #6900

Open yhyyz opened 2 years ago

yhyyz commented 2 years ago

Environment

EMR 6.7.0 Spark 3.2.1 Hudi 0.11.0

Job Info

Hudi table type: MOR. Storage: AWS S3. A Spark Structured Streaming job uses foreachBatch with multiple tasks writing to different Hudi tables. After the job runs for a few hours, it fails with "Failed to read MARKERS" because the file is not found. Why does this error occur?

Hudi Config

hoodie.upsert.shuffle.parallelism=20
hoodie.insert.shuffle.parallelism=20
hoodie.keep.min.commits=6
hoodie.keep.max.commits=7
hoodie.parquet.small.file.limit=0
hoodie.index.type=GLOBAL_BLOOM
hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.metadata.enable=false
hoodie.cleaner.commits.retained=5
hoodie.compact.inline=false
hoodie.compact.inline.max.delta.commits=4
hoodie.compact.schedule.inline=true

code

object Debezium2Hudi {
  case class TableInfoList(tableInfo: List[TableInfo])
  private val log = LoggerFactory.getLogger("debezium2hudi")
  def main(args: Array[String]): Unit = {
    log.info(args.mkString)
    Logger.getLogger("org").setLevel(Level.WARN)
    val params = Config.parseConfig(Debezium2Hudi, args)
    val tableInfoList = JsonUtil.mapper.readValue(params.tableInfoJson, classOf[TableInfoList])
    val ss = SparkHelper.getSparkSession(params.env)
    import ss.implicits._
    val df = ss
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", params.brokerList)
      .option("subscribe", params.sourceTopic)
      .option("startingOffsets", params.startPos)
      .option("failOnDataLoss", false)
      .option("maxOffsetsPerTrigger",params.maxOffset.toLong)
      .option("kafka.consumer.commit.groupid", params.consumerGroup)
      .load()
      .repartition(Integer.valueOf(params.partitionNum))

    ss.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = log.debug(s"QueryStarted [id = ${event.id}, name = ${event.name}, runId = ${event.runId}]")

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = log.warn(s"QueryProgress ${event.progress}")

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = log.debug(s"QueryTerminated [id = ${event.id}, runId = ${event.runId}, error = ${event.exception}]")
    })

    val listener = new KafkaOffsetCommitterListener()
    ss.streams.addListener(listener)

    val pool = Executors.newFixedThreadPool(50)
    implicit val xc: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)

    val partitionFormat: (String => String) = (arg: String) => {
      val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
      val parFormatter = DateTimeFormatter.ofPattern("yyyyMM")
      parFormatter.format(formatter.parse(arg))
    }
    val sqlPartitionFunc = udf(partitionFormat)

    val ds = df.selectExpr("CAST(value AS STRING)").as[String]
    val query = ds
      .writeStream
      .queryName("debezium2hudi")
      .option("checkpointLocation", params.checkpointDir)
      // if the trigger is set to 0, run batches as fast as possible
      .trigger(Trigger.ProcessingTime(params.trigger + " seconds"))
      .foreachBatch { (batchDF: Dataset[String], batchId: Long) =>
        log.warn("current batch: "+batchId.toString)

        val newsDF = batchDF.map(cdc => DebeziumParser.apply().debezium2Hudi(cdc))
          .filter(_ != null)
        if (!newsDF.isEmpty) {
          // use a var so the futures are actually collected (":+" on an immutable Seq returns a new Seq)
          var tasks = Seq[Future[Unit]]()
          for (tableInfo <- tableInfoList.tableInfo) {
            val insertORUpsertDF = newsDF
              .filter($"database" === tableInfo.database && $"table" === tableInfo.table)
              .filter($"operationType" === HudiOP.UPSERT || $"operationType" === HudiOP.INSERT)
              .select($"data".as("jsonData"))
            if (!insertORUpsertDF.isEmpty) {
              val json_schema = ss.read.json(insertORUpsertDF.select("jsonData").as[String]).schema
              val cdcDF = insertORUpsertDF.select(from_json($"jsonData", json_schema).as("cdc_data"))
              val cdcPartitionDF = cdcDF.select($"cdc_data.*")
                .withColumn(tableInfo.hudiPartitionField, sqlPartitionFunc(col(tableInfo.partitionTimeColumn)))
              params.concurrent match {
                case "true" => {
                  val runTask = HudiWriteTask.run(cdcPartitionDF, params, tableInfo)(xc)
                  tasks = tasks :+ runTask
                }
                case _ => ....
              }
            }
          }
          if (params.concurrent == "true" && tasks.nonEmpty) {
            Await.result(Future.sequence(tasks), Duration(60, MINUTES))
            ()
          }

        }
      }.start()
    query.awaitTermination()
  }

}
object HudiWriteTask {

  def run(df: DataFrame, params: Config, tableInfo: TableInfo)(implicit xc: ExecutionContext): Future[Unit] = Future {
    log.warn("current thread name: "+ Thread.currentThread().getName +" current thread id: "+ 
     Thread.currentThread().getId.toString +" table name: "+tableInfo.table)
    val props = setHudiConfig(params, tableInfo)
    df.write.format("org.apache.hudi")
      .options(props)
      .mode(SaveMode.Append)
      .save(params.hudiEventBasePath + tableInfo.database + "/" + tableInfo.table + "/")
  }
}

ERROR

22/10/09 08:03:12 INFO ShuffleBlockFetcherIterator: Started 8 remote fetches in 0 ms
22/10/09 08:03:12 INFO Executor: Finished task 0.0 in stage 25180.0 (TID 172844). 2512 bytes result sent to driver
22/10/09 08:03:12 ERROR Executor: Exception in task 0.0 in stage 25178.0 (TID 172843)
org.apache.hudi.exception.HoodieIOException: Failed to read MARKERS file s3://app-util/hudi-bloom/opti-tmp-102/cdc_test_db/dhdata_02/.hoodie/.temp/20221009080301290/MARKERS0
    at org.apache.hudi.common.util.MarkerUtils.readMarkersFromFile(MarkerUtils.java:210)
    at org.apache.hudi.common.util.MarkerUtils.lambda$readTimelineServerBasedMarkersFromFileSystem$141c8e72$1(MarkerUtils.java:185)
    at org.apache.hudi.common.fs.FSUtils.lambda$parallelizeFilesProcess$1f9929d5$1(FSUtils.java:736)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$mapToPair$786cea6a$1(HoodieSparkEngineContext.java:149)
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at scala.collection.AbstractIterator.to(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2255)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:133)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.FileNotFoundException: No such file or directory 's3://app-util/hudi-bloom/opti-tmp-102/cdc_test_db/dhdata_02/.hoodie/.temp/20221009080301290/MARKERS0'
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:932)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:924)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:194)
    at org.apache.hudi.common.util.MarkerUtils.readMarkersFromFile(MarkerUtils.java:207)
    ... 31 more
22/10/09 08:03:13 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 172858
22/10/09 08:03:13 INFO Executor: Running task 6.0 in stage 25185.0 (TID 172858)
danny0405 commented 2 years ago

@yihua Can you take a look ?

yihua commented 2 years ago

@yhyyz To mitigate the issue, have you tried switching to direct markers by setting hoodie.write.markers.type=direct, to see whether the structured streaming job can make progress?
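
(For reference, a minimal sketch of what that override looks like when applied to the kind of writer used in HudiWriteTask above; the method name and basePath are placeholders, not taken from this issue.)

import org.apache.spark.sql.{DataFrame, SaveMode}

// Hedged sketch: force direct markers so the write does not depend on the
// timeline-server-based MARKERS files. basePath is a placeholder.
def writeWithDirectMarkers(df: DataFrame, basePath: String): Unit = {
  df.write.format("org.apache.hudi")
    .option("hoodie.write.markers.type", "DIRECT")
    .mode(SaveMode.Append)
    .save(basePath)
}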

yhyyz commented 2 years ago

@yihua

  1. Tested with hoodie.write.markers.type=DIRECT, hoodie.embed.timeline.server=false, and hoodie.clean.automatic=false: the markers issue is gone, but a new one occurred: org.apache.hudi.exception.HoodieIOException: Could not read commit details from s3://app-util/hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_02/.hoodie/20221014024801458.deltacommit.requested. The application log: https://dxs9dnjebzm6y.cloudfront.net/tmp/application-log.txt

  2. I run compaction and cleaning offline as an independent job, and set hoodie.compact.inline=false and hoodie.clean.automatic=false in the streaming job. Do I have to use a lock provider?

  3. The new issue did not cause the application to terminate; it has been running for 10 hours. However, some tables that have a pending rollback request cannot be written to. If I delete the rollback files, data can be written to the table again, but the table soon ends up with another rollback.

    2022-10-14 03:59:17       1574 hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_13/.hoodie/20221014035914613.rollback
    2022-10-14 03:59:16          0 hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_13/.hoodie/20221014035914613.rollback.inflight
    2022-10-14 03:59:16       1680 hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_13/.hoodie/20221014035914613.rollback.requested
    2022-10-14 04:18:17       1682 hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_13/.hoodie/20221014041814868.rollback.requested
    2022-10-14 03:29:28       1725 hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_14/.hoodie/20221014032923642.rollback
    2022-10-14 03:29:25          0 hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_14/.hoodie/20221014032923642.rollback.inflight
    2022-10-14 03:29:25       1678 hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_14/.hoodie/20221014032923642.rollback.requested
  4. I read one requested file with aws s3 cp s3://xxxx/xxxxx/xxxx.rollback.requested - ; the file content:

    Objavro.schema�{"type":"record","name":"HoodieRollbackPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"instantToRollback","type":["null",{"type":"record","name":"HoodieInstantInfo","fields":[{"name":"commitTime","type":{"type":"string","avro.java.string":"String"}},{"name":"action","type":{"type":"string","avro.java.string":"String"}}]}],"doc":"Hoodie instant that needs to be rolled back","default":null},{"name":"RollbackRequests","type":["null",{"type":"array","items":{"type":"record","name":"HoodieRollbackRequest","fields":[{"name":"partitionPath","type":{"type":"string","avro.java.string":"String"}},{"name":"fileId","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"latestBaseInstant","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"filesToBeDeleted","type":{"type":"array","items":{"type":"string","avro.java.string":"String"},"default":[]},"default":[]},{"name":"logBlocksToBeDeleted","type":["null",{"type":"map","values":"long","avro.java.string":"String"}],"default":null}]}}],"default":null},{"name":"version","type":["int","null"],"default":1}]}�+Y�H
                                                                                                                                                  ��1�F"20221014032915679deltacommit�+Y�H
                                                                                                                                                                                         ��1�
  5. Failed to archive commits because a deltacommit.inflight file was not found. Why does this happen?

    ERROR HoodieTimelineArchiver: Failed to archive commits, .commit file: 20221014120201577.deltacommit.inflight
    org.apache.hudi.exception.HoodieIOException: Could not read commit details from s3://app-util/hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_01/.hoodie/20221014120201577.deltacommit.inflight
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:761)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getInstantDetails(HoodieActiveTimeline.java:266)
    at org.apache.hudi.common.table.timeline.HoodieDefaultTimeline.isEmpty(HoodieDefaultTimeline.java:377)
    at org.apache.hudi.client.HoodieTimelineArchiver.archive(HoodieTimelineArchiver.java:593)
    at org.apache.hudi.client.HoodieTimelineArchiver.archiveIfRequired(HoodieTimelineArchiver.java:169)
    at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:907)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoArchiveOnCommit(BaseHoodieWriteClient.java:629)
    at org.apache.hudi.client.BaseHoodieWriteClient.postCommit(BaseHoodieWriteClient.java:534)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:678)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:313)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:165)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at com.aws.analytics.util.HudiWriteTask$.$anonfun$run$1(HudiWriteTask.scala:25)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    Caused by: java.io.FileNotFoundException: No such file or directory 's3://app-util/hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_01/.hoodie/20221014120201577.deltacommit.inflight'
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:932)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:924)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:194)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:460)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:758)
    ... 59 more
    22/10/14 12:20:45 INFO MultipartUploadOutputStream: close closed:false s3://app-util/hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_01/.hoodie/archived/.commits_.archive.148_1-0-1
    22/10/14 12:20:46 INFO MultipartUploadOutputStream: close closed:false s3://app-util/hudi-bloom/emrfs-disalbe-cache-direct-101/cdc_test_db/dhdata_01/.hoodie/archived/.commits_.archive.148_1-0-1
    22/10/14 12:20:46 INFO S3N
nsivabalan commented 1 year ago

This is similar to another issue I debugged recently. As of now, you can't disable auto clean; that's a limitation.

If you disable auto clean, cleaning essentially never happens. But archival assumes that cleaning is enabled and keeps making progress. This results in dangling data files: the timeline files get archived, but the data files are left behind because the cleaner never ran.

If you wish to run the cleaner as a separate process, please disable archival in the regular writer as well. I have filed a tracking JIRA here: https://issues.apache.org/jira/browse/HUDI-5054
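
(As a rough illustration only, with values assumed rather than taken from this thread: with cleaning and archival both handed to the separate job, the streaming writer's properties would carry something like the lines below, provided hoodie.archive.automatic is available in your Hudi version.)

hoodie.clean.automatic=false
hoodie.archive.automatic=false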

nsivabalan commented 1 year ago

And yes, if you are using a different process for compaction and cleaning, you definitely need to configure a lock provider. If not, you might end up with exceptions.
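
(For example, a Spark multi-writer setup is typically configured along these lines; this is a sketch only, and the ZooKeeper endpoint, lock key, and base path are placeholders to adapt.)

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=<zookeeper-host>
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=<table-name>
hoodie.write.lock.zookeeper.base_path=/hudi/locks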

nsivabalan commented 1 year ago

Another suggestion: if you feel that running the cleaner inline causes a performance hit, you can relax it to run only once every N commits using hoodie.clean.max.commits. With hoodie.clean.max.commits=N, even the check for whether anything needs to be cleaned happens only once every N commits.

Do not confuse this with hoodie.cleaner.commits.retained. Let's say you set hoodie.cleaner.commits.retained=10 but hoodie.clean.max.commits=2.

Every 2 commits, the Hudi cleaner will check whether there are more than 10 commits in the active timeline and clean the corresponding data files. If you are OK with some leeway, you can increase hoodie.clean.max.commits to 5 or 10, so clean scheduling is only attempted once every 5 (or 10) commits.
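
(For instance, with the illustrative values below the clean check only runs every 5th commit, and when it does run it retains the data files needed by the last 10 commits.)

hoodie.clean.automatic=true
hoodie.clean.max.commits=5
hoodie.cleaner.commits.retained=10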

nsivabalan commented 1 year ago

Hope that helps.

umehrot2 commented 1 year ago

Latest stack trace, when using structured streaming with the following properties:

hoodie.datasource.hive_sync.enable=false
hoodie.upsert.shuffle.parallelism=20
hoodie.insert.shuffle.parallelism=20
hoodie.keep.min.commits=6
hoodie.keep.max.commits=7
hoodie.parquet.small.file.limit=52428800
hoodie.index.type=GLOBAL_BLOOM
hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.metadata.enable=true
hoodie.cleaner.commits.retained=3
hoodie.clean.max.commits=5
hoodie.clean.async=false
hoodie.clean.automatic=true
hoodie.archive.async=false
hoodie.datasource.compaction.async.enable=true
hoodie.write.markers.type=DIRECT
hoodie.embed.timeline.server=true
hoodie.embed.timeline.server.async=false
hoodie.compact.schedule.inline=false
hoodie.compact.inline.max.delta.commits=2

Stacktrace:

22/10/19 15:36:18 ERROR UpsertPartitioner: Error trying to compute average bytes/record 
org.apache.hudi.exception.HoodieIOException: Could not read commit details from s3://app-util/hudi-bloom/multi-stream-105/cdc_test_db/dhdata_15/.hoodie/20221019152438682.commit
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:761)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getInstantDetails(HoodieActiveTimeline.java:266)
    at org.apache.hudi.common.table.timeline.HoodieDefaultTimeline.getInstantDetails(HoodieDefaultTimeline.java:372)
    at org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord(UpsertPartitioner.java:373)
    at org.apache.hudi.table.action.commit.UpsertPartitioner.assignInserts(UpsertPartitioner.java:162)
    at org.apache.hudi.table.action.commit.UpsertPartitioner.<init>(UpsertPartitioner.java:95)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.<init>(SparkUpsertDeltaCommitPartitioner.java:50)
    at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.getUpsertPartitioner(BaseSparkDeltaCommitActionExecutor.java:69)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getPartitioner(BaseSparkCommitActionExecutor.java:217)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:163)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:85)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:89)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:76)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:213)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:304)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:91)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:90)
    at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:166)
    at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:89)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: java.io.FileNotFoundException: No such file or directory 's3://app-util/hudi-bloom/multi-stream-105/cdc_test_db/dhdata_15/.hoodie/20221019152438682.commit'
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:932)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:924)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:194)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:460)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:758)
    ... 52 more
yhyyz commented 1 year ago

@nsivabalan Thanks for your help.

  1. Using Structured Streaming with multiple streaming queries instead of foreachBatch, and with the following properties, the application ran for 19 hours without any errors. But with hoodie.embed.timeline.server=true set, the error occurred: UpsertPartitioner: Error trying to compute average bytes/record,... Caused by: java.io.FileNotFoundException: No such file or directory ..../.hoodie/....commit.
    hoodie.datasource.hive_sync.enable=false
    hoodie.upsert.shuffle.parallelism=20
    hoodie.insert.shuffle.parallelism=20
    hoodie.keep.min.commits=6
    hoodie.keep.max.commits=7
    hoodie.parquet.small.file.limit=52428800
    hoodie.index.type=GLOBAL_BLOOM
    hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
    hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
    hoodie.metadata.enable=true
    hoodie.cleaner.commits.retained=3
    hoodie.clean.async=false
    hoodie.clean.automatic=true
    hoodie.archive.async=false
    hoodie.datasource.compaction.async.enable=true
    hoodie.write.markers.type=DIRECT
    hoodie.embed.timeline.server=false
    hoodie.embed.timeline.server.async=false
  2. Using foreachBatch with multiple threads, with inline compaction enabled instead of offline compaction, and with the following properties, the error occurred: UpsertPartitioner: Error trying to compute average bytes/record,... Caused by: java.io.FileNotFoundException: No such file or directory ..../.hoodie/....commit, but the application keeps running. I will set hoodie.embed.timeline.server=false and test again; I will post any new findings here.
    hoodie.datasource.hive_sync.enable=false
    hoodie.upsert.shuffle.parallelism=20
    hoodie.insert.shuffle.parallelism=20
    hoodie.keep.min.commits=6
    hoodie.keep.max.commits=7
    hoodie.parquet.small.file.limit=52428800
    hoodie.index.type=GLOBAL_BLOOM
    hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
    hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
    hoodie.metadata.enable=true
    hoodie.cleaner.commits.retained=3
    hoodie.clean.max.commits=5
    hoodie.clean.async=false
    hoodie.clean.automatic=true
    hoodie.archive.async=false
    hoodie.compact.inline=true
    hoodie.datasource.compaction.async.enable=false
    hoodie.write.markers.type=DIRECT
    hoodie.embed.timeline.server=true
    hoodie.embed.timeline.server.async=false
    hoodie.compact.schedule.inline=false
    hoodie.compact.inline.max.delta.commits=2
nsivabalan commented 1 year ago

We might need to inspect the timeline to see what's happening. Maybe the metadata table is corrupt; we might need to inspect that.

Can you run our validation tool against your table and let us know what you see?

https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java

Enable all of these flags, e.g. --validate-latest-file-slices: validate latest file slices for all partitions.

yhyyz commented 1 year ago
  1. I ran the validator with the following command; the log file: https://dxs9dnjebzm6y.cloudfront.net/tmp/metadata-table-validator.log
    basePath=s3://xxxxx/hudi-bloom/multi-thread-105/cdc_test_db/dhdata_13/
    spark-submit \
    --class org.apache.hudi.utilities.HoodieMetadataTableValidator \
    --master yarn \
    --driver-memory 2g \
    --executor-memory 4g \
    /usr/lib/hudi/hudi-utilities-bundle_2.12-0.11.0-amzn-0.jar \
    --base-path $basePath \
    --validate-latest-file-slices \
    --validate-latest-base-files \
    --validate-all-file-groups
  2. With hoodie.embed.timeline.server=false, the same error occurs.
nsivabalan commented 1 year ago

Yeah, it looks like the metadata table is out of sync with the data table. You may need to disable metadata for a few commits and then re-enable it. I will follow up to see how we ended up in this state.
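
(In practice that is just a property flip between restarts of the writer; a sketch, assuming the rest of the write config stays unchanged.)

# for a few commits after the next restart
hoodie.metadata.enable=false
# then, on a later restart, re-enable so the metadata table is rebuilt from the data table
hoodie.metadata.enable=true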

nsivabalan commented 1 year ago

Hey @yhyyz: any updates for us? We might close this due to no activity. Let us know if you need more help from us.

zaza commented 1 year ago

This is definitely still an issue; we were hit by an error that looks identical to what @umehrot2 reported a while ago:

ERROR UpsertPartitioner: Error trying to compute average bytes/record 
org.apache.hudi.exception.HoodieIOException: Could not read commit details from s3://.../.hoodie/20230714152804208.commit
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:824) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getInstantDetails(HoodieActiveTimeline.java:310) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.common.table.timeline.HoodieDefaultTimeline.getInstantDetails(HoodieDefaultTimeline.java:438) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord(UpsertPartitioner.java:380) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.UpsertPartitioner.assignInserts(UpsertPartitioner.java:169) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.UpsertPartitioner.<init>(UpsertPartitioner.java:98) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpsertPartitioner(BaseSparkCommitActionExecutor.java:404) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getPartitioner(BaseSparkCommitActionExecutor.java:224) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:170) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:83) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:68) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:107) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:96) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:140) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:372) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:626) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:179) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:626) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:602) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:125) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
        ...
Caused by: java.io.FileNotFoundException: No such file or directory 's3://tasktop-data-platform-dev-analytical-data/simulator/workstreams/.hoodie/20230714152804208.commit'
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:529) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:940) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:932) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:983) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:197) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:476) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:821) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.1.jar:0.13.1]

We're on EMR 6.11.0

What additional information do you need in order to troubleshoot this? Are there any configuration settings that would help us mitigate the problem?

danny0405 commented 1 year ago

We're on EMR 6.11.0

Which Hudi version are you using?

zaza commented 1 year ago

0.13.1 as in org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1

zaza commented 1 year ago

Is this issue on your radar? Is there any chance of it being addressed in the near future?

fanfanAlice commented 3 months ago

I have the same problem. Is there any solution? I join a Hudi table (hudi1) with a Hive table (hive1) and write the result to another Hudi table (hudi2), and that write fails with the error below. I also have a Flink task writing data to this hudi2 table.

24/06/28 15:54:20 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
24/06/28 15:55:15 WARN HoodieWriteConfig:
24/06/28 15:55:15 WARN HoodieBackedTableMetadata: Cannot initialize metadata table as operation(s) are in progress on the dataset: [=-) Embedded timeline server is disabled, fallback to use direct marker type for spark
24/06/28 15:55:15 WARN HoodieBackedTableMetadata: Metadata table was not found at path hdfs://hacluster/user/kylin/flink/data/streaming_rdss_r86__deltacommit_INFLIGHT)
24/06/28 15:55:15 WARN HoodieBackedTableMetadata: Metadata table was not found at path hdfs://hacluster/user/kylin/flink/data/streaming_rdss_r_hoodie/metadata
24/06/28 16:00:09 ERROR UpsertPartitioner: Error trying to compute average bytes/record
org.apache.hudi.exception.HoodieIOException: Could not read commit details from hdfs://hacluster/user/kylin/flink/data/streaming_rdss_rcsp_lab/2024062815382133
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:824)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getInstantDetails(HoodieActiveTimeline.java:310)
    at org.apache.hudi.common.table.timeline.HoodieDefaultTimeline.getInstantDetails(HoodieDefaultTimeline.java:438)
    at org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord(UpsertPartitioner.java:380)
    at org.apache.hudi.table.action.commit.UpsertPartitioner.assignInserts(UpsertPartitioner.java:169)
    at org.apache.hudi.table.action.commit.UpsertPartitioner.<init>(UpsertPartitioner.java:98)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.<init>(SparkUpsertDeltaCommitPartitioner.java:49)
    at org.apache.hudi.table.action.commit.BaseSparkDeltaCommitActionExecutor.getUpsertPartitioner(BaseSparkDeltaCommitActionExecutor.java:408)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getPartitioner(BaseSparkCommitActionExecutor.java:226)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.(BaseSparkCommitActionExecutor.java:40B)   (note from the original paste: this is probably a transcription error and should be 408)
    at org.apache.hudi.table.action.compact.BaseSparkCompactionActionExecutor.execute(BaseSparkCompactionActionExecutor.java:83)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.(HoodieSparkMergeOnReadTable.java:80)
    at org.apache.hudi.client.SparkRDDWriteClient.insert(SparkRDDWriteClient.java:165)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:207)
    at org.apache.hudi.HoodieSparkSqlWriter.write(HoodieSparkSqlWriter.java:363)

danny0405 commented 3 months ago

So you have concurrent writes from different engines into one table. Did you have a chance to configure the concurrency-control lock provider and policy?

fanfanAlice commented 3 months ago

Yes, I use a concurrency control lock.

danny0405 commented 3 months ago

the dataset: [=-) Embedded timeline server is disabled

Did you disable the embedded timeline server?

fanfanAlice commented 3 months ago

Yes, I set hoodie.embed.timeline.server=false.

danny0405 commented 3 months ago

Could not read commit details from hdfs://hacluster/user/kylin/flink/data/streaming_rdss_rcsp_lab/2024062815382133

Is this a real file on storage? Did you check its integrity?