apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] The run_clustering procedure is not working with the path parameter. #12193

Closed: RameshkumarChikoti123 closed this issue 2 weeks ago

RameshkumarChikoti123 commented 2 weeks ago

I am using the path parameter with run_clustering, but I'm encountering an error.

Expected behaviour: Clustering should execute successfully.

Environment Description

Hudi version : 0.15.0
Spark version : 3.3.0
Storage : S3
Hive version : NA
Running on Docker : Yes
Hadoop version : 3.3.4

Steps to reproduce the behaviour:

#Queries that have been tried.

hudi_table_path = "s3a://bucket/var/hudipoc/clustering_poc/"
spark.sql(f"""
    CALL run_clustering(path => '{hudi_table_path}')
""").show()

spark.sql(f"""
    CALL run_clustering(path => '{hudi_table_path}',
        options => 'hoodie.write.concurrency.mode=SINGLE_WRITER')
""").show()

Stacktrace:

Py4JJavaError: An error occurred while calling o98.sql.
: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75)
    at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:125)
    at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:112)
    at org.apache.hudi.client.transaction.TransactionManager.endTransaction(TransactionManager.java:70)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableService(BaseHoodieTableServiceClient.java:609)
    at org.apache.hudi.client.BaseHoodieWriteClient.scheduleTableService(BaseHoodieWriteClient.java:1216)
    at org.apache.hudi.client.BaseHoodieWriteClient.scheduleClusteringAtInstant(BaseHoodieWriteClient.java:1165)
    at org.apache.spark.sql.hudi.command.procedures.RunClusteringProcedure.call(RunClusteringProcedure.scala:163)
    at org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand.run(CallProcedureHoodieCommand.scala:33)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at jdk.internal.reflect.GeneratedMethodAccessor206.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73)
    ... 49 more
Caused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation
    at org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:90)
    ... 55 more
danny0405 commented 2 weeks ago

Just configure a different lock provider class instead of the file-system-based one.
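
A minimal sketch of what passing a different lock provider through the procedure's options could look like. The helper `build_run_clustering_call` is hypothetical (it is not part of Hudi), and the choice of `InProcessLockProvider` via `hoodie.write.lock.provider` is an assumption for illustration; which provider is appropriate depends on the deployment and on whether other writers exist.

```python
from typing import Mapping


def build_run_clustering_call(table_path: str, options: Mapping[str, str]) -> str:
    """Render a CALL run_clustering statement (hypothetical helper).

    Options are joined as comma-separated key=value pairs, the format the
    procedure's `options` argument is assumed to accept.
    """
    opts = ",".join(f"{key}={value}" for key, value in options.items())
    return f"CALL run_clustering(path => '{table_path}', options => '{opts}')"


# Assumption: an in-process lock provider is acceptable because only one
# Spark application writes to the table.
lock_options = {
    "hoodie.write.lock.provider":
        "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
}

statement = build_run_clustering_call(
    "s3a://bucket/var/hudipoc/clustering_poc/", lock_options
)
print(statement)
# With an active SparkSession: spark.sql(statement).show()
```

Building the statement as a string first makes it easy to inspect the quoting before it is handed to `spark.sql`.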

ad1happy2go commented 2 weeks ago

@RameshkumarChikoti123 Clustering needs a lock provider to be configured. If you are sure there is no concurrent ingestion job running, one way is to set the property hoodie.fs.atomic_creation.support to s3a. S3 is currently not supported by the file-system-based lock provider because it doesn't allow atomic creation of objects, but this can serve as a workaround in your case.

That said, I believe that if you set hoodie.write.concurrency.mode to SINGLE_WRITER explicitly, it should work, since that mode shouldn't need any lock provider.
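
To illustrate why the property helps, here is a rough sketch of the kind of check implied by the "Unsupported scheme :s3a" error in the stack trace. This is not Hudi's code: the function name `scheme_allowed` is hypothetical, and the default set of schemes treated as supporting atomic creation is an assumption made only for the example.

```python
from urllib.parse import urlparse


def scheme_allowed(
    table_path: str,
    atomic_creation_support: str = "",
    builtin_atomic_schemes: frozenset = frozenset({"hdfs", "file"}),  # assumed
) -> bool:
    """Return True if the path's scheme would pass an atomic-creation check.

    `atomic_creation_support` mimics a comma-separated value such as the one
    given to hoodie.fs.atomic_creation.support.
    """
    scheme = urlparse(table_path).scheme
    custom = {s.strip() for s in atomic_creation_support.split(",") if s.strip()}
    return scheme in custom or scheme in builtin_atomic_schemes


path = "s3a://bucket/var/hudipoc/clustering_poc/"
print(scheme_allowed(path))         # s3a is not in the assumed built-in set
print(scheme_allowed(path, "s3a"))  # explicitly allowed via the property
```

Note that allowing the scheme only bypasses the check; it does not make S3 object creation atomic, which is why the workaround is suggested only when no concurrent writer is running.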

RameshkumarChikoti123 commented 2 weeks ago

@ad1happy2go I am not running any other jobs and have configured SINGLE_WRITER, but I am still seeing the same issue. Below is my SQL query. Please let me know if any additional configuration is needed.

spark.sql(f"""
    CALL run_clustering(path => '{hudi_table_path}',
        options => 'hoodie.write.concurrency.mode=SINGLE_WRITER')
""").show()

ad1happy2go commented 2 weeks ago

@RameshkumarChikoti123 Can you try this -

spark.sql(f""" CALL run_clustering(path => '{hudi_table_path}', options => ' hoodie.fs.atomic_creation.support=s3a') """).show()

RameshkumarChikoti123 commented 2 weeks ago

@ad1happy2go The configuration you provided is working. Thank you!