apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Failed to rollback s3://s3_bucket/xml commits 20221231041647333 #7590

Open koochiswathiTR opened 1 year ago

koochiswathiTR commented 1 year ago


We are facing an exception in our EMR Spark Streaming application. We consume messages from a Kinesis stream, process the data, and store it in Hudi tables.
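
The pipeline has roughly the following shape (a minimal sketch with illustrative names; the real upsert logic, with retry, lives in com.tr.indigo.tacticalnovusingest.utils.NovusHudiOperations):

import org.apache.spark.sql.DataFrame

// Minimal sketch (illustrative names): one Hudi upsert per micro-batch.
// `kinesisStream` stands in for the streaming DataFrame read from Kinesis.
def startPipeline(kinesisStream: DataFrame): Unit = {
  val upsertBatch: (DataFrame, Long) => Unit = (batch, _) => upsertToHudi(batch)
  kinesisStream.writeStream
    .foreachBatch(upsertBatch) // each micro-batch produces one Hudi commit
    .start()
    .awaitTermination()
}

// hypothetical stand-in; the real upsert lives in NovusHudiOperations
def upsertToHudi(batch: DataFrame): Unit = ()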

The write fails with: Failed to rollback s3://a206760-novusnorm-s3-qa-use1/novusnorm_xml commits 20221231041647333 (full trace in the Stacktrace section below).

[org.apache.hudi.timeline.service.TimelineService] [TimelineService]: Timeline server start failed on port 0. Attempting port 0 + 1.
java.lang.RuntimeException: java.io.IOException: Too many open files
    at io.javalin.Javalin.start(Javalin.java:189)
    at io.javalin.Javalin.start(Javalin.java:151)
    ...
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.io.IOException: Too many open files

We removed the commit file named in the error, and since then the application has been behaving well. Can you please help us understand why this exception occurred?


Stacktrace

Failed to rollback s3://a206760-novusnorm-s3-qa-use1/novusnorm_xml commits 20221231041647333
    at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:783)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1193)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1176)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1164)
    at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:964)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:151)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:963)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:956)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:303)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:165)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at com.tr.indigo.tacticalnovusingest.utils.NovusHudiOperations$.$anonfun$upsert$2(NovusHudiOperations.scala:326)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at com.tr.indigo.tacticalnovusingest.utils.NovusHudiOperations$.retry(NovusHudiOperations.scala:416)

Please help us on this.

yihua commented 1 year ago

Hi @koochiswathiTR Thanks for raising the issue. Could you share the Hudi write configs for this job? It looks like the timeline server failed to start because the underlying Javalin server hit java.lang.RuntimeException: java.io.IOException: Too many open files at io.javalin.Javalin.start(Javalin.java:189). This might be a Javalin issue where the server cannot be started on a port. Retrying the same job should get around this.
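
If the root cause is file-descriptor exhaustion on the driver, two possible mitigations (a sketch, not a confirmed fix for this thread) are raising the OS limit (e.g. ulimit -n on the EMR nodes) or disabling Hudi's embedded timeline server via the standard hoodie.embed.timeline.server config, so that Javalin is never started:

// Workaround sketch (assumption: the driver has exhausted its file
// descriptors): skip starting the embedded timeline server so Javalin is
// never launched. This is a standard Hudi config key, but turning the
// server off is a stop-gap, not a root-cause fix.
val noTimelineServer: (String, String) = "hoodie.embed.timeline.server" -> "false"

// `hudiWriteOpts` is a hypothetical name for the job's existing options map
val hudiWriteOpts: Map[String, String] = Map.empty // placeholder
val optsWithoutTimelineServer = hudiWriteOpts + noTimelineServer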

Removing the commit file named in the error may, in this case, lead to uncommitted data being served by queries, since such data has to be rolled back first.
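
For reference, a sketch of the standard config that governs this cleanup, so instant files need not be deleted by hand:

// Standard Hudi config (sketch): let Hudi roll back failed writes itself.
// EAGER (the default) rolls back pending instants from failed writes when
// the next commit starts; LAZY defers the rollback to the cleaner (intended
// for multi-writer setups).
val failedWritesPolicy: (String, String) =
  "hoodie.cleaner.policy.failed.writes" -> "EAGER"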

koochiswathiTR commented 1 year ago

@yihua Please find our Hudi write configs (a usage sketch follows the list):

DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.INSERT_DROP_DUPS.key() -> "true", // deduplicate
DataSourceWriteOptions.SQL_INSERT_MODE.key() -> "strict", // deduplicate
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "guid",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "collectionName",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "operationTime",
HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key() -> CompactionTriggerStrategy.TIME_ELAPSED.name,
HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key() -> String.valueOf(60 * 60),
HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key() -> "1248",
HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key() -> "1249",
HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key() -> "1300",
HoodieCompactionConfig.ASYNC_CLEAN.key() -> "false", 
HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", 
HoodieMetricsConfig.TURN_METRICS_ON.key() -> "true",
HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() -> MetricsReporterType.DATADOG.name(),
HoodieMetricsDatadogConfig.API_SITE_VALUE.key() -> "US",
HoodieMetricsDatadogConfig.METRIC_PREFIX_VALUE.key() -> "XXX.hudi",
HoodieMetricsDatadogConfig.API_KEY_SUPPLIER.key() -> "XXX",
HoodieMetadataConfig.ENABLE.key() -> "false",
HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false",
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "true",
HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() -> "true"
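
For completeness, a minimal sketch (with assumed names) of how this map is passed to the Hudi datasource; the base path comes from the stacktrace, and the table name is a guess:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Usage sketch with assumed names: `hudiWriteOpts` is a Map built from the
// entries listed above.
def upsert(df: DataFrame, hudiWriteOpts: Map[String, String]): Unit = {
  df.write
    .format("hudi")
    .options(hudiWriteOpts)
    .option("hoodie.table.name", "novusnorm_xml") // assumed table name
    .mode(SaveMode.Append)
    .save("s3://a206760-novusnorm-s3-qa-use1/novusnorm_xml")
}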
maikouliujian commented 1 year ago

I am also facing this issue.

sunneebaby commented 1 year ago

I am also facing this issue.