[Open] GallonREX opened this issue 1 year ago
With the same data: if only Hudi 0.12 is used, multi-writer succeeds; if only Hudi 0.8 is used, multi-writer also succeeds. Using Hudi 0.12 to single-write to the existing Hudi 0.8 table succeeds, but using Hudi 0.12 to multi-write to the existing Hudi 0.8 table fails. Part of the same partition data is written by multiple writers.
Generally, multi-writer capability means both writers can write concurrently only if they don't have overlapping data being ingested, e.g. if they are ingesting into two completely different partitions. If not, Hudi may not be able to resolve a winner and hence will abort/fail one of the writers.
It's expected.
Can you clarify whether the two writers are writing non-overlapping data and still hit the ConcurrentModificationException?
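To make "non-overlapping" concrete, here is a minimal sketch (not from the original thread) of two concurrent writers that cannot conflict: two hypothetical batches, batchA and batchB, filtered to completely different values of the issue's partition column partionpath, both using the same OCC and ZooKeeper lock settings as the Scala snippet further down in the issue:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Shared write helper; OCC + lock settings mirror the issue's configuration.
def writeWithOcc(df: DataFrame): Unit = {
  df.write.format("org.apache.hudi").
    option("hoodie.cleaner.policy.failed.writes", "LAZY").
    option("hoodie.write.concurrency.mode", "optimistic_concurrency_control").
    option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
    option("hoodie.write.lock.zookeeper.url", "10.1.4.1,10.1.4.2,10.1.4.3").
    option("hoodie.write.lock.zookeeper.port", "2181").
    option("hoodie.write.lock.zookeeper.lock_key", "zycg_article_data_day_test08limit").
    option("hoodie.write.lock.zookeeper.base_path", "/hudi_data_zycg_article_data_day_test").
    option("hoodie.datasource.write.recordkey.field", "doc_id").
    option("hoodie.datasource.write.partitionpath.field", "partionpath").
    option("hoodie.datasource.write.precombine.field", "publish_time").
    option("hoodie.table.name", "hudi_test_tb").
    mode("append").
    save("hdfs://10.1.4.1:9000/data_center/hudidata/hudi_test_tb")
}

// Writer 1 and Writer 2 run concurrently but touch disjoint partitions
// ("2023-02-10" and "2023-02-11" are hypothetical partition values), so no
// file group is updated by both and OCC can commit both writers.
writeWithOcc(batchA.filter(col("partionpath") === "2023-02-10"))
writeWithOcc(batchB.filter(col("partionpath") === "2023-02-11"))

If both batches carry records for the same partition (as the issue describes, where part of the same partition data is written by multiple writers), the writes can land on the same file groups and one writer gets aborted.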
With the same data: if the table is newly created with Hudi 0.8, multiple writers can write successfully, and if the table is newly created with Hudi 0.12, multiple writers can also write successfully. But using Hudi 0.12 multi-writer on the existing Hudi 0.8 table fails. I simply used Hudi 0.12 to write directly to the existing Hudi 0.8 table, expecting the table version to be upgraded automatically. Can tables be automatically upgraded from Hudi 0.8 to Hudi 0.12?
@GallonREX The error you are getting, Cannot resolve conflicts for overlapping writes, normally occurs when you try to update the same file group concurrently. This should not depend on versions: even 0.12 should fail if multiple writers try to write to the same file group.
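One way to check this before launching the jobs is to compare the partition values each batch will touch; a small sketch, assuming the same hypothetical batchA/batchB DataFrames and the issue's partition column partionpath. Note this is only a partition-level check: two writers updating the same record keys inside one partition still hit the same file group.

import org.apache.spark.sql.DataFrame

// Distinct partition values a batch would write to.
def partitionsOf(df: DataFrame): Set[String] =
  df.select("partionpath").distinct().collect().map(_.getString(0)).toSet

val overlap = partitionsOf(batchA).intersect(partitionsOf(batchB))
// A non-empty overlap means both writers may update the same file groups,
// which is exactly what SimpleConcurrentFileWritesConflictResolutionStrategy rejects.
require(overlap.isEmpty, s"Writers overlap on partitions: $overlap")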
This is an automated reply. Thank you for your email; I have received it and will respond as soon as possible.
@GallonREX: yes, the table will get automatically upgraded when you pull in the 0.12.x binary.
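If it helps to verify the upgrade, here is a small sketch (not from this thread; tableVersion is a hypothetical helper) that reads hoodie.table.version straight out of the table's .hoodie/hoodie.properties. It should print 1 before the first 0.12 write and 5 afterwards, matching the properties pasted in the issue description:

import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Reads the Hudi table version from hoodie.properties on HDFS.
def tableVersion(basePath: String): String = {
  val propsPath = new Path(basePath + "/.hoodie/hoodie.properties")
  val fs = FileSystem.get(propsPath.toUri, new Configuration())
  val in = fs.open(propsPath)
  try {
    val props = new Properties()
    props.load(in)
    props.getProperty("hoodie.table.version", "unknown")
  } finally {
    in.close()
  }
}

println(tableVersion("hdfs://10.1.4.1:9000/data_center/hudidata/hudi_test_tb"))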
Are we good here? If yes, can you close the ticket?
This is an automated reply. Thank you for your email; I have received it and will respond as soon as possible.
Tips before filing an issue
Have you gone through our FAQs?
Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
Upgrading from Hudi 0.8 to Hudi 0.12. Upgrade steps: use the Hudi 0.12 program to write to the table created by the existing Hudi 0.8, relying on the automatic upgrade. After writing to the 0.8 table, Hudi 0.12 cannot be written by two writers at the same time.
To Reproduce
Steps to reproduce the behavior:
1. Use Hudi 0.12 to write to the existing Hudi 0.8 table
2. Scala code:
articleDataframe.write.format("org.apache.hudi").
  option("hoodie.insert.shuffle.parallelism", "264").
  option("hoodie.upsert.shuffle.parallelism", "264").
  option("hoodie.cleaner.policy.failed.writes", "LAZY").
  option("hoodie.write.concurrency.mode", "optimistic_concurrency_control").
  option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
  option("hoodie.write.lock.zookeeper.url", "10.1.4.1,10.1.4.2,10.1.4.3").
  option("hoodie.write.lock.zookeeper.port", "2181").
  option("hoodie.write.lock.zookeeper.lock_key", "zycg_article_data_day_test08limit").
  option("hoodie.write.lock.zookeeper.base_path", "/hudi_data_zycg_article_data_day_test").
  option(RECORDKEY_FIELD.key(), "doc_id").
  option(PARTITIONPATH_FIELD.key(), "partionpath").
  option(PRECOMBINE_FIELD.key(), "publish_time").
  option(TBL_NAME.key(), "hudi_test_tb").
  mode(Append).
  save("hdfs://10.1.4.1:9000/data_center/hudidata/hudi_test_tb")
3. spark-submit (Hudi 0.12):
bin/spark-submit \
  --name hudi012_20220807 \
  --class com.honeycomb.hudi.hudiimport.spark.ZhongyunImportHudiRecovery \
  --master yarn --deploy-mode cluster \
  --executor-memory 10g --driver-memory 5g --executor-cores 2 --num-executors 20 \
  --queue default \
  --jars /data/sas01/opt/module/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark2.4-bundle_2.11-0.12.0.jar \
  /data/sas01/crontabprogram2/zytongzhan2/honeycomb-hudi08-download-1.0-SNAPSHOT.jar \
Expected behavior
The Hudi 0.8 table is upgraded to Hudi 0.12, and two Hudi 0.12 writers can write to the table at the same time without error.
Environment Description
Additional context
Use Hudi 0.12 to write to the existing Hudi 0.8 table.

hudi 0.8 hoodie.properties:
hoodie.table.precombine.field=publish_time
hoodie.table.name=zycg_article_data_day_test08
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=1
hoodie.timeline.layout.version=1

hudi 0.12 hoodie.properties:
hoodie.table.precombine.field=publish_time
hoodie.table.partition.fields=partionpath
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.timeline.layout.version=1
hoodie.table.version=5
hoodie.table.metadata.partitions=files
hoodie.table.recordkey.fields=doc_id
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.name=zycg_article_data_day_test08
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.checksum=3536879415
Stacktrace
Add the stacktrace of the error.
23/02/11 23:01:42 INFO view.FileSystemViewManager: Creating remote first table view
23/02/11 23:01:42 INFO timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20230211225240790__rollback__COMPLETED]}
23/02/11 23:01:42 INFO transaction.SimpleConcurrentFileWritesConflictResolutionStrategy: Found conflicting writes between first operation = {actionType=commit, instantTime=20230211224333389, actionState=INFLIGHT'}, second operation = {actionType=commit, instantTime=20230211224251755, actionState=COMPLETED'}, intersecting file ids [29d8e24e-f5c5-43b5-a10e-2240cc51dda0-0, 3ac5d5f6-df53-4f81-848a-316ca38107b6-0, cb1f2488-d860-4d08-aa2a-134ba89558e3-0, ea157114-677d-4011-8c63-84af3b2526e5-0, f5301297-6e18-4166-8f56-a853b5d6485b-0, f124d9a9-f04e-4655-8a4b-c45fa357b38f-0, a4681446-fb69-4ebd-a121-13323fdb62a5-0, d48017c8-56cb-4172-a92a-5caf08d605a6-0, a6fc9c73-dc74-47ad-8085-ec63915b534b-0, 637d35d4-c492-4236-955b-ce3c515cf7ee-0, 836bd77a-55f4-46bb-9235-c880a5b8daf7-0, ba0aa8be-c5f3-4a21-bec9-1d5931cea051-0, 73a9fc8e-9247-410a-aa0e-8754aa9c750e-0, 4a99458f-8cd9-4752-a429-9334c98c599b-0, 4db57399-44f9-4823-aa82-f0733319a84b-0, e1522b8c-9a8b-4a7a-ad0c-847d4d7a548d-0, de67ca47-4532-4b98-8e4a-44d6d6281806-0, 344f1459-d787-4aac-9396-32dd0fe45532-0, 20c053ea-5f75-4912-a22f-e2d6b7b85189-0, 342ac19e-74db-46b7-b4e2-1158c46dd433-0, b9d88fa0-ace3-40b0-be07-8818dd4fb3d3-0, be557599-26ac-4bf7-a120-849f4a65a48e-0, 956d20a6-ccbc-42b0-8a96-c3bbf766f0ff-0, 20ce19f3-e523-4f97-8258-049ed347b7ff-0, 58ce6c4e-1599-4f64-a933-207b22a485f9-0, dac58caf-7a06-4a57-81ef-b8fbd69b0fdb-0, 1f1aacd7-4a4f-4821-8efe-9afdee5c4370-0, b9c3a4fc-0ccb-4eb0-b3c2-ad7c89493d1d-0, c2a7bbef-bd41-4374-a181-6e08a9a177c2-0, 7a99f25d-c58a-4451-a2e9-f49980122cbb-0]
23/02/11 23:01:42 INFO utils.TransactionUtils: Conflict encountered between current instant = {actionType=commit, instantTime=20230211224333389, actionState=INFLIGHT'} and instant = {actionType=commit, instantTime=20230211224251755, actionState=COMPLETED'}, attempting to resolve it...
23/02/11 23:01:42 INFO transaction.TransactionManager: Transaction ending with transaction owner Option{val=[==>20230211224333389__commit__INFLIGHT]}
23/02/11 23:01:42 INFO lock.ZookeeperBasedLockProvider: RELEASING lock atZkBasePath = /hudi_data_zycg_article_data_day_test, lock key = zycg_article_data_day_test08limit
23/02/11 23:01:42 INFO lock.ZookeeperBasedLockProvider: RELEASED lock atZkBasePath = /hudi_data_zycg_article_data_day_test, lock key = zycg_article_data_day_test08limit
23/02/11 23:01:42 INFO transaction.TransactionManager: Transaction ended with transaction owner Option{val=[==>20230211224333389__commit__INFLIGHT]}
23/02/11 23:01:42 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
    at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
    at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
    at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79)
    at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:473)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:235)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:701)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:336)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:183)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at com.honeycomb.hudi.hudiimport.spark.ZhongyunImportHudiRecovery$.main(ZhongyunImportHudiRecovery.scala:457)
    at com.honeycomb.hudi.hudiimport.spark.ZhongyunImportHudiRecovery.main(ZhongyunImportHudiRecovery.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
    ... 41 more
23/02/11 23:01:42 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
    at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
    at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
    at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79)
    at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:473)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:235)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:701)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:336)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:183)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at com.honeycomb.hudi.hudiimport.spark.ZhongyunImportHudiRecovery$.main(ZhongyunImportHudiRecovery.scala:457)
    at com.honeycomb.hudi.hudiimport.spark.ZhongyunImportHudiRecovery.main(ZhongyunImportHudiRecovery.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
    ... 41 more )