apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes #7653

Open maikouliujian opened 1 year ago

maikouliujian commented 1 year ago

Describe the problem you faced

When I write to a Hudi COW table on AWS S3 concurrently through the Spark API, the following exception occurs: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes. I use Hudi's ZookeeperBasedLockProvider with optimistic concurrency control (OCC).

To Reproduce

Steps to reproduce the behavior:

1. Run Hudi 0.11.0 and write concurrently to AWS S3.
2. Use the configuration below.

resultDF.write.format("hudi")
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "uq_id,_track_id,event,_flush_time")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "process_time")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "p_day,p_hour,p_region,p_type")
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getName)
  .option(HoodieWriteConfig.TBL_NAME.key(), sinkHudiTable)
  .option(DataSourceWriteOptions.HIVE_URL.key(), "")
  .option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default")
  .option(DataSourceWriteOptions.HIVE_TABLE.key(), sinkHudiTable)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "p_day,p_hour,p_region,p_type")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName)
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true")
  .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true")
  .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  .option(HoodieCompactionConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name)
  .option(HoodieCompactionConfig.ASYNC_CLEAN.key(), "true")
  .option(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "240")
  .option(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "250")
  .option(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "260")
  .option(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name())
  .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", sinkHudiTable)
  .option("hoodie.write.lock.zookeeper.base_path", "/hudi_multiwriter")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .mode(SaveMode.Append)
  .save(s3outPath)

Expected behavior

Exception in thread "main" org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes


Environment Description

Additional context

The AWS EMR version is 6.7.0.

Stacktrace

Exception in thread "main" org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
  at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
  at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85)
  at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
  at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
  at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
  at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
  at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79)
  at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:475)
  at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:233)
  at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:678)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:313)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:165)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  at EtlHudi2hudi.writeData(EtlHudi2hudi.scala:113)
  at EtlHudi2hudi.run(EtlHudi2hudi.scala:53)
  at EtlHudi2hudi$.main(EtlHudi2hudi.scala:32)
  at EtlHudi2hudi.main(EtlHudi2hudi.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
  ... 64 more

danny0405 commented 1 year ago

Can your job recover automatically from the failure?

maikouliujian commented 1 year ago

Can your job recover automatically from the failure?

In my case, the exception is not the root exception, so when it happens my job keeps running and I cannot catch the job failure. Do you have any other suggestions? Thanks very much.

fengjian428 commented 1 year ago

This is the expected behavior when multiple writers write records into the same file group. What behavior do you want? @maikouliujian
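To illustrate the point about writers touching the same file group, here is a simplified sketch of an overlap check under optimistic concurrency control. It is not Hudi's implementation: `PendingCommit`, `WriteConflict`, and `check_before_commit` are hypothetical names, and a file group is modeled as a (partition path, file group id) pair purely for illustration.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Tuple

# In this sketch a file group is identified by (partition path, file group id).
FileGroup = Tuple[str, str]


class WriteConflict(Exception):
    """Hypothetical error raised when two commits touched the same file group."""


@dataclass(frozen=True)
class PendingCommit:
    """Hypothetical record of which file groups one commit wrote to."""
    instant_time: str
    touched: FrozenSet[FileGroup]


def overlapping_file_groups(mine: PendingCommit, other: PendingCommit) -> FrozenSet[FileGroup]:
    """Return the file groups written by both commits."""
    return mine.touched & other.touched


def check_before_commit(mine: PendingCommit, completed_since_start: Iterable[PendingCommit]) -> None:
    """Raise WriteConflict if any commit that finished meanwhile overlaps with ours."""
    for other in completed_since_start:
        overlap = overlapping_file_groups(mine, other)
        if overlap:
            raise WriteConflict(
                f"commit {mine.instant_time} overlaps commit {other.instant_time} "
                f"on {sorted(overlap)}"
            )
```

Two writers whose records land in disjoint file groups pass this check; two writers that update the same file group do not, which is the situation described above.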

maikouliujian commented 1 year ago

This is the expected behavior when multiple writers write records into the same file group. What behavior do you want? @maikouliujian

In my case, when this exception happens, my job does not fail but keeps running and never finishes. How can I know whether the job ran correctly?

nsivabalan commented 1 year ago

How are you writing to Hudi? Can you give us a reproducible script? Is it spark-datasource, Spark streaming, deltastreamer, or spark-sql? At least from spark-shell, when you use the spark-ds writer, we know the command fails on conflicts.

nsivabalan commented 1 year ago

Looks like it's a Spark datasource write. How do you claim that the job does not fail? Are you executing it from spark-shell, and is the command to write to Hudi just stuck?

nsivabalan commented 1 year ago

I tried multi-writers from two diff spark-shells, and one of them fails while writing to hudi.


scala> df2.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   option("hoodie.write.concurrency.mode","optimistic_concurrency_control").
     |   option("hoodie.cleaner.policy.failed.writes","LAZY").
     |   option("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
     |   option("hoodie.write.lock.zookeeper.url","localhost:2181").
     |   option("hoodie.write.lock.zookeeper.port","2181").
     |   option("hoodie.write.lock.zookeeper.lock_key","locks").
     |   option("hoodie.write.lock.zookeeper.base_path","/tmp/locks/.lock").
     |   mode(Append).
     |   save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
[Stage 14:>                                                         (0 + 3) / 3]# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
23/01/23 10:00:20 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
  at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
  at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85)
  at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
  at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
  at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
  at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
  at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79)
  at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:491)
  at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:234)
  at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:126)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:698)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:343)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
  ... 75 elided
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
  ... 109 more

scala> 

The write to Hudi fails and the next command prompt is shown.

Excerpt from my other shell, which succeeded:

scala> df2.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   option("hoodie.write.concurrency.mode","optimistic_concurrency_control").
     |   option("hoodie.cleaner.policy.failed.writes","LAZY").
     |   option("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
     |   option("hoodie.write.lock.zookeeper.url","localhost:2181").
     |   option("hoodie.write.lock.zookeeper.port","2181").
     |   option("hoodie.write.lock.zookeeper.lock_key","locks").
     |   option("hoodie.write.lock.zookeeper.base_path","/tmp/locks/.lock").
     |   mode(Append).
     |   save(basePath)
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
23/01/23 10:00:19 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties

scala> 

If you can provide us with a reproducible script, that would be nice. As of now, it's not reproducible from our end.
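Since the run above shows one of two concurrent writers failing with HoodieWriteConflictException, a caller may want to retry the failed write. The following is a minimal sketch of such a retry wrapper, not part of Hudi. It assumes the Java exception class name appears somewhere in the Python exception message or its cause chain (as is typically the case when PySpark surfaces a JVM error); `retry_on_conflict`, `is_write_conflict`, and all parameters are hypothetical names.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

# Assumption: the JVM exception class name shows up in the Python-side message.
CONFLICT_MARKER = "HoodieWriteConflictException"


def is_write_conflict(exc: BaseException) -> bool:
    """Walk the exception chain and look for the conflict marker."""
    seen = set()
    cur: Optional[BaseException] = exc
    while cur is not None and id(cur) not in seen:
        seen.add(id(cur))
        if CONFLICT_MARKER in f"{type(cur).__name__}: {cur}":
            return True
        cur = cur.__cause__ if cur.__cause__ is not None else cur.__context__
    return False


def retry_on_conflict(
    write: Callable[[], T],
    max_attempts: int = 3,
    base_delay_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call write(); on a write conflict, back off and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return write()
        except Exception as exc:
            # Re-raise anything that is not a conflict, or the last failed attempt.
            if not is_write_conflict(exc) or attempt == max_attempts:
                raise
            # Exponential backoff with jitter so two writers do not retry in lockstep.
            sleep(base_delay_s * 2 ** (attempt - 1) + random.uniform(0, base_delay_s))
    raise ValueError("max_attempts must be at least 1")
```

In a PySpark job, `write` could be a zero-argument function that wraps a call such as `df.write.format("hudi").options(**opts).mode("append").save(path)`. Whether replaying the same batch is safe depends on the operation; the sketch assumes an idempotent upsert.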

nsivabalan commented 1 year ago

@maikouliujian : any updates please.

maikouliujian commented 1 year ago

How are you writing to Hudi? Can you give us a reproducible script? Is it spark-datasource, Spark streaming, deltastreamer, or spark-sql? At least from spark-shell, when you use the spark-ds writer, we know the command fails on conflicts.

In my case, I run my job with the spark-ds writer.

maikouliujian commented 1 year ago

Looks like it's a Spark datasource write. How do you claim that the job does not fail? Are you executing it from spark-shell, and is the command to write to Hudi just stuck?

In my case, the job is scheduled hourly by Azkaban. When multiple jobs run at once, the Azkaban job does not fail but keeps running. In the detailed log I can see that the exception happens. Why does my Azkaban job not fail?
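The thread does not establish why the scheduled job keeps running. One possibility, stated here only as an assumption, is that the driver process stays alive after the exception, so the scheduler never sees an exit code. A defensive pattern is to wrap the write so that cleanup always runs and the process exits non-zero on failure. The sketch below uses hypothetical names (`run_job`, `write`, `stop`) and plain callables in place of the Spark calls.

```python
import traceback
from typing import Callable


def run_job(write: Callable[[], None], stop: Callable[[], None]) -> int:
    """Run write(), always call stop(), and return a process exit code.

    Returns 0 when write() succeeds and 1 when it raises.
    """
    try:
        write()
        return 0
    except Exception:
        # Log the full trace so the scheduler's job log shows the cause.
        traceback.print_exc()
        return 1
    finally:
        # Release resources (for Spark this would be spark.stop()) in every case.
        stop()


# Hypothetical usage in a job entry point:
#   import sys
#   sys.exit(run_job(write=lambda: do_hudi_write(), stop=spark.stop))
```

With an explicit exit code, a scheduler such as Azkaban can mark the run as failed instead of waiting on a process that never terminates.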

maikouliujian commented 1 year ago

I tried multi-writers from two diff spark-shells, and one of them fails while writing to hudi.


If you can provide us with a reproducible script, that would be nice. As of now, it's not reproducible from our end.

Does Hudi 0.11.0 not support multiple writers with the Spark datasource writer?

tomyanth commented 1 year ago

I have the same issue when running multiple writers from a local console.

tomyanth commented 1 year ago

"""
Install https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop2.tgz

hadoop2.7 https://github.com/soumilshah1995/winutils/blob/master/hadoop-2.7.7/bin/winutils.exe

pyspark --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

VAR SPARK_HOME HADOOP_HOME

PATH %HADOOP_HOME%\bin %SPARK_HOME%\bin

Complete Tutorials on HUDI https://github.com/soumilshah1995/Insert-Update-Read-Write-SnapShot-Time-Travel-incremental-Query-on-APache-Hudi-transacti/blob/main/hudi%20(1).ipynb
"""

import os
import sys
import uuid

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import col, asc, desc
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
from pyspark.sql.functions import *
from pyspark.sql.types import *
from functools import reduce
from faker import Faker

import findspark

import datetime

time = datetime.datetime.now()
time = time.strftime("YMD%Y%m%dHHMMSSms%H%M%S%f")

SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

findspark.init()

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.warehouse.dir', 'file:///C:/tmp/spark_warehouse') \
    .getOrCreate()

faker = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data():
        # Five fake employee rows: id, name, department, state, salary, age, bonus, ts
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                faker.random_int(min=10000, max=150000),
                faker.random_int(min=18, max=60),
                faker.random_int(min=0, max=100000),
                faker.unix_time()
            ) for x in range(5)
        ]


data = DataGenerator.get_data()

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
spark_df = spark.createDataFrame(data=data, schema=columns)
print(spark_df.show())

db_name = "hudidb"
table_name = "hudi_table"
recordkey = 'emp_id'
precombine = 'ts'

path = "file:///C:/tmp/spark_warehouse"
method = 'upsert'
table_type = "COPY_ON_WRITE"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.schema.on.read.enable': 'true',  # for changing column names

    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',  # added for zookeeper to deal with multiple source writes
    'hoodie.cleaner.policy.failed.writes': 'LAZY',
    # 'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',

    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
    'hoodie.write.lock.zookeeper.url': 'localhost',
    'hoodie.write.lock.zookeeper.port': '2181',
    'hoodie.write.lock.zookeeper.lock_key': 'my_lock',
    'hoodie.write.lock.zookeeper.base_path': '/hudi_locks',
}

print("*" * 55)
print("over-write")
print("*" * 55)

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(path)

print("*" * 55)
print("READ")
print("*" * 55)
read_df = spark.read. \
    format("hudi"). \
    load(path)
print(read_df.show())

impleDataUpd = [
    (6, "This is APPEND4", "Sales", "RJ", 81000, 30, 23000, 827307999),
    (7, "This is APPEND4", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
]

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)

usr_up_df.write.format("hudi").options(**hudi_options).mode("append").save(path)

print("*" * 55)
print("READ")
print("*" * 55)
read_df = spark.read. \
    format("hudi"). \
    load(path)
print(read_df.show())

tomyanth commented 1 year ago

When I run the above code in 2 separate notebooks to simulate the multi-writer process, the error occurs, just like what @maikouliujian faced.
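When two writers are launched from separate notebooks, a quick sanity check is that both use identical, non-empty lock settings, since writers can only coordinate if they contend on the same lock. The sketch below compares the lock-related keys that appear in this thread; `LOCK_KEYS` and `lock_setting_problems` are hypothetical helper names, not part of Hudi.

```python
from typing import Dict, List

# Lock-related option keys taken from the configurations posted in this thread.
LOCK_KEYS = (
    "hoodie.write.concurrency.mode",
    "hoodie.cleaner.policy.failed.writes",
    "hoodie.write.lock.provider",
    "hoodie.write.lock.zookeeper.url",
    "hoodie.write.lock.zookeeper.port",
    "hoodie.write.lock.zookeeper.lock_key",
    "hoodie.write.lock.zookeeper.base_path",
)


def lock_setting_problems(writer_a: Dict[str, object], writer_b: Dict[str, object]) -> List[str]:
    """Return human-readable problems; an empty list means the settings agree."""
    problems = []
    for key in LOCK_KEYS:
        a, b = writer_a.get(key), writer_b.get(key)
        if not a or not b:
            problems.append(f"{key}: missing or empty in at least one writer")
        elif str(a) != str(b):
            problems.append(f"{key}: {a!r} != {b!r}")
    return problems
```

Calling `lock_setting_problems(hudi_options, other_notebook_options)` before writing returns an empty list when both option dictionaries agree on every lock key. Matching settings do not prevent conflicts on the same file group; they only confirm that both writers share one lock.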

ad1happy2go commented 1 year ago

@tomyanth I tried running with version 0.13.0 and didn't have any issues such as the spark-shell job getting stuck. Can you try with 0.13.0 and check whether you still face the issue?

soumilshah1995 commented 1 year ago

Try using DynamoDB as the lock table.

soumilshah1995 commented 1 year ago

import boto3

DYNAMODB_LOCK_TABLE_NAME = 'hudi-lock-table'
curr_session = boto3.session.Session()
curr_region = curr_session.region_name

def upsert_hudi_table(glue_database, table_name,
                      record_id, precomb_key, table_type, spark_df,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_dynamodb_lock,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        enable_dynamodb_lock (bool): Whether or not to apply the DynamoDB-based lock settings.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        index_type (str): The Hudi index type (e.g., BLOOM or GLOBAL_BLOOM).
        method (str): The Hudi write method to use (default is 'upsert').
    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": args['PARTITON_FIELDS'],
        "hoodie.datasource.hive_sync.partition_fields": args['PARTITON_FIELDS'],
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    hudi_dynamo_db_based_lock = {
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control'
        , 'hoodie.cleaner.policy.failed.writes': 'LAZY'
        , 'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider'
        , 'hoodie.write.lock.dynamodb.table': DYNAMODB_LOCK_TABLE_NAME
        , 'hoodie.write.lock.dynamodb.partition_key': 'tablename'
        , 'hoodie.write.lock.dynamodb.region': '{0}'.format(curr_region)
        , 'hoodie.write.lock.dynamodb.endpoint_url': 'dynamodb.{0}.amazonaws.com'.format(curr_region)
        , 'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST'

    }

    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    # Add the Hudi index settings to the final settings dictionary
    for key, value in hudi_index_settings.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    for key, value in hudi_file_size.items():
        hudi_final_settings[key] = value

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition == "True" or enable_partition == "true" or enable_partition == True:
        for key, value in partition_settings.items(): hudi_final_settings[key] = value

    # if DynamoDB based lock enabled use dynamodb as lock table
    if enable_dynamodb_lock == "True" or enable_dynamodb_lock == "true" or enable_dynamodb_lock == True:
        for key, value in hudi_dynamo_db_based_lock.items(): hudi_final_settings[key] = value

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner == "True" or enable_cleaner == "true" or enable_cleaner == True:
        for key, value in hudi_cleaner_options.items(): hudi_final_settings[key] = value

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync == "True" or enable_hive_sync == "true" or enable_hive_sync == True:
        for key, value in hudi_hive_sync_settings.items(): hudi_final_settings[key] = value

    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:

        if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        # Replace null values with the default value 'n/a'
        # (na.fill with a string value only affects string-typed columns)
        default_value = 'n/a'
        spark_df = spark_df.na.fill(default_value)

        print("**************************************************************")
        spark_df.show()  # show() prints the rows itself and returns None
        print("**************************************************************")

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)
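
The repeated `== "True" or == "true" or == True` checks and the merge loops in the function above could be condensed. A minimal sketch, assuming two hypothetical helpers, `is_enabled` and `merge_settings` (neither is part of Hudi or of the original script); the table name and settings below are placeholder values:

```python
def is_enabled(flag):
    """Return True for True, "True" or "true" (case-insensitive); hypothetical helper."""
    if isinstance(flag, bool):
        return flag
    return str(flag).strip().lower() == "true"


def merge_settings(base, optional_groups):
    """Merge each (flag, settings) group into a copy of base when its flag is enabled."""
    merged = dict(base)
    for flag, settings in optional_groups:
        if is_enabled(flag):
            merged.update(settings)
    return merged


# Placeholder settings, only to show the merge behaviour.
base = {"hoodie.table.name": "demo_table"}
lock = {"hoodie.write.concurrency.mode": "optimistic_concurrency_control"}
cleaner = {"hoodie.clean.automatic": "true"}

final_settings = merge_settings(base, [("true", lock), (False, cleaner)])
print(final_settings)
```

Because `merge_settings` works on a copy, the base dictionary is left untouched, which makes it easier to reuse the same base settings for several tables.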

You can use DynamoDB lock tables:

DYNAMODB_LOCK_TABLE_NAME = 'hudi-lock-table'
curr_session = boto3.session.Session()
curr_region = curr_session.region_name

hudi_dynamo_db_based_lock = {
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control'
    , 'hoodie.cleaner.policy.failed.writes': 'LAZY'
    , 'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider'
    , 'hoodie.write.lock.dynamodb.table': DYNAMODB_LOCK_TABLE_NAME
    , 'hoodie.write.lock.dynamodb.partition_key': 'tablename'
    , 'hoodie.write.lock.dynamodb.region': '{0}'.format(curr_region)
    , 'hoodie.write.lock.dynamodb.endpoint_url': 'dynamodb.{0}.amazonaws.com'.format(curr_region)
    , 'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST'
}

Jason-liujc commented 1 year ago

We are encountering the same issue. After using DynamoDB as the lock table, we still see this error: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes

What I observed:

  1. I have 4 EMR Spark clusters that write to the same table. One by one, they fail with the above error. When I look at the DynamoDB lock history, I see locks constantly being created and released.
  2. The DynamoDB lock is at the table level, not the file level. So two Hudi jobs might try to write to the same files, and one of them fails. It seems that if a couple of concurrent jobs are writing to the same files at the same time, they go into some sort of failure storm, which might fail everything unless you set a really high retry threshold.
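
The second observation can be illustrated with a toy model. This is a minimal sketch of optimistic concurrency control in general, not Hudi's actual implementation: the lock only serializes the commit step, and a writer still fails if a commit that completed after it started touched any of the same files. All names here (`Timeline`, `try_commit`, the file IDs) are hypothetical.

```python
import threading


class ConflictError(Exception):
    """Raised when a writer's files overlap with a commit made after it started."""


class Timeline:
    """Toy commit log: each entry is the set of file IDs one commit touched."""

    def __init__(self):
        self._lock = threading.Lock()  # stands in for the table-level lock
        self._commits = []

    def begin(self):
        # A writer remembers how many commits existed when it started.
        with self._lock:
            return len(self._commits)

    def try_commit(self, started_at, touched_files):
        # The lock only guards this check-and-append step, not the write itself.
        with self._lock:
            for other in self._commits[started_at:]:
                overlap = other & touched_files
                if overlap:
                    raise ConflictError(f"overlapping files: {sorted(overlap)}")
            self._commits.append(set(touched_files))


timeline = Timeline()
writer_a = timeline.begin()
writer_b = timeline.begin()
writer_c = timeline.begin()

timeline.try_commit(writer_a, {"file-1", "file-2"})  # succeeds
timeline.try_commit(writer_c, {"file-9"})            # succeeds: no overlap
try:
    timeline.try_commit(writer_b, {"file-2", "file-3"})  # overlaps with writer A
    outcome = "committed"
except ConflictError as err:
    outcome = f"conflict: {err}"
print(outcome)
```

In this model the lock is acquired and released cleanly every time, which matches a lock history full of create/release entries even while writers keep failing: the failure comes from the overlap check, not from the lock.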
SamarthRaval commented 11 months ago

Hello @Jason-liujc @maikouliujian

I am seeing the exact same error, and I am also using the DynamoDB lock the same way as in the last comment. Were you able to figure out a workaround for it?

Or anything to fix this issue? I'm facing something very similar.

Jason-liujc commented 11 months ago

The main thing we did was to change our Hudi table structure to avoid concurrent writes to the same partition as much as possible (batching workloads together, sequencing the jobs, etc.).

For us, the DynamoDB lock provider wasn't able to do any write retries, so it just fails the Spark job. We increased the YARN and Spark retry settings so the job is retried automatically from the cluster side.
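
One way to picture the restructuring described above is to group pending jobs so that jobs touching a common partition run one after another, while separate groups may run in parallel. A minimal sketch, assuming each job can declare up front which partitions it will write; `group_jobs_by_partition_overlap`, the job names, and the partition values are hypothetical, and nothing here is a Hudi API.

```python
def group_jobs_by_partition_overlap(jobs):
    """Group job names so that jobs sharing any partition end up in the same group.

    jobs: dict mapping job name -> set of partition paths the job writes.
    Returns a list of groups (lists of job names); run each group sequentially.
    """
    groups = []  # list of (partitions, job_names) pairs, kept mutually disjoint
    for name, partitions in jobs.items():
        merged_parts = set(partitions)
        merged_names = [name]
        remaining = []
        for parts, names in groups:
            if parts & merged_parts:
                # Shares a partition with the new job: fold into one group.
                merged_parts |= parts
                merged_names = names + merged_names
            else:
                remaining.append((parts, names))
        remaining.append((merged_parts, merged_names))
        groups = remaining
    return [names for _, names in groups]


jobs = {
    "job_a": {"p_day=2023-01-01"},
    "job_b": {"p_day=2023-01-02"},
    "job_c": {"p_day=2023-01-01", "p_day=2023-01-03"},
    "job_d": {"p_day=2023-01-04"},
}
groups = group_jobs_by_partition_overlap(jobs)
print(sorted(groups))
```

Here `job_a` and `job_c` share a partition and land in one group, so they would be run in sequence; `job_b` and `job_d` can run concurrently with that group.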

SamarthRaval commented 11 months ago

@Jason-liujc Can we just increase yarn.resourcemanager.am.max-attempts to a higher number, so that the Hudi job is retried if it fails with java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes?

SamarthRaval commented 11 months ago

I heard that the DynamoDB lock provider doesn't work with retries, but ZooKeeper does? If anyone has knowledge about this, would you mind sharing it here?

Talking about this: https://github.com/apache/hudi/issues/9728#issuecomment-1765275004

Jason-liujc commented 11 months ago

Can't speak to what the official guidance from Hudi is at the moment (it seems they will roll out the non-blocking concurrent write feature in version 1.0+).

We had to increase yarn.resourcemanager.am.max-attempts and spark.yarn.maxAppAttempts (the Spark-specific config) to make it retry more, and reorganize our tables to reduce concurrent writes. Any other lock provider wasn't an option for us since we are running different jobs from different clusters.
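
Besides cluster-level retries, the retry can also be done inside the job itself. A minimal sketch, assuming the conflict surfaces as an exception whose message contains the text from this issue; `write_with_retry` and `write_fn` are hypothetical names, and the string match is an assumption, since in PySpark the Java exception typically arrives wrapped in a Python exception.

```python
import random
import time

CONFLICT_MARKER = "Cannot resolve conflicts for overlapping writes"


def write_with_retry(write_fn, max_attempts=5, base_delay_s=1.0, sleep=time.sleep):
    """Call write_fn, retrying with jittered exponential backoff on write conflicts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return write_fn()
        except Exception as err:
            # Re-raise anything that is not the OCC conflict, or the final failure.
            if CONFLICT_MARKER not in str(err) or attempt == max_attempts:
                raise
            delay = base_delay_s * (2 ** (attempt - 1))
            # Random jitter keeps concurrent writers from retrying in lockstep.
            sleep(delay + random.uniform(0, delay))


# Stand-in for a real write: fails twice with the conflict, then succeeds.
attempts = {"count": 0}


def flaky_write():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("java.util.ConcurrentModificationException: " + CONFLICT_MARKER)
    return "committed"


result = write_with_retry(flaky_write, sleep=lambda seconds: None)
print(result, attempts["count"])
```

In a real job, `write_fn` would wrap the actual write, for example `lambda: spark_df.write.format("hudi").options(**hudi_final_settings).mode("append").save(target_path)`. Each retry re-runs the whole write, so this only helps when conflicts are occasional rather than constant.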

subash-metica commented 9 months ago

I have a similar issue when performing clustering (as a separate process) while streaming writes happen at the same time. Even with a lock provider (ZooKeeper) configured and running on the same cluster, why does this happen?

Job1:

spark-submit
--class org.apache.hudi.utilities.HoodieClusteringJob
--conf spark.executor.cores=2
--conf spark.executor.memory=4g
--conf spark.driver.cores=3
--conf spark.driver.memory=4g
--conf spark.executor.instances=2
--deploy-mode cluster
--master yarn
--conf spark.yarn.maxAppAttempts=2
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=3
--conf spark.network.timeout=300s
--jars /usr/lib/hudi/hudi-spark-bundle.jar /usr/lib/hudi/hudi-utilities-slim-bundle.jar
--mode scheduleAndExecute
--base-path <s3_path>
--table-name <table_name>
--spark-memory 4g
--hoodie-conf hoodie.write.concurrency.mode=optimistic_concurrency_control
--hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
--hoodie-conf hoodie.write.lock.zookeeper.url=<emr_master_dns>
--hoodie-conf hoodie.write.lock.zookeeper.port=2181
--hoodie-conf hoodie.write.lock.zookeeper.base_path=/hudi_locks
--hoodie-conf hoodie.write.lock.zookeeper.lock_key=<table_name>
--hoodie-conf hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
--hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=629145600
--hoodie-conf hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
--hoodie-conf hoodie.clustering.updates.strategy=org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy

Job 2: (Hudi Config)

DataSourceWriteOptions.OPERATION.key  -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      HoodieWriteConfig.TBL_NAME.key        -> <table_name>,
      DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name(),
      HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key -> HoodieFailedWritesCleaningPolicy.LAZY.name(),
      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> classOf[ZookeeperBasedLockProvider].getName,
      HoodieLockConfig.ZK_CONNECT_URL.key -> <emr_master_dns>,
      HoodieLockConfig.ZK_PORT.key -> "2181",
      HoodieLockConfig.ZK_BASE_PATH.key -> "/hudi_locks",
      HoodieLockConfig.ZK_LOCK_KEY.key  -> <table_name>,
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> Seq("col3", "col4").mkString(","),
      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> Seq("col1", "col2").mkString(","),
      DataSourceWriteOptions.PRECOMBINE_FIELD.key              -> Seq("ts").mkString(","),
      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key       -> classOf[ComplexKeyGenerator].getName,
      // Metadata config
      HoodieMetadataConfig.ENABLE.key                    -> "true",
      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
      // Table services config
      // Cleaning config
      HoodieCleanConfig.ASYNC_CLEAN.key -> "false",
      // Clustering config
      HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key -> "true",
      HoodieClusteringConfig.INLINE_CLUSTERING.key -> "false"
meatheadmike commented 2 weeks ago

I'm seeing this with Flink as well, using the Hive Metastore for locking. The worst part is that it seems to have wiped my table in the process!

danny0405 commented 2 weeks ago

Do you have multiple writers? Did you configure the lock provider explicitly?

meatheadmike commented 2 weeks ago

Yes, this seems to happen with multiple writers. I do specify the lock config:

         'compaction.delta_commits'='2',
         'connector' = 'hudi',
         'hive_sync.db'='{hudi_db}',
         'hive_sync.enable'='true',
         'hive_sync.jdbcurl'='{hive_jdbc_url}',
         'hive_sync.metastore.uris'='{hive_thrift_url}',
         'hive_sync.mode'='hms',
         'hive_sync.password'='',
         'hive_sync.table'='{hudi_table}',
         'hive_sync.username'='hive',
         'hoodie.cleaner.policy.failed.writes' = 'LAZY',
         'hoodie.database.name'='{hudi_db}',
         'hoodie.datasource.write.recordkey.field'='name',
         'hoodie.embed.timeline.server'='false',
         'hoodie.parquet.compression.codec' = 'snappy',
         'hoodie.payload.event.time.field' = 'event_time',
         'hoodie.payload.ordering.field' = 'event_time',
         'hoodie.table.name'='{hudi_table}',
         'hoodie.table.precombine.field' = 'event_time',
         'hoodie.table.recordkey.fields' = 'name',
         'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
         'hoodie.write.lock.hivemetastore.database'='{hudi_db}',
         'hoodie.write.lock.hivemetastore.table'='{hudi_table}',
         'hoodie.write.lock.provider' = 'org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider',
         'path' = '{hudi_table_path}',
         'precombine.field'='event_time',
         'read.streaming.enabled'='true',
         'read.streaming.skip_compaction'='true',
         'read.tasks'='4',
         'recordkey.field'='name',
         'table.type' = 'MERGE_ON_READ',
         'write.operation' = 'upsert',
         'write.precombine.field' = 'event_time',
         'write.recordkey.field'='name',
         'write.tasks'='4'
danny0405 commented 2 weeks ago

You are using OCC. Does the conflict really happen or not? If it does, the job needs a retry for the commit to succeed.