apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Pyspark with Hudi is not able to access GlueCatalog on EMR #5053

Closed · worf0815 closed this issue 2 years ago

worf0815 commented 2 years ago

Describe the problem you faced

Running pyspark on an AWS EMR 6.5.0 cluster with Hudi enabled results in an exception when trying to access the Glue catalog.

To Reproduce

Steps to reproduce the behavior:

  1. Start the pyspark shell:

     pyspark --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

  2. Configure the session to use Hive support (completed sketch; conf is a SparkConf assumed to carry the settings from step 1):

     from pyspark import SparkConf
     from pyspark.sql import SparkSession

     conf = SparkConf()  # assumed: populated with the Hudi/Kryo settings above
     spark = SparkSession.builder \
         .appName("job_name") \
         .config(conf=conf) \
         .enableHiveSupport() \
         .getOrCreate()

  3. Connect to a database, or set it as the current database: spark.catalog.setCurrentDatabase("mydatabase")
  4. The exception java.lang.NoSuchMethodError: com.amazonaws.transform.JsonUnmarshallerContext.getCurrentToken()Lcom/amazonaws/thirdparty/jackson/core/JsonToken; is thrown.

Expected behavior

Without specifying any of the Hudi jars or options, pyspark is able to connect to the Glue catalog. This should also be possible with Hudi.

Environment Description

EMR release: emr-6.5.0 (see Additional context below)

Additional context

Used EMR 6.5.0 and started the pyspark shell according to https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-work-with-dataset.html

Stacktrace

>>> spark.catalog.setCurrentDatabase("mydatabase")

22/03/16 13:12:14 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/catalog.py", line 53, in setCurrentDatabase
    return self._jcatalog.setCurrentDatabase(dbName)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o230.setCurrentDatabase.
: java.lang.NoSuchMethodError: com.amazonaws.transform.JsonUnmarshallerContext.getCurrentToken()Lcom/amazonaws/thirdparty/jackson/core/JsonToken;
        at com.amazonaws.services.glue.model.transform.GetDatabaseResultJsonUnmarshaller.unmarshall(GetDatabaseResultJsonUnmarshaller.java:39)
        at com.amazonaws.services.glue.model.transform.GetDatabaseResultJsonUnmarshaller.unmarshall(GetDatabaseResultJsonUnmarshaller.java:29)
        at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:118)
        at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
        at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1734)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1454)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1369)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
        at com.amazonaws.services.glue.AWSGlueClient.doInvoke(AWSGlueClient.java:10640)
        at com.amazonaws.services.glue.AWSGlueClient.invoke(AWSGlueClient.java:10607)
        at com.amazonaws.services.glue.AWSGlueClient.invoke(AWSGlueClient.java:10596)
        at com.amazonaws.services.glue.AWSGlueClient.executeGetDatabase(AWSGlueClient.java:4466)
        at com.amazonaws.services.glue.AWSGlueClient.getDatabase(AWSGlueClient.java:4435)
        at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.doesDefaultDBExist(AWSCatalogMetastoreClient.java:238)
        at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.<init>(AWSCatalogMetastoreClient.java:151)
        at com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory.createMetaStoreClient(AWSGlueDataCatalogHiveClientFactory.java:20)
        at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClient(HiveUtils.java:507)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3746)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3726)
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3988)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:251)
        at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:234)
        at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:402)
        at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:335)
        at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:315)
        at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:291)
        at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:257)
        at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:283)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
        at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:384)
        at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:249)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:105)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:249)
        at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:135)
        at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:125)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:44)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$1(HiveSessionStateBuilder.scala:51)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:98)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:98)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.databaseExists(SessionCatalog.scala:266)
        at org.apache.spark.sql.internal.CatalogImpl.requireDatabaseExists(CatalogImpl.scala:44)
        at org.apache.spark.sql.internal.CatalogImpl.setCurrentDatabase(CatalogImpl.scala:65)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
rkkalluri commented 2 years ago

@worf0815 can you confirm that you can use the default database from the Glue catalog with Hudi, and that switching databases is what is causing this?

rkkalluri commented 2 years ago

https://github.com/localstack/localstack/issues/1031 seems to be related. I will try to reproduce the issue and see how we can relocate jackson:

com.fasterxml.jackson → com.amazonaws.thirdparty.jackson
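
As a quick check, you can see whether a given bundle already relocates (shades) Jackson by listing the jar's contents; a minimal sketch using the EMR jar paths mentioned in this thread:

    # Relocated Jackson classes show up under the shaded package prefix
    # (e.g. com/amazonaws/thirdparty/jackson/...) rather than com/fasterxml/jackson/...
    jar tf /usr/lib/hudi/hudi-spark-bundle.jar | grep -i jackson | head
    jar tf /usr/share/aws/aws-java-sdk/aws-java-sdk-bundle-1.12.31.jar | grep thirdparty/jackson | head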
rkkalluri commented 2 years ago

@nsivabalan I have verified that this issue is resolved in 0.11-SNAPSHOT. Do you know if we have relocated jackson recently?

rkkalluri commented 2 years ago

@worf0815 can you try with Hudi 0.10.1 on EMR? Here is the command to launch pyspark:

pyspark --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
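
(Note: --packages resolves the 0.10.1 bundle from Maven Central at launch time, so this bypasses the older Hudi jar pre-installed on the cluster under /usr/lib/hudi.)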

worf0815 commented 2 years ago

@rkkalluri I can confirm that with the above settings, using Hudi 0.10.1, everything works as expected :)

kination commented 2 years ago

@worf0815 you mean it works well with Hudi 0.10.1?

nsivabalan commented 2 years ago

Thanks @rkkalluri for helping out. @worf0815: will go ahead and close out the issue.

worf0815 commented 2 years ago

> @worf0815 you mean it works well with Hudi 0.10.1?

Yes, the Jackson issue was solved and it is now working as expected.

kination commented 2 years ago

@nsivabalan @worf0815 is there some way to make it work on an older Hudi version?

kination commented 2 years ago

I'm working with Scala, so can it be solved by importing Jackson separately?

worf0815 commented 2 years ago

> I'm working with Scala, so can it be solved by importing Jackson separately?

If you are using EMR, AWS Support recommended specifying the AWS SDK dependency separately, e.g. for pyspark (the same should work for spark-submit as well):

pyspark --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/share/aws/aws-java-sdk/aws-java-sdk-bundle-1.12.31.jar,/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/jars/spark-avro.jar
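
This likely works because the Glue metastore client on EMR is compiled against the shaded Jackson inside the AWS SDK bundle (the com.amazonaws.thirdparty.jackson package visible in the NoSuchMethodError signature above); listing aws-java-sdk-bundle explicitly keeps those shaded classes ahead of any conflicting, unshaded SDK classes on the classpath.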

rkkalluri commented 2 years ago

Setting spark.driver.userClassPathFirst=true and spark.executor.userClassPathFirst=true will also hint Spark to prioritize the jars you provide via --jars.
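
For example, a sketch combining those flags (which Spark documents as experimental) with the jars from the command above:

    pyspark \
      --conf "spark.driver.userClassPathFirst=true" \
      --conf "spark.executor.userClassPathFirst=true" \
      --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
      --jars /usr/share/aws/aws-java-sdk/aws-java-sdk-bundle-1.12.31.jar,/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/jars/spark-avro.jar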

kination commented 2 years ago

@worf0815 so will it be solved by using the aws-java-sdk-bundle-1.12.31.jar, hudi-spark-bundle.jar, and spark-avro.jar files that ship inside EMR?

rkkalluri commented 2 years ago

I confirmed that adding aws-java-sdk-bundle-1.12.31.jar explicitly to --jars resolved this issue.