apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink-Hudi Unable to use Hudi metadata with S3 #11036

Open ChiehFu opened 5 months ago

ChiehFu commented 5 months ago

Describe the problem you faced

Hi,

I was building a Flink SQL streaming pipeline on AWS EMR to compact data into a Hudi COW table. Because of S3 slowdown errors that occasionally occurred during Hudi writes, I tried to turn on the metadata table to eliminate S3 file listing, but ran into the following exception saying the S3 FS doesn't support atomic creation.

This issue seems particularly related to Flink, as I have another Spark/Hudi-based batch pipeline running on the same type of EMR cluster, and there the Hudi metadata table works as expected with the S3 FS.

Can you please help me with this issue?

Exception:

Caused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3, since this fs can not support atomic creation
    at org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:89) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_402]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_402]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_402]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_402]
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:118) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:109) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:216) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:318) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:347) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:192) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:181) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:618) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1130) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:1047) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:439) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:198) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:622) ~[flink-rpc-akka_f5fd373f-2282-403d-a522-72b822a720aa.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_f5fd373f-2282-403d-a522-72b822a720aa.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:621) ~[flink-rpc-akka_f5fd373f-2282-403d-a522-72b822a720aa.jar:1.17.1-amzn-1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:190) ~[flink-rpc-akka_f5fd373f-2282-403d-a522-72b822a720aa.jar:1.17.1-amzn-1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-scala_2.12-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-scala_2.12-1.17.1-amzn-1.jar:1.17.1-amzn-1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[flink-scala_2.12-1.17.1-amzn-1.jar:1.17.1-amzn-1]

To Reproduce

Flink SQL / Hudi configs

 'path'='...',
 'connector' = 'hudi',
 'table.type' = 'COPY_ON_WRITE',
 'precombine.field' = 'integ_key',
 'write.precombine' = 'true',
 'write.bucket_assign.tasks' = '10',
 'write.task.max.size' = '2014',
 'write.operation' = 'upsert',
 'hoodie.datasource.write.recordkey.field' = 'key',
 'write.parquet.max.file.size' = '240',
 'index.bootstrap.enabled' = 'false',
 'write.index_bootstrap.tasks' = '200',
 'metadata.enabled' = 'true'
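
For reference, a minimal sketch of how these options would sit in a complete Flink SQL DDL; the table name, columns, and S3 path below are placeholders rather than values from the actual job:

CREATE TABLE hudi_cow_sink (
  `key` STRING,
  integ_key BIGINT,
  payload STRING,
  PRIMARY KEY (`key`) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 's3://<bucket>/<table-path>',
  'table.type' = 'COPY_ON_WRITE',
  'precombine.field' = 'integ_key',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field' = 'key',
  'metadata.enabled' = 'true'
);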


ChiehFu commented 4 months ago

Hello, could I please get some help with this issue? Please let me know if any further information is needed. Thanks!

danny0405 commented 4 months ago

Yeah, the MDT (metadata table) is not recommended with the Flink writer on the 0.x releases (0.13.x/0.14.x) for now; we are striving to make it production-ready for the 1.x release.
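
In other words, the immediate mitigation on the 0.x writer is to keep the metadata table off and fall back to direct S3 listing. A minimal sketch, reusing the placeholder table from the DDL above and Flink's dynamic table options (a hypothetical source_stream stands in for the upstream source):

-- Override the table option at submission time instead of editing the DDL.
INSERT INTO hudi_cow_sink /*+ OPTIONS('metadata.enabled' = 'false') */
SELECT `key`, integ_key, payload FROM source_stream;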

ankit0811 commented 1 month ago

@danny0405 any update on when this will be released? I do see PR https://github.com/apache/hudi/pull/11124, which enables the metadata table by default.

danny0405 commented 1 month ago

1.0-beta2 has already been released, so you can give it a try; just be cautious that 1.x has some format incompatibilities with 0.x, which we are trying to resolve in the 1.0 GA release.
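
For anyone trying the beta from the Flink SQL client, a minimal sketch of swapping the bundle in; the jar path is a placeholder and the exact artifact name (assumed here to follow the 0.14.1 bundle naming convention) should be checked against the actual 1.0.0-beta2 release:

-- Load the 1.x Flink bundle in place of hudi-flink1.17-bundle-0.14.1.jar before creating the table.
ADD JAR '/path/to/hudi-flink1.17-bundle-1.0.0-beta2.jar';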

ankit0811 commented 1 month ago

@danny0405 I tried using 1.0-beta2 and got the same exception about s3a not supporting atomic creation; here is the stack trace:

"message":"JobMaster for job 7c57d7e79fef591c171e8e487112be09 failed.","name":"org.apache.flink.util.FlinkException","cause":{"commonElementCount":14,"localizedMessage":"Could not start the JobMaster.","message":"Could not start the JobMaster.","name":"org.apache.flink.runtime.jobmaster.JobMasterException","cause":{"commonElementCount":14,"localizedMessage":"Failed to start the operator coordinators","message":"Failed to start the operator coordinators","name":"org.apache.flink.util.FlinkRuntimeException","cause":{"commonElementCount":14,"localizedMessage":"Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider","message":"Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider","name":"org.apache.hudi.exception.HoodieException","cause":{"commonElementCount":14,"name":"java.lang.reflect.InvocationTargetException","cause":{"commonElementCount":14,"localizedMessage":"Unsupported scheme :s3a, since this fs can not support atomic creation","message":"Unsupported scheme :s3a, since this fs can not support atomic creation","name":"org.apache.hudi.exception.HoodieLockException","extendedStackTrace":"org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation\n\tat org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:90) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) 
~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n"},"extendedStackTrace":"java.lang.reflect.InvocationTargetException: null\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat 
org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\nCaused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation\n\tat org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:90) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\t... 14 more\n"},"extendedStackTrace":"org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat 
org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\nCaused by: java.lang.reflect.InvocationTargetException\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat 
org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\t... 14 more\nCaused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation\n\tat org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:90) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\t... 14 more\n"},"extendedStackTrace":"org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:169) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\nCaused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\t... 14 more\nCaused by: java.lang.reflect.InvocationTargetException\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\t... 14 more\nCaused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation\n\tat org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:90) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\t... 14 more\n"},"extendedStackTrace":"org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:390) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\nCaused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:169) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\t... 14 more\nCaused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185) 
~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.15.2.jar:1.15.2]\n\t... 14 more\nCaused by: java.lang.reflect.InvocationTargetException\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]\n\tat java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]\n\tat org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]\n\tat org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]\n\tat org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]\n\tat org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]\n\tat org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]\n\tat org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist-1.15.2.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611) ~[flink-rpc-akka_8908cf19-c418-4000-9df1-cc2bb7ac9412.jar:1.15.2]\n\tat 
org.apache.flink.util.FlinkException: JobMaster for job 7c57d7e79fef591c171e8e487112be09 failed.
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1206) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:695) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$4(Dispatcher.java:617) ~[flink-dist-1.15.2.jar:1.15.2]
    ...
Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:390) ~[flink-dist-1.15.2.jar:1.15.2]
    ... 14 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:169) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010) ~[flink-dist-1.15.2.jar:1.15.2]
    ... 14 more
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]
    at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]
    at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]
    at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:221) ~[?:?]
    at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:320) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:350) ~[?:?]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:195) ~[?:?]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist-1.15.2.jar:1.15.2]
    ... 14 more
Caused by: java.lang.reflect.InvocationTargetException
    at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
    at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]
    ... 14 more
Caused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation
    at org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:90) ~[?:?]
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[flink-s3-fs-presto-1.15.2.jar:1.15.2]
    at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:113) ~[?:?]
    at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:100) ~[?:?]
    ... 14 more

Can you tell me what I am missing here?

danny0405 commented 1 month ago

You can try another lock provider implementation.
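
For example, a rough sketch of switching the writer to the DynamoDB-based lock provider through the sink table's WITH clause. The DynamoDB table, partition key, and region values below are placeholders, and it assumes the hudi-aws classes are on the classpath and that your bundle forwards hoodie.* options from the WITH clause:

    'hoodie.write.lock.provider' = 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
    'hoodie.write.lock.dynamodb.table' = 'hudi-locks',            -- placeholder DynamoDB table name
    'hoodie.write.lock.dynamodb.partition_key' = 'my_hudi_table', -- placeholder, typically the Hudi table name
    'hoodie.write.lock.dynamodb.region' = 'us-east-1',            -- placeholder AWS region
    'hoodie.write.lock.dynamodb.billing_mode' = 'PAY_PER_REQUEST'

A ZooKeeper-based provider (org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider) is another option if you already run a ZooKeeper quorum.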

ad1happy2go commented 1 month ago

@ankit0811 You can try adding s3a to this config: https://hudi.apache.org/docs/configurations/#hoodiefsatomic_creationsupport
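
For example, a rough Flink SQL sketch; the table name, schema, and S3 path are placeholders, and it assumes your Hudi bundle forwards hoodie.* options from the WITH clause:

    CREATE TABLE hudi_sink (
      id STRING,
      ts TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path' = 's3a://my-bucket/warehouse/hudi_sink',    -- placeholder path
      'table.type' = 'COPY_ON_WRITE',
      'metadata.enabled' = 'true',
      'hoodie.fs.atomic_creation.support' = 's3,s3a'     -- whitelist the scheme(s) your path actually uses
    );

Note that this only whitelists the scheme for the file-system based lock provider; it does not make S3 object creation atomic, so it may not be a safe choice if you genuinely have multiple concurrent writers on the same table.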

ankit0811 commented 1 month ago

@ad1happy2go that seems to have done the trick. Thanks

However, we did see another exception after the above change:

at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:199)
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [handle write metadata event for instant ] error
    ... 6 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :files
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:167)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:101)
    at org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor.execute(FlinkUpsertPreppedDeltaCommitActionExecutor.java:52)
    at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsertPrepped(HoodieFlinkMergeOnReadTable.java:85)
    at org.apache.hudi.client.HoodieFlinkWriteClient.lambda$upsertPreppedRecords$4(HoodieFlinkWriteClient.java:168)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
    at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
    at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
    at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
    at org.apache.hudi.client.HoodieFlinkWriteClient.upsertPreppedRecords(HoodieFlinkWriteClient.java:171)
    at org.apache.hudi.client.HoodieFlinkWriteClient.upsertPreppedRecords(HoodieFlinkWriteClient.java:75)
    at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.commitInternal(FlinkHoodieBackedTableMetadataWriter.java:162)
    at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.commit(FlinkHoodieBackedTableMetadataWriter.java:103)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:962)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.updateFromWriteStatuses(HoodieBackedTableMetadataWriter.java:1019)
    at org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:289)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:287)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:237)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:112)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:75)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:202)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:581)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:557)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:526)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initInstant(StreamWriteOperatorCoordinator.java:421)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleBootstrapEvent(StreamWriteOperatorCoordinator.java:453)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$4(StreamWriteOperatorCoordinator.java:294)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
    ... 3 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hudi.org.apache.hadoop.hbase.io.hfile.HFile
    at org.apache.hudi.common.util.HFileUtils.serializeRecordsToLogBlock(HFileUtils.java:210)
    at org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(HoodieHFileDataBlock.java:108)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:148)
    at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:151)
    at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:474)
    at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:445)
    at org.apache.hudi.table.action.commit.delta.BaseFlinkDeltaCommitActionExecutor.handleUpdate(BaseFlinkDeltaCommitActionExecutor.java:54)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:159)
    ... 40 more

The HFile class seems to be present in the Flink Hudi bundle jar, so I'm not sure what's causing this. Any pointers? We did try adding the hbase-server lib to the classpath on the server, but that seems to be flaky.

Another issue that we keep seeing is:

org.apache.hudi.exception.HoodieException: Timeout(21000ms) while waiting for instant initialize
    at org.apache.hudi.sink.utils.TimeWait.waitFor(TimeWait.java:57)
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.instantToWrite(AbstractStreamWriteFunction.java:270)
    at org.apache.hudi.sink.append.AppendWriteFunction.initWriterHelper(AppendWriteFunction.java:138)
    at org.apache.hudi.sink.append.AppendWriteFunction.processElement(AppendWriteFunction.java:97)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
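
For context, a rough sketch of the settings we are experimenting with for this timeout; the values are guesses, and we are only assuming that the checkpointing cadence and write.commit.ack.timeout are the relevant knobs:

    -- Flink SQL client; checkpointing generally needs to be enabled for the streaming Hudi writer to roll instants
    SET 'execution.checkpointing.interval' = '60 s';
    -- added to the sink table's WITH clause
    'write.commit.ack.timeout' = '60000'  -- ms; assumed to control how long writer tasks wait for the new instant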

Any pointers, or any config that you think we might have set incorrectly?

Thanks, Ankit

ankit0811 commented 1 month ago

@ad1happy2go @danny0405 any pointers?

ad1happy2go commented 1 month ago

Looks like a library conflict: Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hudi.org.apache.hadoop.hbase.io.hfile.HFile

alberttwong commented 1 week ago

@ankit0811 what did you use to get past the unsupported scheme s3a issue?