apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] HoodieSnapshotExporter is broken with PR HUDI-712 #11640

Closed Gatsby-Lee closed 1 month ago

Gatsby-Lee commented 1 month ago

Describe the problem you faced

HUDI-712 added "overwrite=false" to the file copy.

When Spark executors are terminated unexpectedly, the target path may already have been created.

In that case, when the failed Spark "Task" is retried on another executor, it fails because the target path already exists.

Expected behavior

HoodieSnapshotExporter should complete even when failed "Tasks" are retried.

ad1happy2go commented 1 month ago

@Gatsby-Lee The PR comment shows this was added so that users don't accidentally overwrite data. A failed task retry should not trip this check, but a Job retry should fail, which is the expected behaviour.

Gatsby-Lee commented 1 month ago

@ad1happy2go I noticed that the Task-level retry failed as well (not the Job level).

(I don't know much about how Spark works.) But "overwrite=false" basically makes the existing path (created by the terminated executors) cause the Job to fail. (I am on EMR on EKS with spot instances.)

      FileUtil.copy(
          executorSourceFs,
          sourceFilePath,
          executorOutputFs,
          new Path(toPartitionPath, sourceFilePath.getName()),
          false,   // deleteSource
          false,   // overwrite  <--- "this"
          executorOutputFs.getConf());
    }, parallelism);
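
The failure mode described above can be reproduced in miniature. The following is only a sketch, under loud assumptions: it uses `java.nio.file.Files.copy` as a stand-in for Hadoop's `FileUtil.copy`, the class `RetryCopySketch` and its methods are hypothetical, and the retry loop is a simplified imitation of Spark's task retry (4 attempts, matching the "failed 4 times" in the stack trace below). The first attempt writes the file and is then treated as lost, as if the executor had been terminated right after the write.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class RetryCopySketch {

  /** Copies src to dst; the overwrite flag plays the role of the flag discussed above. */
  static void copy(Path src, Path dst, boolean overwrite) throws IOException {
    if (overwrite) {
      Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
    } else {
      Files.copy(src, dst); // throws FileAlreadyExistsException if dst exists
    }
  }

  /**
   * Runs one simulated task for at most maxFailures attempts.
   * Attempt 0 writes the file and is then counted as lost (simulated executor termination).
   * Returns true if a later attempt succeeds, false if every attempt fails.
   */
  static boolean runTask(Path src, Path dst, boolean overwrite, int maxFailures) throws IOException {
    for (int attempt = 0; attempt < maxFailures; attempt++) {
      try {
        copy(src, dst, overwrite);
        if (attempt == 0) {
          continue; // file was written, but the attempt is reported as failed
        }
        return true;
      } catch (FileAlreadyExistsException e) {
        // the leftover file from the lost attempt makes this retry fail
      }
    }
    return false;
  }

  /** Self-contained demo on a temp directory; returns whether the simulated task succeeded. */
  static boolean demo(boolean overwrite) {
    try {
      Path dir = Files.createTempDirectory("retry-copy-sketch");
      Path src = Files.writeString(dir.resolve("source.parquet"), "data");
      return runTask(src, dir.resolve("target.parquet"), overwrite, 4);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("overwrite=false task succeeded: " + demo(false));
    System.out.println("overwrite=true  task succeeded: " + demo(true));
  }
}
```

In this sketch the retries can never succeed with overwrite disabled, because the leftover file is still there on every attempt. With overwrite enabled, the first retry simply replaces it.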
Gatsby-Lee commented 1 month ago

@ad1happy2go I tested with on-demand instances in EMR on EKS. In my case, when Tasks are not retried because of terminated k8s Pods (as happens with spot instances), there is no issue.

So I can say that overwrite=false introduced the behavior change.

ad1happy2go commented 1 month ago

@Gatsby-Lee Can you see two job attempts in your application?

ad1happy2go commented 1 month ago

@Gatsby-Lee Also, if possible, please paste the stack trace and the Spark UI.

Gatsby-Lee commented 1 month ago

@Gatsby-Lee Can you see two job attempts in your application?

Sorry, I don't follow. Do you mean two different jobs running simultaneously? No. At any given time there is only one Spark Job (one application).

Gatsby-Lee commented 1 month ago

@Gatsby-Lee Also, if possible, please paste the stack trace and the Spark UI.

Unfortunately, our internal logging doesn't guarantee log order on EMR on EKS. It is better than nothing, though, so here it is.

at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3067)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3003)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3002)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3002)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1318)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1318)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1318)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3271)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1041)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2406)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2427)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2446)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2471)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1028)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:1026)
    at org.apache.spark.api.java.JavaRDDLike.foreach(JavaRDDLike.scala:352)
    at org.apache.spark.api.java.JavaRDDLike.foreach$(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.foreach(HoodieSparkEngineContext.java:155)
    at org.apache.hudi.utilities.HoodieSnapshotExporter.exportAsHudi(HoodieSnapshotExporter.java:222)
    at org.apache.hudi.utilities.HoodieSnapshotExporter.export(HoodieSnapshotExporter.java:143)
    at org.apache.hudi.utilities.HoodieSnapshotExporter.main(HoodieSnapshotExporter.java:292)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.fs.PathExistsException: `s3://hell-world/20240717/dummy-db/dummy-dataset/span_source_service=model_serving/event_created_yearmonth=202307/ca48e606-9303-4157-bb95-6db62ad41193-0_0-132824-903108_20230729020005820.parquet': Target s3://hell-world/20240717/dummy-db/dummy-dataset/span_source_service=model_serving/event_created_yearmonth=202307/ca48e606-9303-4157-bb95-6db62ad41193-0_0-132824-903108_20230729020005820.parquet already exists
    at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:623)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:466)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:430)
    at org.apache.hudi.utilities.HoodieSnapshotExporter.lambda$exportAsHudi$43b4457d$1(HoodieSnapshotExporter.java:232)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1028)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1028)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2446)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:143)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

24/07/17 05:52:59 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 499 in stage 5.0 failed 4 times, most recent failure: Lost task 499.4 in stage 5.0 (TID 2569) (10.100.144.194 executor 3): org.apache.hadoop.fs.PathExistsException: `s3://hell-world/20240717/dummy-db/dummy-dataset/span_source_service=model_serving/event_created_yearmonth=202307/ca48e606-9303-4157-bb95-6db62ad41193-0_0-132824-903108_20230729020005820.parquet': Target s3://hell-world/20240717/dummy-db/dummy-dataset/span_source_service=model_serving/event_created_yearmonth=202307/ca48e606-9303-4157-bb95-6db62ad41193-0_0-132824-903108_20230729020005820.parquet already exists
    at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:623)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:466)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:430)
    at org.apache.hudi.utilities.HoodieSnapshotExporter.lambda$exportAsHudi$43b4457d$1(HoodieSnapshotExporter.java:232)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1028)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1028)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2446)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:143)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
ad1happy2go commented 1 month ago

@Gatsby-Lee I see from the stack trace that it fails on one particular file only, complaining that the file already exists. So it could have come from a task failure only. I don't think the "overwrite=false" from HUDI-712 is impacting this, though.

Gatsby-Lee commented 1 month ago

@Gatsby-Lee I see from the stack trace that it fails on one particular file only, complaining that the file already exists. So it could have come from a task failure only. I don't think the "overwrite=false" from HUDI-712 is impacting this, though.

So the Job failed because that particular file (or any file) already existed when the Task was retried. Is that right?

ad1happy2go commented 1 month ago

As per my understanding, the "overwrite=false" flag checks the base path when the job starts.

Let me tag @xushiyan, the author of the PR, to confirm.

xushiyan commented 1 month ago

@Gatsby-Lee Good catch. It's actually not desirable to use overwrite=false here, as it's applied to each individual task. We should change it to allow overwrite, and add a check on the whole directory to ensure it's empty before the Spark job starts the parallel copying. Feel free to submit a PR to fix the behavior.

Gatsby-Lee commented 1 month ago

@xushiyan Actually, logic that checks whether the target path exists is already in place.

That check, shown below, already prevents overwriting an existing target path. So I think changing the copy to "overwrite=true" might be enough. What do you think?

  public void export(JavaSparkContext jsc, Config cfg) throws IOException {
    FileSystem outputFs = HadoopFSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
    if (outputFs.exists(new Path(cfg.targetOutputPath))) {
      throw new HoodieSnapshotExporterException("The target output path already exists.");
    }
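
The combination proposed here (one job-level existence check up front, then per-file copies that allow overwrite) can be sketched as follows. This is an illustration under assumptions, not the code from the PR: it uses `java.nio.file` instead of the Hadoop `FileSystem` API, and `GuardedExportSketch`, `checkTargetAbsent`, `copyFile` and `demo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

class GuardedExportSketch {

  /** Job-level guard: runs once, before any copying starts. */
  static void checkTargetAbsent(Path targetDir) {
    if (Files.exists(targetDir)) {
      throw new IllegalStateException("The target output path already exists.");
    }
  }

  /** Per-file copy that is safe to repeat, so a retried attempt does not fail. */
  static void copyFile(Path src, Path targetDir) {
    try {
      Files.createDirectories(targetDir); // no-op if the directory is already there
      Files.copy(src, targetDir.resolve(src.getFileName()), StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Guard first, then copy every file with overwrite allowed. */
  static void export(List<Path> sources, Path targetDir) {
    checkTargetAbsent(targetDir);
    for (Path src : sources) {
      copyFile(src, targetDir);
    }
  }

  /**
   * Demo: returns true if a repeated per-file copy is tolerated
   * while a second export into the same target is rejected.
   */
  static boolean demo() {
    try {
      Path work = Files.createTempDirectory("guarded-export-sketch");
      Path src = Files.writeString(work.resolve("part-0.parquet"), "data");
      Path target = work.resolve("out");
      export(List.of(src), target);
      copyFile(src, target); // simulated retried task attempt: tolerated
      try {
        export(List.of(src), target); // simulated second job run: rejected
        return false;
      } catch (IllegalStateException expected) {
        return true;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("guard rejects second export, retry tolerated: " + demo());
  }
}
```

The point of the split is that protection against accidental overwrite lives at the job level, where it is evaluated once, while the per-file operation stays idempotent so that task retries are harmless.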
xushiyan commented 1 month ago

@Gatsby-Lee yup sg

Gatsby-Lee commented 1 month ago

@xushiyan @ad1happy2go Thank you. I made a PR ( https://github.com/apache/hudi/pull/11668/files ). It would be great if this PR could be included in 0.15.1.