@Armelabdelkbir This normally happens when the table gets into a bad state.
In your timeline I can see there were two rollbacks, and one of them kept trying to roll back a commit that had already been rolled back by the other rollback instant. This normally happens when multiple writers run in parallel without any concurrency control. The issue can be solved by manually removing the pending rollback instant and then resuming the jobs.
@ad1happy2go Thanks for replying. Should I remove the last rollback inflight, 20230716231754202.rollback.inflight, or the previous one, 20230716224727964.rollback.inflight / 20230716224727964.rollback?
@Armelabdelkbir You should remove the last one, i.e. the pending rollback instant: 20230716231754202.rollback.inflight and 20230716231754202.rollback.requested.
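For reference, a minimal sketch of that cleanup using the Hadoop FileSystem API; the base path /data/hudi/my_table is hypothetical, and it's worth backing up the .hoodie folder before touching timeline files:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical base path; back up .hoodie before deleting timeline files.
val timeline = "/data/hudi/my_table/.hoodie"
val fs = FileSystem.get(new Configuration())

// Remove only the pending rollback instant files named above.
Seq(s"$timeline/20230716231754202.rollback.inflight",
    s"$timeline/20230716231754202.rollback.requested")
  .foreach(f => fs.delete(new Path(f), false)) // non-recursive delete
```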
@ad1happy2go Alright, thanks. I also cleared the delta_commit for the corresponding ts_ms, and did the same for another path with the same issue. Is there a way to roll back automatically? I have 30+ jobs, and cleaning failed jobs manually each time is not a production-grade approach.
@Armelabdelkbir I don't think there is any other automated way currently.
@Armelabdelkbir I suggest using concurrency control, which will avoid many such issues.
@ad1happy2go I added some configuration to handle multiple writers. I have multiple micro-batches with Spark structured streaming, and I hit this issue only when my job crashes or restarts:
"hoodie.write.concurrency.mode"->"optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes" -> "LAZY",
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> "org.apache.hudi.hive.HiveMetastoreBasedLockProvider",
HoodieLockConfig.HIVE_TABLE_NAME.key -> (table.db_name + "." + table.table_name),
HoodieLockConfig.HIVE_DATABASE_NAME.key -> table.db_name
Now I see a new folder under .hoodie called .heartbeat (what's the purpose of this folder?).
I also fixed kafka.group.id and reduced kafka.session.timeout.ms to 10000 (see the sketch below).
I'll deploy my new release and wait to see whether the issue reproduces.
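For context, a hedged sketch of how those Kafka settings are passed to the Spark structured streaming source; broker addresses, topic, and group id are illustrative, and `spark` is assumed to be the active SparkSession:

```scala
// kafka.-prefixed options are forwarded to the underlying Kafka consumer
// (setting kafka.group.id this way is supported since Spark 3.0).
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") // illustrative brokers
  .option("subscribe", "dbserver1.inventory.customers")           // hypothetical Debezium topic
  .option("kafka.group.id", "hudi-ingest-customers")              // the fixed group id
  .option("kafka.session.timeout.ms", "10000")                    // the reduced timeout
  .load()
```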
With this configuration I sometimes got errors:
```
23/07/26 07:48:10 ERROR streaming.MicroBatchExecution: Query [id = ade16080-efa5-42d5-8432-b01c3216a566, runId = 197a2b4e-5176-48da-b6f1-b40ad4620fd8] terminated with error
java.lang.IllegalArgumentException: ALREADY_ACQUIRED
at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
at org.apache.hudi.hive.HiveMetastoreBasedLockProvider.acquireLock(HiveMetastoreBasedLockProvider.java:136)
at org.apache.hudi.hive.HiveMetastoreBasedLockProvider.tryLock(HiveMetastoreBasedLockProvider.java:112)
at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:67)
at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:53)
at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1459)
at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1496)
at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:154)
at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:213)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:304)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at com.ovh.ingest.Ingest$.test$1(Ingest.scala:132)
at com.ovh.ingest.Ingest$.$anonfun$new$8(Ingest.scala:113)
at com.ovh.ingest.Ingest$.$anonfun$new$8$adapted(Ingest.scala:113)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
```
As discussed in office hours, @Armelabdelkbir will try the Zookeeper lock and see if he hits a similar error.
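The ALREADY_ACQUIRED above comes from a checkArgument inside HiveMetastoreBasedLockProvider.acquireLock, which suggests the provider instance still held (or believed it held) a lock from a previous attempt, so trying a different provider is a reasonable experiment. A minimal sketch of the ZookeeperBasedLockProvider settings, with an illustrative quorum; the base path and lock key mirror the error log further down:

```scala
import org.apache.hudi.config.HoodieLockConfig

// Illustrative Zookeeper quorum; base path and lock key as in the log below.
val zkLockOpts = Map(
  HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key ->
    "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
  HoodieLockConfig.ZK_CONNECT_URL.key -> "zk1:2181,zk2:2181,zk3:2181",
  HoodieLockConfig.ZK_BASE_PATH.key   -> "/hudi_multiwriters",
  HoodieLockConfig.ZK_LOCK_KEY.key    -> "database_table"
)
```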
@ad1happy2go After switching to ZookeeperBasedLockProvider, my jobs ran for a few days, then I saw some failed jobs with this error:
```
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
... 1 more
Caused by: org.apache.hudi.exception.HoodieLockException: FAILED_TO_ACQUIRE lock atZkBasePath = /hudi_multiwriters, lock key = database_table
at org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.tryLock(ZookeeperBasedLockProvider.java:101)
at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:67)
... 56 more
```
With the Zookeeper lock my jobs run longer than with the Hive lock. I will add some retry configs and test again:
```scala
HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key -> "15",
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key -> "15",
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key -> "60000",
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key -> "60000",
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key -> "20000",
```
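If I read these values correctly, each write now retries lock acquisition up to 15 times at the provider level (20 s between attempts, capped at 60 s) and up to 15 times at the client level (60 s between client retries), so a contended lock has on the order of 15 × 60 s ≈ 15 minutes to clear before the job fails.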
@Armelabdelkbir Were you able to try with retry configurations? Are you still facing this issue?
@ad1happy2go It has been working well for me these last months. Sorry for the late response, and thanks for helping.
@Armelabdelkbir Closing this issue then. Thanks a lot.
Describe the problem you faced
Hello, I'm using Hudi in production as part of my CDC pipeline (Debezium, Kafka, Spark). I have multiple jobs, but some jobs failed with "failed to rollback ..." errors.
I don't have multiple writers at a time, just one Spark structured streaming job per stream (database). I've hit these errors a few times in the past; I cleaned everything up and relaunched, but that is not a production approach for my largest database. Error message:
To Reproduce
Relaunch my production Spark job.
Expected behavior
Roll back the last delta_commit.
Environment Description
Hudi version : 0.11.0
Spark version : 3.1.3
Hive version : 1.2.1000
Hadoop version : 2.7.3
Storage (HDFS/S3/GCS..) : HDFS
Running on Docker? (yes/no) : no
Hoodie timeline
Hudi conf
Stacktrace