apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/

[SUPPORT] Multi-writing unable to acquire lock using Flink with different LockProviders #10538

Closed hanson2021 closed 4 weeks ago

hanson2021 commented 10 months ago

(1) Environment Description

(2) My application uses the Flink engine to run a task from a Kafka source to a Hudi sink. I plan to run org.apache.hudi.utilities.HoodieClusteringJob asynchronously to merge small files, which requires enabling multi-writing. My tasks run fine without multi-writing, but some exceptions occur once multi-writing is enabled. Can anybody give me suggestions for solving this?
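For reference, recent Hudi Flink bundles can also schedule and run clustering from inside the writing job itself, which merges small files without a second writer or any lock provider. The following is a minimal sketch, not taken from this report: the table name and path are hypothetical, and the `clustering.*` option names are my reading of the Hudi Flink docs, so verify them against your version.

```sql
-- Hypothetical minimal sink with sink-side async clustering enabled,
-- so small-file merging happens inside the same Flink job and no
-- external lock provider is needed.
CREATE TABLE hudi_sink_inline_clustering (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  created_at STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://ahdpns/tmp/hudi_sink_inline_clustering',  -- hypothetical path
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',
  'clustering.schedule.enabled' = 'true',  -- schedule clustering plans (assumed option, 0.12+)
  'clustering.async.enabled' = 'true',     -- execute the plans asynchronously
  'clustering.delta_commits' = '4'         -- plan clustering every 4 commits
);
```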

(3) My Hudi sink table is created as follows:

```sql
CREATE TABLE bidwhive.birt_test.ods_thasu_short_url_log_inc_test (
  uuid string PRIMARY KEY NOT ENFORCED,
  binlog_id bigint comment 'log id',
  id bigint comment 'primary key',
  short_id bigint comment 'short link id',
  short_key string comment 'short link suffix',
  short_base_key string comment 'short link table base key',
  user_agent string comment 'ua',
  user_ip string comment 'user ip',
  created_at string comment 'creation time',
  cdc_type string comment 'CDC type',
  es string comment 'binlog execution time',
  ts string comment 'time when mario picked up the record to send to the queue',
  etl_time string comment 'ETL sync time',
  dt string comment 'partition field'
) PARTITIONED BY (dt) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://ahdpns/user/hive/warehouse/bi_test.db/ods_thasu_short_url_log_inc_test',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'write.precombine.field' = 'created_at',
  'index.state.ttl' = '0.0',
  'index.type' = 'FLINK_STATE',
  'write.operation' = 'insert',
  'write.task.max.size' = '1024',
  'write.tasks' = '12',
  'hoodie.parquet.compression.codec' = 'snappy',
  'hoodie.cleaner.policy.failed.writes' = 'LAZY',
  'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
  'hoodie.write.lock.wait_time_ms' = '120000',
  -- 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
  -- 'hoodie.write.lock.zookeeper.url' = 'host01:2181,host02:2181,host03:2181',
  -- 'hoodie.write.lock.zookeeper.port' = '2181',
  -- 'hoodie.write.lock.zookeeper.base_path' = '/huditest',
  -- 'hoodie.write.lock.zookeeper.lock_key' = 'test',
  -- 'hoodie.write.lock.zookeeper.session_timeout_ms' = '60000',
  -- 'hoodie.write.lock.provider' = 'org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider',
  -- 'hoodie.write.lock.hivemetastore.uris' = 'thrift://host02:9083,thrift://host01:9083,thrift://host03:9083',
  -- 'hoodie.write.lock.hivemetastore.database' = 'bi_test',
  -- 'hoodie.write.lock.hivemetastore.table' = 'testlock',
  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',
  -- 'hoodie.write.lock.filesystem.path' = 'hdfs://ahdpns/user/hive/warehouse/bi_test.db/ods_thasu_short_url_log_inc_test',
  -- default: hoodie.base.path + /.hoodie/lock
  'hoodie.write.lock.filesystem.expire' = '0',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://host02:9083,thrift://host03:9083,thrift://host01:9083',
  'hive_sync.table' = 'ods_thasu_short_url_log_inc_test',
  'hive_sync.db' = 'bi_test'
);
```
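Note that any second OCC writer, such as the clustering job, has to point at the same table path and carry the same concurrency mode and lock-provider settings as the streaming writer. The following is a minimal sketch of that second writer's declaration: only the table name is invented, and the path and lock options are copied verbatim from the DDL above.

```sql
-- Hypothetical declaration for the second concurrent writer.
-- The column list is abbreviated for the sketch; a real writer must
-- match the full table schema.
CREATE TABLE ods_thasu_short_url_log_inc_test_writer2 (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  created_at STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://ahdpns/user/hive/warehouse/bi_test.db/ods_thasu_short_url_log_inc_test',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',
  -- every concurrent writer must use the SAME concurrency and lock settings:
  'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
  'hoodie.cleaner.policy.failed.writes' = 'LAZY',
  'hoodie.write.lock.wait_time_ms' = '120000',
  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',
  'hoodie.write.lock.filesystem.expire' = '0'
);
```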

(4) When I select FileSystemBasedLockProvider, the exception log is as follows:

```
2024-01-19 17:20:25,056 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 2 of job 53c0175fec27445402621d4a05d5d797 expired before completing.
2024-01-19 17:22:37,314 ERROR org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [commits the instant 20240119171545400] error
org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object hdfs://ahdpns/user/hive/warehouse/bi_test.db/ods_thasu_short_url_log_inc_test/.hoodie/.aux/lock
    at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:87) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:53) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:232) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:117) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:530) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:506) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:242) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
2024-01-19 17:22:37,330 INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [taking checkpoint 2] success!
2024-01-19 17:22:37,342 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'hoodie_append_write: ods_thasu_short_url_log_inc_test' (operator e9bb29a2d1826b2ea3ef409fecfcbfde).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:187) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:146) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_191]
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20240119171545400] error
    ... 6 more
Caused by: org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object hdfs://ahdpns/user/hive/warehouse/bi_test.db/ods_thasu_short_url_log_inc_test/.hoodie/.aux/lock
    at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:87) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:53) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:232) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:117) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:530) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:506) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:242) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
```
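Note: in this configuration, 'hoodie.write.lock.filesystem.expire' = '0' means the lock file (`.hoodie/.aux/lock` in the trace) never expires, so if a failed commit, such as the expired checkpoint above, left the file behind, every later acquisition attempt would keep timing out. If that turns out to be the cause, one hedged mitigation, assuming the option is in minutes as documented (verify for your Hudi version), is to let stale locks expire:

```sql
-- Hypothetical tweak: allow a leftover lock file to expire after 10 minutes
-- instead of never (assumes 'hoodie.write.lock.filesystem.expire' takes
-- minutes, with 0 meaning "never expires").
ALTER TABLE bidwhive.birt_test.ods_thasu_short_url_log_inc_test
  SET ('hoodie.write.lock.filesystem.expire' = '10');
```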

(5) When I select ZookeeperBasedLockProvider, the exception log is as follows:

```
org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object
    at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:75) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.HoodieFlinkWriteClient.writeTableMetadata(HoodieFlinkWriteClient.java:272) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:271) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:117) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:530) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:506) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:242) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
Caused by: org.apache.hudi.exception.HoodieLockException: FAILED_TO_ACQUIRE lock atZkBasePath = /huditest, lock key = test
    at org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.tryLock(ZookeeperBasedLockProvider.java:101) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:67) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    ... 11 more
Caused by: java.lang.IllegalArgumentException: ALREADY_ACQUIRED lock atZkBasePath = /huditest, lock key = test
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.acquireLock(ZookeeperBasedLockProvider.java:140) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.tryLock(ZookeeperBasedLockProvider.java:96) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:67) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
```

ad1happy2go commented 10 months ago

@hanson2021 This issue was fixed in later releases. Can you try 0.14?
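For anyone retrying this on 0.14.x, here is a minimal sketch of the ZooKeeper-based variant: the table name is hypothetical, and the lock values are copied verbatim from the commented-out block in the original DDL (not re-verified against 0.14).

```sql
-- Hypothetical minimal sink for retrying OCC on a 0.14.x Flink bundle;
-- only the table name is invented, the lock values are from the original DDL.
CREATE TABLE hudi_sink_zk_lock (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  created_at STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://ahdpns/user/hive/warehouse/bi_test.db/ods_thasu_short_url_log_inc_test',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',
  'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
  'hoodie.cleaner.policy.failed.writes' = 'LAZY',
  'hoodie.write.lock.wait_time_ms' = '120000',
  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
  'hoodie.write.lock.zookeeper.url' = 'host01:2181,host02:2181,host03:2181',
  'hoodie.write.lock.zookeeper.port' = '2181',
  'hoodie.write.lock.zookeeper.base_path' = '/huditest',
  'hoodie.write.lock.zookeeper.lock_key' = 'test',
  'hoodie.write.lock.zookeeper.session_timeout_ms' = '60000'
);
```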

ad1happy2go commented 9 months ago

@hanson2021 Did you get a chance to try 0.14? Please let us know if it still blocks you. Thanks.

ad1happy2go commented 4 weeks ago

Closing this, as it was fixed in the 0.14.x releases.