apache / iceberg


java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions - using spark structured streaming #6266

Closed HannesDS closed 1 year ago

HannesDS commented 1 year ago

Apache Iceberg version

1.0.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

Application:

We are trying to use Spark Structured Streaming in combination with Iceberg. The application is containerised and runs on EKS behind the scenes. Spark Structured Streaming worked well until we tried to add the Iceberg extension. We have previously used Iceberg successfully in a local Spark session (not Structured Streaming).

Environment

We have tried with the following versions:

Iceberg version: 1.0.0 ; 0.14.1 ; 0.13.1
Spark version: 3.2.0 ; 3.0.3
AWS SDK version: 2.17.257 ; 2.18.22
Scala version: 2.12

Extra configuration included into the spark session:

    REMOTE_CATALOG_NAME: str = "remote_glue_catalog"

    config.setAll(
        [
            (
                "spark.jars.packages",
                "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1",
            ),
            ("spark.jars.packages", "software.amazon.awssdk:bundle:2.17.257"),
            (
                "spark.jars.packages",
                "software.amazon.awssdk:url-connection-client:2.17.257",
            ),
            (
                f"spark.sql.catalog.{REMOTE_CATALOG_NAME}",
                "org.apache.iceberg.spark.SparkCatalog",
            ),
            (
                f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.warehouse",
                f"{aws.datalake_root_path()}/{table_name}/",
            ),
            (
                f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.catalog-impl",
                "org.apache.iceberg.aws.glue.GlueCatalog",
            ),
            (
                f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.io-impl",
                "org.apache.iceberg.aws.s3.S3FileIO",
            ),
            (
                "spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            ),
            (f"spark.sql.defaultCatalog", REMOTE_CATALOG_NAME),
        ]
    )
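A side note on the configuration above: `spark.jars.packages` is set three times. Spark configuration holds one value per key, so a later entry for the same key most likely replaces the earlier ones, leaving only the last package. The usual form is a single comma-separated value. The sketch below is plain Python with no Spark dependency; `merge_conf_pairs` is a hypothetical helper, not part of PySpark, and it only illustrates how the pairs could be collapsed before they are handed to `setAll`.

```python
def merge_conf_pairs(pairs, list_keys=("spark.jars.packages", "spark.jars")):
    """Collapse (key, value) pairs to one value per key.

    Keys in list_keys hold comma-separated lists, so repeated values are
    joined with commas. For any other key the last value wins.
    """
    merged = {}
    for key, value in pairs:
        if key in list_keys and key in merged:
            merged[key] = merged[key] + "," + value
        else:
            merged[key] = value
    return list(merged.items())


# The package entries as posted, plus one unrelated key for contrast.
pairs = [
    ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1"),
    ("spark.jars.packages", "software.amazon.awssdk:bundle:2.17.257"),
    ("spark.jars.packages", "software.amazon.awssdk:url-connection-client:2.17.257"),
    ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
]

merged = merge_conf_pairs(pairs)
for key, value in merged:
    print(f"{key} = {value}")
```

The merged list can then be passed to `config.setAll(merged)` in place of the original list.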

Stacktrace

    22/11/24 09:18:36 WARN SparkSession: Cannot use org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions to configure session extensions.
    java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        at java.base/java.net.URLClassLoader.findClass(Unknown Source)
        at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
        at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Unknown Source)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
        at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1160)
        at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1158)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1158)
        at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:101)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
        at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)

    ....
    22/11/24 09:18:45 at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:564)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
    22/11/24 09:18:45 ERROR MicroBatchExecution: Query [id = , runId = ] terminated with error
    org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'remote_glue_catalog': org.apache.iceberg.spark.SparkCatalog
        at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:66)
        at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
        at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
        at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$currentNamespace$1(CatalogManager.scala:100)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:100)
        at org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase.apply(finishAnalysis.scala:98)
        at org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase.apply(finishAnalysis.scala:95)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
        at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
        at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
        at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:138)
        at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
        at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:90)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:108)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:105)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:574)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:564)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
    ERROR:root:<traceback object at 0x7f50f454a640>
    Traceback (most recent call last):
      File "/opt/spark/work-dir/src/app/streaming/main.py", line 11, in <module>
        streaming_main(registry=streaming_registry, )
      File "/opt/spark/work-dir/src/app/app.py", line 20, in main
        start_job(session, job=job, environment=args.env, kwargs=args.kwargs)
      File "/opt/spark/work-dir/src/app/app.py", line 44, in start_job
        job.run(session, kwargs=kwargs)
      File "/opt/spark/work-dir/src/elia_streaming/jobs/current_system_imbalance/producer.py", line 31, in run
        iceberg.awaitTermination()
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
      File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
      File "<string>", line 3, in raise_from
    pyspark.sql.utils.StreamingQueryException: Cannot find catalog plugin class for catalog 'remote_glue_catalog': org.apache.iceberg.spark.SparkCatalog

Debug checks:

Question

Does anybody know what is going on? My guess is that with Spark Structured Streaming the jars aren't actually added to the classpath of the executors, but I'm not sure.

Thanks in advance!

nastra commented 1 year ago

Have you tried adding those jars (aws jars + iceberg-spark-runtime jar) directly to the respective spark nodes (under their jars folder)?

HannesDS commented 1 year ago

Yes, they are for sure there. If I use the following code to print the extra jars included:

    self.logger.info(f"Spark Jars: {spark.sparkContext._jsc.sc().listJars()}")

it logs the following:

22/11/28 16:29:50 INFO SparkContext: Added JAR /opt/spark/jars/iceberg-spark-runtime-3.2_2.12-1.0.0.jar at spark://streaming-c7066896-a115-4017-b47d-5-7d9dff84bf12a7e0-driver-svc.dev.svc:7078/jars/iceberg-spark-runtime-3.2_2.12-1.0.0.jar with timestamp 1669652989004
22/11/28 16:29:50 INFO SparkContext: Added JAR /opt/spark/jars/bundle-2.17.257.jar at spark://streaming-c7066896-a115-4017-b47d-5-7d9dff84bf12a7e0-driver-svc.dev.svc:7078/jars/bundle-2.17.257.jar with timestamp 1669652989004
22/11/28 16:29:50 INFO SparkContext: Added JAR /opt/spark/jars/url-connection-client-2.17.257.jar at spark://streaming-c7066896-a115-4017-b47d-5-7d9dff84bf12a7e0-driver-svc.dev.svc:7078/jars/url-connection-client-2.17.257.jar with timestamp 1669652989004
22/11/28 16:30:09 INFO CurrentSystemImbalanceProducer: Spark Jars : Vector(spark://streaming-c7066896-a115-4017-b47d-5-7d9dff84bf12a7e0-driver-svc.dev.svc:7078/jars/iceberg-spark-runtime-3.2_2.12-1.0.0.jar, spark://streaming-c7066896-a115-4017-b47d-5-7d9dff84bf12a7e0-driver-svc.dev.svc:7078/jars/url-connection-client-2.17.257.jar, spark://streaming-c7066896-a115-4017-b47d-5-7d9dff84bf12a7e0-driver-svc.dev.svc:7078/jars/bundle-2.17.257.jar)
22/11/28 16:30:11 

Now I added this config:

        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.0.0,"
                               "software.amazon.awssdk:bundle:2.17.257,"
                               "software.amazon.awssdk:url-connection-client:2.17.257,",
        "spark.jars": "/opt/spark/jars/iceberg-spark-runtime-3.2_2.12-1.0.0.jar,"
                      "/opt/spark/jars/bundle-2.17.257.jar,"
                      "/opt/spark/jars/url-connection-client-2.17.257.jar,",
        f"spark.sql.catalog.{REMOTE_CATALOG_NAME}": "org.apache.iceberg.spark.SparkCatalog",
        f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.warehouse": f"{bucket_root_path}/{purpose_name}/",
        f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.defaultCatalog": REMOTE_CATALOG_NAME,

Edit: Removed the extensions jars, it was a legacy thing for debugging which is not actually added in the code that was shown.

RussellSpitzer commented 1 year ago

The config above has the wrong runtime jar version; did you update this? That said, the jar is most likely not on the classpath, or possibly the jar with the correct name was not copied properly. Also, do not include iceberg-spark-extensions; it is already included and shaded in the runtime jar.
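To make the version mismatch concrete: a Maven coordinate `group:artifact:version` conventionally resolves to a file named `artifact-version.jar`, so the configured packages can be compared against the jar names that Spark reports. The sketch below is a hypothetical helper in plain Python, not a Spark or Iceberg API. It uses the coordinates from the first config and the jar names from the `Added JAR` log lines in this thread.

```python
def expected_jar_name(coordinate):
    """Map 'group:artifact:version' to the conventional jar file name."""
    _group, artifact, version = coordinate.split(":")
    return f"{artifact}-{version}.jar"


def missing_jars(coordinates, present_jar_names):
    """Return the coordinates whose expected jar name is not present."""
    present = set(present_jar_names)
    return [c for c in coordinates if expected_jar_name(c) not in present]


# Coordinates from the first configuration in this thread.
configured = [
    "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1",
    "software.amazon.awssdk:bundle:2.17.257",
    "software.amazon.awssdk:url-connection-client:2.17.257",
]

# Jar file names taken from the "Added JAR" log lines.
present = [
    "iceberg-spark-runtime-3.2_2.12-1.0.0.jar",
    "bundle-2.17.257.jar",
    "url-connection-client-2.17.257.jar",
]

for coordinate in missing_jars(configured, present):
    print(f"no matching jar for {coordinate} (expected {expected_jar_name(coordinate)})")
```

A name match only shows that a file with the right name exists; it says nothing about whether the file content is a valid jar.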

HannesDS commented 1 year ago

Yes, so I updated back to the latest version. So I currently still have the same error and I am using:

RussellSpitzer commented 1 year ago

You should just check the Spark UI and see whether the jars are actually listed in the Environment tab.

HannesDS commented 1 year ago

@RussellSpitzer I did, and I saw they were there, only once, so there are no conflicting versions. I used the same Docker image and environment for a batch job, and it is not working there either. So I must be doing something wrong or different. When I find something I will update.

HannesDS commented 1 year ago

Update: I fixed it. I had a mistake in my Docker container, where I used wget ... -o ... This did write a jar, but it was empty. I fixed it by using curl. Stupid mistake on my end, thanks for the help anyway.
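For context, in wget the lowercase `-o` sets the log file, while the uppercase `-O` sets the output document, so with `-o` the file at the intended jar path does not receive the download. Spark still lists such a file as an added jar, which is why the name-based checks earlier in the thread all passed. A minimal sketch of a content check that could run at image build time follows; `jar_contains_class` is a hypothetical helper that uses only Python's standard `zipfile` module, and the path in the usage lines is the one from the logs above.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if jar_path is a readable zip archive containing class_name.

    An empty, missing, or non-zip file yields False instead of raising.
    """
    if not zipfile.is_zipfile(jar_path):
        return False
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


if __name__ == "__main__":
    # Path and class name as they appear in this thread.
    jar = "/opt/spark/jars/iceberg-spark-runtime-3.2_2.12-1.0.0.jar"
    cls = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    print(f"{jar}: contains {cls}? {jar_contains_class(jar, cls)}")
```

Failing the Docker build when this returns False would surface an empty or corrupt download before the job is deployed.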