apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi 0.10.1 raises exception java.lang.NoClassDefFoundError: com/amazonaws/services/dynamodbv2/model/LockNotGrantedException #5451

Open jdattani opened 2 years ago

jdattani commented 2 years ago

Describe the problem you faced

Using DynamoDB as the lock provider for concurrent writes results in an error stating java.lang.NoClassDefFoundError: com/amazonaws/services/dynamodbv2/model/LockNotGrantedException

To Reproduce

Steps to reproduce the behaviour:

Expected behavior

Job is able to acquire lock.

Environment Description

Additional context

Running on Glue 3.0. The DynamoDB table was already created manually, and the role assigned to the job has all the permissions needed to operate on it.

'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.cleaner.policy.failed.writes': 'LAZY',
'hoodie.write.lock.dynamodb.endpoint_url': 'dynamodb.us-east-1.amazonaws.com',
'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
'hoodie.write.lock.dynamodb.table': '<TABLE_NAME>',
'hoodie.write.lock.dynamodb.partition_key': '<KEY_NAME>',
'hoodie.write.lock.dynamodb.region': 'us-east-1',

Tried both with and without providing "hoodie.write.lock.dynamodb.endpoint_url"

Jars included:

extra-jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar
extra-jars/spark-avro_2.12-3.1.2.jar

Job runs fine without concurrency mode configurations.
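
For context, the write itself is a standard Hudi save with these options merged in; a minimal sketch (the DataFrame, table name, and output path are placeholders, not the actual job code):

hudi_options = {
    'hoodie.table.name': '<TABLE_NAME>',
    'hoodie.datasource.write.operation': 'upsert',
    # concurrency / lock-provider settings from above
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.cleaner.policy.failed.writes': 'LAZY',
    'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
    'hoodie.write.lock.dynamodb.table': '<TABLE_NAME>',
    'hoodie.write.lock.dynamodb.partition_key': '<KEY_NAME>',
    'hoodie.write.lock.dynamodb.region': 'us-east-1',
}

df.write.format("hudi").options(**hudi_options).mode("append").save("s3://<processed-bucket>/<table-path>")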

Stacktrace

2022-04-27 14:13:05,812 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/glue_process_bundle.py", line 17, in <module>
    start_process(glue_ctx, config, glue_catalog_svc)
  File "/tmp/glue_process_bundle.zip/jobs/process.py", line 180, in start_signal_process
    load(final_df, config)
  File "/tmp/glue_process_bundle.zip/jobs/process.py", line 99, in load
    df.write.format("hudi").options(**hudi_options).mode("append").save(config.params.processed_bucket)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
    self._jwrite.save(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o255.save.
: java.lang.NoClassDefFoundError: com/amazonaws/services/dynamodbv2/model/LockNotGrantedException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:54)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:100)
    at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:91)
    at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:83)
    at org.apache.hudi.client.transaction.TransactionManager.endTransaction(TransactionManager.java:71)
    at org.apache.hudi.client.SparkRDDWriteClient.getTableAndInitCtx(SparkRDDWriteClient.java:445)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:217)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:277)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.dynamodbv2.model.LockNotGrantedException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 51 more

Since this is a NoClassDefFoundError, I was wondering whether there are additional SDK jars I need to include to use this functionality?

Thanks.

kazdy commented 2 years ago

Hi, I would add this jar to your job: https://mvnrepository.com/artifact/org.apache.hudi/hudi-aws

I think these dependencies were separated from the Spark bundle in the Hudi 0.10.1 release: https://github.com/apache/hudi/pull/4542
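
A rough sketch of how the extra jar could be wired into the Glue job via the --extra-jars job parameter (comma-separated S3 paths), assuming the jars are uploaded to S3; the bucket name and hudi-aws version below are placeholders:

default_arguments = {
    # as it would appear in the Glue job's DefaultArguments
    "--extra-jars": ",".join([
        "s3://<bucket>/extra-jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar",
        "s3://<bucket>/extra-jars/spark-avro_2.12-3.1.2.jar",
        "s3://<bucket>/extra-jars/hudi-aws-0.10.1.jar",  # jar suggested above
    ]),
}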

yihua commented 2 years ago

@jdattani As @kazdy suggested, you need to add the hudi-aws jar for the classes that are specific to AWS. Let us know if adding the jar works.

jdattani commented 2 years ago

@kazdy @yihua I tried adding the hudi-aws jar to the Glue Dependent JARs path, but I'm still getting the exact same error. Is there anything else I can try?

jdattani commented 2 years ago

Update: I ended up adding all three jars that were removed in https://github.com/apache/hudi/pull/4542 to the Glue Dependent JARs path:

com.amazonaws:dynamodb-lock-client
com.amazonaws:aws-java-sdk-dynamodb
com.amazonaws:aws-java-sdk-core

The error changed; now I'm getting: An error occurred while calling o255.save. Unable to acquire lock, lock object null. This is similar to the issue mentioned here: https://github.com/apache/hudi/issues/4456, and I'm using the same config suggested there. The only difference from issue 4456 is that my table was already created.

2022-04-28 10:00:19,695 ERROR [spark-listener-group-eventLog] scheduler.AsyncEventQueue (Logging.scala:logError(94)): Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
    at java.util.Hashtable$Enumerator.next(Hashtable.java:1408)
    at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
    at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)

File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o255.save.
: org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object null
    at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:76)
    at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:51)
    at org.apache.hudi.client.SparkRDDWriteClient.getTableAndInitCtx(SparkRDDWriteClient.java:430)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:217)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:277)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

yihua commented 2 years ago

@umehrot2 Do you know the right setup for using DynamoDB as the lock provider?

yihua commented 2 years ago

@jdattani Could you add the Hudi config below, bumping the retries, and try again? It is likely due to a transient error.

hoodie.write.lock.client.num_retries=10
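
As a sketch, the retry- and wait-related lock settings mentioned in this thread could be added alongside the existing options like this (values are illustrative only):

hudi_options.update({
    'hoodie.write.lock.client.num_retries': '10',  # retries in the underlying lock client
    'hoodie.write.lock.num_retries': '10',         # retries when acquiring the lock
    'hoodie.write.lock.wait_time_ms': '1500',      # wait between lock attempts
})
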
jdattani commented 2 years ago

@yihua I tried adding retries as you suggested, but still got the same error message: Unable to acquire lock, lock object null.

Additional detail in the error states: "The provided key element does not match the schema".

'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.cleaner.policy.failed.writes': 'LAZY',
'hoodie.write.lock.dynamodb.endpoint_url': 'dynamodb.us-east-1.amazonaws.com',
'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
'hoodie.write.lock.dynamodb.table': 'hudi-lock-provider',
'hoodie.write.lock.dynamodb.partition_key': 'tablename',
'hoodie.write.lock.dynamodb.region': 'us-east-1',
'hoodie.write.lock.wait_time_ms':'1500',
'hoodie.write.lock.num_retries':'10'

I see it retried the operation 3 times.

INFO [Thread-8] lock.LockManager (LockManager.java:getLockProvider(90)): LockProvider org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
INFO [Thread-8] lock.DynamoDBBasedLockProvider (DynamoDBBasedLockProvider.java:tryLock(105)): ACQUIRING lock at DynamoDb table = hudi-lock-provider, partition key = tablename

2022-04-29 07:55:09,998 WARN [Thread-8] dynamodbv2.AmazonDynamoDBLockClient (AmazonDynamoDBLockClient.java:acquireLock(534)): Could not acquire lock because of a client side failure in talking to DDB
com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The provided key element does not match the schema (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: 6EVA0VDQKC3Q7HNI6NNCCL67MJVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at 

2022-04-29 07:55:11,010 WARN [Thread-8] dynamodbv2.AmazonDynamoDBLockClient (AmazonDynamoDBLockClient.java:acquireLock(534)): Could not acquire lock because of a client side failure in talking to DDB
com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The provided key element does not match the schema (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: M630K6QR1GRQ8UNJKRV1CGNQAJVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)

2022-04-29 07:55:12,018 WARN [Thread-8] dynamodbv2.AmazonDynamoDBLockClient (AmazonDynamoDBLockClient.java:acquireLock(534)): Could not acquire lock because of a client side failure in talking to DDB
com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The provided key element does not match the schema (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: N1U0BAITEL1G6Q9HH4UJKO2VERVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)

I checked the CloudTrail logs and could see a successful "DescribeTable" call from the Glue job to the DynamoDB lock table, so that rules out any IAM-related issues.

The table has only one field, the partition key "tablename".

I also manually inserted a dummy value into the table.

Do we also need to define a sort key on the table?

yihua commented 2 years ago

@umehrot2 It looks like DynamoDB throws a validation exception when it is used as the lock provider. Do you have any idea about this?

umehrot2 commented 2 years ago

Did you manually create the DynamoDB table? The code needs a string attribute named key in that table, and that's it. I'm not sure where the tableName attribute came from; I am guessing you added it manually.

I would recommend deleting the table and letting the Hudi code set up the DynamoDB table for you. As for the other class-not-found errors: yes, it seems those jars have been removed from the bundle. Simply adding the hudi-aws jar will not help, since it is not a bundle. I will look into why that change was made, but for now you have to work around it by adding those jars yourself.
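
For anyone who does create the lock table manually, a minimal sketch of a schema matching what the lock provider expects, i.e. a single string partition key literally named key (table name, region, and billing mode below are placeholders):

import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
    TableName="hudi-lock-provider",  # placeholder table name
    KeySchema=[{"AttributeName": "key", "KeyType": "HASH"}],               # partition key must be named "key"
    AttributeDefinitions=[{"AttributeName": "key", "AttributeType": "S"}],  # string type
    BillingMode="PAY_PER_REQUEST",
)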

yihua commented 2 years ago

Thanks, @umehrot2, for the information. Do you think we can improve the documentation around how to use the DynamoDB-based lock provider?

jdattani commented 2 years ago

@umehrot2 Thanks for your inputs. Yes, the table was manually created.

Since partition_key is an attribute in the hoodie config, I assumed it could be any name and Hudi would read it from the config: https://hudi.apache.org/docs/concurrency_control/

hoodie.write.lock.provider=org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
hoodie.write.lock.dynamodb.table
hoodie.write.lock.dynamodb.partition_key
hoodie.write.lock.dynamodb.region

Nevertheless, I will try changing the partition key field name and post an update if that solves the issue.

umehrot2 commented 2 years ago

@jdattani I see how it can be a bit confusing. The partition_key here actually means the value to use for the column, and not the column name itself. The column name is fixed to be key.
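
In other words, against a lock table whose partition key column is named key, the earlier config would look roughly like this; the value of hoodie.write.lock.dynamodb.partition_key is just the lock entry written into that key column, typically the Hudi table name (values below are hypothetical):

'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
'hoodie.write.lock.dynamodb.table': 'hudi-lock-provider',      # DynamoDB table whose partition key column is "key"
'hoodie.write.lock.dynamodb.partition_key': 'my_hudi_table',   # value stored in the "key" column, not a column name
'hoodie.write.lock.dynamodb.region': 'us-east-1',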

umehrot2 commented 2 years ago

Created a couple of Jiras to improve the naming/docs and to package the jars in a bundle so customers don't have to pass these AWS-specific jars manually:

https://issues.apache.org/jira/browse/HUDI-4011
https://issues.apache.org/jira/browse/HUDI-4010

yihua commented 2 years ago

@jdattani Have you tried the suggestion to see if it works? Feel free to close this issue once all is good.

jdattani commented 2 years ago

@yihua @umehrot2 The issue was resolved after setting the DynamoDB partition key name to "key". Thanks for your inputs.

jtmzheng commented 1 year ago

> @kazdy @yihua I tried including hudi-aws jar to Glue Dependent JARs path. But still getting the exact same error. Is there anything else I can try?

This seems to be currently broken, per https://github.com/apache/hudi/issues/5451#issuecomment-1111800234, when using the hudi-aws-bundle (same issue). I ran into it on EMR 6.9 (/usr/lib/hudi/hudi-aws-bundle-0.12.1-amzn-0.jar).

ad1happy2go commented 1 year ago

@jtmzheng I was not able to reproduce the bug. I tried with both version 0.12.2 and the master code, and was able to use DynamoDB successfully without any issues with both the spark bundle and the aws bundle.

Can you please let us know the steps to reproduce how you got that error again?

I built the jars and put them on an EMR cluster.

spark-shell \
  --jars jars/hudi-spark3.2-bundle_2.12-0.12.2.jar,jars/hudi-aws-bundle-0.12.2.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

df.write.format("hudi").
       options(getQuickstartWriteConfigs).
       option(OPERATION_OPT_KEY, "upsert").
       option(PRECOMBINE_FIELD_OPT_KEY, "ts").
       option(RECORDKEY_FIELD_OPT_KEY, "uuid").
       option("hoodie.write.concurrency.mode","optimistic_concurrency_control").
       option("hoodie.cleaner.policy.failed.writes","LAZY").
       option("hoodie.write.lock.filesystem.path","/Users/<username>/").
       option("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider").
       option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
       option(TABLE_NAME, tableName).
       mode(Append).
       save(basePath)

ad1happy2go commented 1 year ago

Also, I have tried both Glue 3.0 and Glue 4.0 with the default "--datalake-formats hudi", and both work fine with DynamoDB concurrency.