apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Hudi 0.13.1 compatibility issues with EMR-6.7.0 and EMR-6.11.1 #9919

Open Shubham21k opened 1 year ago

Shubham21k commented 1 year ago

We are trying to upgrade from Hudi 0.11.1 to Hudi 0.13.1 for batch workloads. We use EMR to run these batch HoodieDeltaStreamer / HoodieIncrSource jobs, and we are facing dependency issues with EMR releases 6.7.0 and 6.11.1, which ship Spark 3.2.x and 3.3.x respectively.

To Reproduce

Steps to reproduce the behavior:

  1. Run a Deltastreamer job on emr-6.5.0 (Spark 3.1.2); it should run fine.
  2. Run the same Deltastreamer job on emr-6.11.1 (Spark 3.3.2) or emr-6.7.0 (Spark 3.2.1); the job should fail (see error logs attached below).

Expected behavior:

Since Hudi 0.13.1 is compatible with Spark 3.2.x and 3.3.x, Hudi jobs should run fine on EMR releases shipping those Spark versions.

Environment Description

Additional context

Spark step:

spark-submit --master yarn \
    --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://generic-data-lake/jars/hudi-utilities-bundle_2.12-0.13.1.jar,s3://generic-data-lake/jars/hudi-aws-bundle-0.13.1.jar \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
    --conf spark.executor.cores=5 \
    --conf spark.driver.memory=3200m \
    --conf spark.driver.memoryOverhead=800m \
    --conf spark.executor.memoryOverhead=1400m \
    --conf spark.executor.memory=14600m \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.initialExecutors=1 \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=21 \
    --conf spark.scheduler.mode=FAIR \
    --conf spark.task.maxFailures=5 \
    --conf spark.rdd.compress=true \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.yarn.max.executor.failures=5 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    --conf spark.sql.catalogImplementation=hive \
    --deploy-mode cluster \
    s3://generic-data-lake/jars/deltastreamer-addons-2.0-SNAPSHOT.jar \
    --hoodie-conf hoodie.parquet.compression.codec=snappy \
    --hoodie-conf hoodie.deltastreamer.source.hoodieincr.num_instants=100 \
    --table-type COPY_ON_WRITE \
    --source-class org.apache.hudi.utilities.sources.HoodieIncrSource \
    --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=s3://generic-data-lake/raw-data/credit_underwriting/features \
    --hoodie-conf hoodie.metrics.on=true \
    --hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY \
    --hoodie-conf hoodie.metrics.pushgateway.host=pushgateway.prod.generic-tech.in \
    --hoodie-conf hoodie.metrics.pushgateway.port=443 \
    --hoodie-conf hoodie.metrics.pushgateway.delete.on.shutdown=false \
    --hoodie-conf hoodie.metrics.pushgateway.job.name=credit_underwriting_transformed_features_accounts_hudi \
    --hoodie-conf hoodie.metrics.pushgateway.random.job.name.suffix=false \
    --hoodie-conf hoodie.metadata.enable=true \
    --hoodie-conf hoodie.metrics.reporter.metricsname.prefix=hudi \
    --target-base-path s3://generic-data-lake/raw-data/credit_underwriting_transformed/features_accounts \
    --target-table features_accounts \
    --enable-sync \
    --hoodie-conf hoodie.datasource.hive_sync.database=credit_underwriting_transformed \
    --hoodie-conf hoodie.datasource.hive_sync.table=features_accounts \
    --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
    --hoodie-conf hoodie.datasource.write.recordkey.field=id,pos \
    --hoodie-conf hoodie.datasource.write.precombine.field=id \
    --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
    --hoodie-conf hoodie.datasource.write.partitionpath.field=created_at_dt \
    --hoodie-conf hoodie.datasource.hive_sync.partition_fields=created_at_dt \
    --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor \
    --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
    --hoodie-conf "hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z',yyyy-MM-dd' 'HH:mm:ss.SSSSSS,yyyy-MM-dd' 'HH:mm:ss,yyyy-MM-dd'T'HH:mm:ss'Z'" \
    --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
    --source-ordering-field id \
    --hoodie-conf secret.key.name=credit-underwriting-secret \
    --hoodie-conf transformer.decrypt.cols=features_json \
    --hoodie-conf transformer.uncompress.cols=false \
    --hoodie-conf transformer.jsonToStruct.column=features_json \
    --hoodie-conf transformer.normalize.column=features_json.accounts \
    --hoodie-conf transformer.copy.fields=created_at,created_at_dt \
    --transformer-class com.generic.transform.DecryptTransformer,com.generic.transform.JsonToStructTypeTransformer,com.generic.transform.NormalizeArrayTransformer,com.generic.transform.FlatteningTransformer,com.generic.transform.CopyFieldTransformer

Stacktrace

23/10/17 12:13:41 ERROR Client: Application diagnostics message: User class threw exception: org.apache.hudi.exception.HoodieException: Unable to load class
    at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:58)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:68)
    at org.apache.spark.sql.hudi.analysis.HoodieAnalysis$.instantiateKlass(HoodieAnalysis.scala:141)
    at org.apache.spark.sql.hudi.analysis.HoodieAnalysis$.customOptimizerRules(HoodieAnalysis.scala:118)
    at org.apache.spark.sql.hudi.HoodieSparkSessionExtension.apply(HoodieSparkSessionExtension.scala:43)
    at org.apache.spark.sql.hudi.HoodieSparkSessionExtension.apply(HoodieSparkSessionExtension.scala:28)
    at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1197)
    at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1192)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1192)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:956)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:655)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:152)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:125)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:592)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:740)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning
    at java.lang.ClassLoader.findClass(ClassLoader.java:523)
    at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.java:35)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.java:40)
    at org.apache.spark.util.ChildFirstURLClassLoader.loadClass(ChildFirstURLClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:55)
    ... 21 more
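
One way to confirm that the missing class is actually absent from the jars shipped with the job (jar names are the ones from the spark-submit step above; this assumes local copies of the same jars):

for j in hudi-utilities-bundle_2.12-0.13.1.jar hudi-aws-bundle-0.13.1.jar; do
    echo "== $j =="
    # list the bundle contents and look for the class the reflection call fails on
    jar tf "$j" | grep 'Spark32NestedSchemaPruning' || echo "not found"
done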
ad1happy2go commented 1 year ago

@Shubham21k I think you are using the wrong utilities bundle jar. There are two utilities bundles: hudi-utilities-bundle (which also packages the Hudi Spark bundle classes) and hudi-utilities-slim-bundle (which does not).

Can you try using hudi-utilities-slim-bundle together with the matching hudi-spark3.2 / hudi-spark3.3 bundle jar?
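
For example, the --jars portion of the spark-submit step above would change roughly along these lines (a sketch only; the slim-bundle and spark-bundle jar paths are illustrative, mirroring the originals):

spark-submit --master yarn \
    --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://generic-data-lake/jars/hudi-utilities-slim-bundle_2.12-0.13.1.jar,s3://generic-data-lake/jars/hudi-spark3.2-bundle_2.12-0.13.1.jar,s3://generic-data-lake/jars/hudi-aws-bundle-0.13.1.jar \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
    # ... rest of the step unchanged

Use the hudi-spark3.3 bundle instead when running on emr-6.11.1 (Spark 3.3.x).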

Shubham21k commented 1 year ago

@ad1happy2go When I used hudi-utilities-slim-bundle with the spark3.2 bundle on emr-6.7.0 (--packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.13.1,org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.1), the job fails with:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.<init>(YarnSparkHadoopUtil.scala:46)
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.<clinit>(YarnSparkHadoopUtil.scala)
    at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:107)
    at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1688)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl cannot be cast to org.apache.hadoop.yarn.api.records.Priority
    at org.apache.hadoop.yarn.api.records.Priority.newInstance(Priority.java:39)
    at org.apache.hadoop.yarn.api.records.Priority.<clinit>(Priority.java:34)
    ... 11 more
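
This kind of ClassCastException usually indicates two copies of the hadoop-yarn-api record classes on the classpath (the userClassPathFirst settings in the original step can make that more likely). A quick way to check whether any of the bundles carry their own YARN classes (jar names assumed, local copies):

for j in hudi-utilities-slim-bundle_2.12-0.13.1.jar hudi-spark3.2-bundle_2.12-0.13.1.jar hudi-aws-bundle-0.13.1.jar; do
    echo "== $j =="
    # duplicate copies of these record classes trigger exactly this cast failure
    jar tf "$j" | grep 'org/apache/hadoop/yarn/api/records' || echo "none"
done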
ad1happy2go commented 1 year ago

@Shubham21k Looks like the YARN jars are conflicting now. We did this testing on emr-6.7.0 and it worked fine. Can you try building the utilities bundle jar with the spark3.2 profile and try once?

Check out the release-0.13.1 tag and build with:

mvn -T 2C clean install -DskipTests -Dscala-2.12 -Dspark3.2
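
For reference, the full sequence would look roughly like this (repo URL and output path are the standard ones for the Hudi codebase, stated here as assumptions rather than verified on EMR):

git clone https://github.com/apache/hudi.git
cd hudi
git checkout release-0.13.1
mvn -T 2C clean install -DskipTests -Dscala-2.12 -Dspark3.2
# the rebuilt bundle should land under:
#   packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.13.1.jar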

ad1happy2go commented 1 year ago

@Shubham21k Were you able to build the jar and try it out?