AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Spark not able to query S3 after adding the jar spark-3.3-spline-agent-bundle_2.12-1.0.1.jar #592

Closed: wasimblocknaut closed this issue 1 year ago

wasimblocknaut commented 1 year ago

I am using a single-node Spark installation to test Spline. The Spline servers are running in Docker, set up following https://absaoss.github.io/spline/#step-by-step

Spark config (spark-defaults.conf):

    spark.sql.queryExecutionListeners  za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
    spark.spline.producer.url          http://spline-server:9090/producer

Added spark-3.3-spline-agent-bundle_2.12-1.0.1.jar to the $SPARK_HOME/jars directory.
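
For reference, the setup above (the spark-defaults.conf listener entry plus the bundle jar) is the agent's codeless initialization path; the agent also supports programmatic initialization. A minimal sketch, assuming the same bundle jar is on the classpath and that spark is the spark-shell session:

    // Programmatic alternative to setting spark.sql.queryExecutionListeners.
    // The producer URL is still read from the Spark conf
    // (spark.spline.producer.url), as configured above.
    import za.co.absa.spline.harvester.SparkLineageInitializer._

    spark.enableLineageTracking()

With codeless init already enabled via the listener config, this call is redundant; the two approaches are alternatives, not complements.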

Execute:

    val df = spark.read
      .format("csv")
      .option("inferSchema", "True")
      .option("header", "True")
      .option("sep", ",")
      .load("s3a://{Bucket}/{file_name}.csv")

Spark throws this exception:

java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
  at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1178)
  at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
  at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
  at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:698)
  at org.apache.spark.sql.SparkSession.read(SparkSession.scala:662)
  ... 43 elided
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make public void sun.net.www.protocol.jar.JarURLConnection$JarURLInputStream.close() throws java.io.IOException accessible: module java.base does not "opens sun.net.www.protocol.jar" to unnamed module @76a4ebf2
  at java.base/java.lang.reflect.AccessibleObject.throwInaccessibleObjectException(AccessibleObject.java:387)
  at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:363)
  at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:311)
  at java.base/java.lang.reflect.Method.checkCanSetAccessible(Method.java:201)
  at java.base/java.lang.reflect.Method.setAccessible(Method.java:195)
  at scala.reflect.package$.ensureAccessible(package.scala:65)
  at scala.runtime.ScalaRunTime$.ensureAccessible(ScalaRunTime.scala:162)
  at za.co.absa.commons.lang.ARM$.reflMethod$Method1(ARM.scala:32)
  at za.co.absa.commons.lang.ARM$.using(ARM.scala:32)
  at za.co.absa.spline.harvester.conf.YAMLConfiguration.<init>(YAMLConfiguration.scala:37)
  at za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack$.defaultConfig(StandardSplineConfigurationStack.scala:44)
  at za.co.absa.spline.harvester.SparkLineageInitializer.createListener(SparkLineageInitializer.scala:123)
  at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.<init>(SplineQueryExecutionListener.scala:37)
  at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67)
  at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
  at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:484)
  at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2930)
  at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
  at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
  at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2919)
  at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$2(QueryExecutionListener.scala:90)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
  at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:90)
  at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:88)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.sql.util.ExecutionListenerManager.<init>(QueryExecutionListener.scala:88)
  at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:336)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:336)
  at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:364)
  at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1175)
  ... 49 more
cerveada commented 1 year ago

What are the versions of Spark, Scala and Java that you are using?

wasimblocknaut commented 1 year ago

Spark 3.3.1, Scala 2.12, Java: OpenJDK 19.0.2

cerveada commented 1 year ago

Could you check whether the issue also happens on Java 11 and Java 17?

wajda commented 1 year ago

It looks like a common issue with modern JVMs. Try adding this JVM option when starting Spark:

--add-opens=java.base/sun.net.www.protocol.jar=ALL-UNNAMED
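
For context, where this flag goes depends on how Spark is launched; on a typical setup it is passed through Spark's extra Java options, for example in spark-defaults.conf (a sketch; the executor line only matters if executors hit the same reflection path):

    spark.driver.extraJavaOptions    --add-opens=java.base/sun.net.www.protocol.jar=ALL-UNNAMED
    spark.executor.extraJavaOptions  --add-opens=java.base/sun.net.www.protocol.jar=ALL-UNNAMED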
wasimblocknaut commented 1 year ago

> Could you check whether the issue also happens on Java 11 and Java 17?

I got a new exception:

23/02/06 11:44:15 ERROR SparkLineageInitializer: Spline initialization failed! Spark Lineage tracking is DISABLED.
java.lang.RuntimeException: Plugin instantiation failure: class za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry$$anonfun$$nestedInanonfun$allPlugins$1$1.applyOrElse(AutoDiscoveryPluginRegistry.scala:54)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry$$anonfun$$nestedInanonfun$allPlugins$1$1.applyOrElse(AutoDiscoveryPluginRegistry.scala:54)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recover(Try.scala:234)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.$anonfun$allPlugins$1(AutoDiscoveryPluginRegistry.scala:54)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.<init>(AutoDiscoveryPluginRegistry.scala:51)
    at za.co.absa.spline.agent.SplineAgent$.create(SplineAgent.scala:66)
    at za.co.absa.spline.harvester.SparkLineageInitializer.createListener(SparkLineageInitializer.scala:162)
    at za.co.absa.spline.harvester.SparkLineageInitializer.$anonfun$createListener$6(SparkLineageInitializer.scala:139)
    at za.co.absa.spline.harvester.SparkLineageInitializer.withErrorHandling(SparkLineageInitializer.scala:176)
    at za.co.absa.spline.harvester.SparkLineageInitializer.createListener(SparkLineageInitializer.scala:138)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.<init>(SplineQueryExecutionListener.scala:37)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2930)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2919)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$2(QueryExecutionListener.scala:90)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:90)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:88)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.sql.util.ExecutionListenerManager.<init>(QueryExecutionListener.scala:88)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:336)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:336)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:364)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1175)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:698)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:662)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:22)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:28)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:30)
    at $line14.$read$$iw$$iw$$iw$$iw.<init>(<console>:32)
    at $line14.$read$$iw$$iw$$iw.<init>(<console>:34)
    at $line14.$read$$iw$$iw.<init>(<console>:36)
    at $line14.$read$$iw.<init>(<console>:38)
    at $line14.$read.<init>(<console>:40)
    at $line14.$read$.<init>(<console>:44)
    at $line14.$read$.<clinit>(<console>)
    at $line14.$eval$.$print$lzycompute(<console>:7)
    at $line14.$eval$.$print(<console>:6)
    at $line14.$eval.$print(<console>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:865)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:733)
    at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:435)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:456)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:239)
    at org.apache.spark.repl.Main$.doMain(Main.scala:78)
    at org.apache.spark.repl.Main$.main(Main.scala:58)
    at org.apache.spark.repl.Main.main(Main.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.$anonfun$instantiatePlugin$1(AutoDiscoveryPluginRegistry.scala:73)
    at scala.util.Try$.apply(Try.scala:213)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.instantiatePlugin(AutoDiscoveryPluginRegistry.scala:63)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.$anonfun$allPlugins$1(AutoDiscoveryPluginRegistry.scala:53)
    ... 92 more
Caused by: java.lang.IncompatibleClassChangeError: class za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin$SyntheticDeltaRead has interface org.apache.spark.sql.catalyst.plans.logical.LeafNode as super class
    at java.base/java.lang.ClassLoader.defineClass1(Native Method)
    at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
    at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
    at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:800)
    at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:698)
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:621)
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:579)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin.<init>(DeltaPlugin.scala:43)
    ... 100 more
wasimblocknaut commented 1 year ago

> It looks like a common issue with modern JVMs. Try adding this JVM option when starting Spark:
>
> --add-opens=java.base/sun.net.www.protocol.jar=ALL-UNNAMED

I am still getting the same exception with the JVM option --add-opens=java.base/sun.net.www.protocol.jar=ALL-UNNAMED

wajda commented 1 year ago

> Caused by: java.lang.IncompatibleClassChangeError: class za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin$SyntheticDeltaRead has interface org.apache.spark.sql.catalyst.plans.logical.LeafNode as super class

This was fixed in #579
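
For anyone hitting the same IncompatibleClassChangeError: picking up that fix means replacing the 1.0.1 bundle with an agent release that includes #579 (the exact version is not stated in this thread; check the project's release notes). A sketch, with the version left as a placeholder:

    # In $SPARK_HOME/jars, remove the old bundle:
    #   spark-3.3-spline-agent-bundle_2.12-1.0.1.jar
    # and drop in a release that contains the #579 fix:
    #   spark-3.3-spline-agent-bundle_2.12-<fixed-version>.jar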

wasimblocknaut commented 1 year ago

@wajda Thanks for the information. It's working now.