AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Error while initializing Spline agent on Databricks DBR versions from 11.1 up to the latest #597

Closed: Neelotpaul closed this issue 1 year ago

Neelotpaul commented 1 year ago

We have been able to generate lineage information from the following Databricks DBRs, pairing each with a Spline agent bundle compatible with its Spark version:

| DBR Version | Spline agent bundle |
| --- | --- |
| 7.3 LTS, Scala 2.12, Spark 3.0.1 | spark-3.0-spline-agent-bundle_2.12-0.7.6.jar |
| 9.1 LTS, Scala 2.12, Spark 3.1.2 | spark-3.0-spline-agent-bundle_2.12-0.7.6.jar |
| 10.4 LTS, Scala 2.12, Spark 3.2.1 | spark_3_2_spline_agent_bundle_2_12_1_0_0.jar & spark_3_3_spline_agent_bundle_2_12_1_0_0.jar |

But on the Spark 3.3 DBR versions the Spline jar fails to initialize:

| DBR Version | Spline agent bundle |
| --- | --- |
| 11.1, Scala 2.12, Spark 3.3.0 | spark_3_3_spline_agent_bundle_2_12_1_0_0.jar |
| 11.2, Scala 2.12, Spark 3.3.0 | spark_3_3_spline_agent_bundle_2_12_1_0_0.jar |
| 11.3 LTS, Scala 2.12, Spark 3.3.0 | spark_3_3_spline_agent_bundle_2_12_1_0_0.jar |
| 12.0, Scala 2.12, Spark 3.3.1 | spark_3_3_spline_agent_bundle_2_12_1_0_0.jar |
| 12.1, Scala 2.12, Spark 3.3.1 | spark_3_3_spline_agent_bundle_2_12_1_0_0.jar |

Please find the error log:

```
23/02/02 12:43:09 INFO SparkLineageInitializer: Lineage Dispatcher: Http
23/02/02 12:43:09 INFO SparkLineageInitializer: Post-Processing Filter: Password replace
23/02/02 12:43:09 INFO SparkLineageInitializer: Ignore-Write Detection Strategy: Write metrics
23/02/02 12:43:09 INFO AutoDiscoveryPluginRegistry: Loading plugin: class za.co.absa.spline.harvester.plugin.embedded.AvroPlugin
23/02/02 12:43:09 INFO AutoDiscoveryPluginRegistry: Loading plugin: class za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin
23/02/02 12:43:09 INFO AutoDiscoveryPluginRegistry: Loading plugin: class za.co.absa.spline.harvester.plugin.embedded.CassandraPlugin
23/02/02 12:43:09 INFO AutoDiscoveryPluginRegistry: Loading plugin: class za.co.absa.spline.harvester.plugin.embedded.CobrixPlugin
23/02/02 12:43:09 INFO AutoDiscoveryPluginRegistry: Loading plugin: class za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin
23/02/02 12:43:09 INFO AutoDiscoveryPluginRegistry: Loading plugin: class za.co.absa.spline.harvester.plugin.embedded.DatabricksPlugin
23/02/02 12:43:09 INFO AutoDiscoveryPluginRegistry: Loading plugin: class za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin
23/02/02 12:43:09 ERROR SparkLineageInitializer: Spline initialization failed! Spark Lineage tracking is DISABLED.
java.lang.RuntimeException: Plugin instantiation failure: class za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry$$anonfun$$nestedInanonfun$allPlugins$1$1.applyOrElse(AutoDiscoveryPluginRegistry.scala:54)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry$$anonfun$$nestedInanonfun$allPlugins$1$1.applyOrElse(AutoDiscoveryPluginRegistry.scala:54)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recover(Try.scala:234)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.$anonfun$allPlugins$1(AutoDiscoveryPluginRegistry.scala:54)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.<init>(AutoDiscoveryPluginRegistry.scala:51)
    at za.co.absa.spline.agent.SplineAgent$.create(SplineAgent.scala:66)
    at za.co.absa.spline.harvester.SparkLineageInitializer.createListener(SparkLineageInitializer.scala:162)
    at za.co.absa.spline.harvester.SparkLineageInitializer.$anonfun$createListener$6(SparkLineageInitializer.scala:139)
    at za.co.absa.spline.harvester.SparkLineageInitializer.withErrorHandling(SparkLineageInitializer.scala:176)
    at za.co.absa.spline.harvester.SparkLineageInitializer.createListener(SparkLineageInitializer.scala:138)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.<init>(SplineQueryExecutionListener.scala:37)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3337)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3326)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$2(QueryExecutionListener.scala:93)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:170)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:93)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:91)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.sql.util.ExecutionListenerManager.<init>(QueryExecutionListener.scala:91)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:368)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:368)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:419)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1416)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:265)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:263)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:259)
    at org.apache.spark.sql.SparkSession.streams(SparkSession.scala:413)
    at org.apache.spark.sql.SQLContext.streams(SQLContext.scala:741)
    at com.databricks.backend.daemon.driver.DatabricksStreamingQueryListener.<init>(DatabricksStreamingQueryListener.scala:55)
    at com.databricks.backend.daemon.driver.DriverSparkHooks.<init>(DriverSparkHooks.scala:35)
    at com.databricks.backend.daemon.driver.DriverSparkHooks.sessionizedDriverHooks(DriverSparkHooks.scala:198)
    at com.databricks.backend.daemon.driver.DriverCorral.createDriverSparkHooks(DriverCorral.scala:422)
    at com.databricks.backend.daemon.driver.DriverCorral.$anonfun$addReplToExecutionContext$1(DriverCorral.scala:555)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at com.databricks.backend.daemon.driver.DriverCorral.addReplToExecutionContext(DriverCorral.scala:555)
    at com.databricks.backend.daemon.driver.DriverCorral.com$databricks$backend$daemon$driver$DriverCorral$$handleRequest(DriverCorral.scala:704)
    at com.databricks.backend.daemon.driver.DriverCorral$$anonfun$receive$1.applyOrElse(DriverCorral.scala:1112)
    at com.databricks.backend.daemon.driver.DriverCorral$$anonfun$receive$1.applyOrElse(DriverCorral.scala:1108)
    at com.databricks.rpc.ServerBackend.$anonfun$internalReceive$2(ServerBackend.scala:120)
    at com.databricks.rpc.ServerBackend$$anonfun$commonReceive$1.applyOrElse(ServerBackend.scala:147)
    at com.databricks.rpc.ServerBackend$$anonfun$commonReceive$1.applyOrElse(ServerBackend.scala:147)
    at com.databricks.rpc.ServerBackend.$anonfun$internalReceive$1(ServerBackend.scala:102)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:541)
    at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:636)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:657)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:398)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:147)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:396)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:393)
    at com.databricks.rpc.ServerBackend.withAttributionContext(ServerBackend.scala:24)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:441)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:426)
    at com.databricks.rpc.ServerBackend.withAttributionTags(ServerBackend.scala:24)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:631)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:550)
    at com.databricks.rpc.ServerBackend.recordOperationWithResultTags(ServerBackend.scala:24)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:541)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:511)
    at com.databricks.rpc.ServerBackend.recordOperation(ServerBackend.scala:24)
    at com.databricks.rpc.ServerBackend.internalReceive(ServerBackend.scala:101)
    at com.databricks.rpc.JettyServer$RequestManager.$anonfun$handleRPC$2(JettyServer.scala:946)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.rpc.JettyServer$RequestManager.handleRPC(JettyServer.scala:946)
    at com.databricks.rpc.JettyServer$RequestManager.handleRequestAndRespond(JettyServer.scala:862)
    at com.databricks.rpc.JettyServer$RequestManager.$anonfun$handleHttp$2(JettyServer.scala:498)
    at com.databricks.rpc.JettyServer$RequestManager.$anonfun$handleHttp$2$adapted(JettyServer.scala:477)
    at com.databricks.logging.activity.ActivityContextFactory$.$anonfun$withActivityInternal$1(ActivityContextFactory.scala:246)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:398)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:147)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:396)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:393)
    at com.databricks.logging.activity.ActivityContextFactory$.withAttributionContext(ActivityContextFactory.scala:38)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:441)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:426)
    at com.databricks.logging.activity.ActivityContextFactory$.withAttributionTags(ActivityContextFactory.scala:38)
    at com.databricks.logging.activity.ActivityContextFactory$.withActivityInternal(ActivityContextFactory.scala:246)
    at com.databricks.logging.activity.ActivityContextFactory$.withServiceRequestActivity(ActivityContextFactory.scala:114)
    at com.databricks.rpc.JettyServer$RequestManager.handleHttp(JettyServer.scala:477)
    at com.databricks.rpc.JettyServer$RequestManager.doPost(JettyServer.scala:372)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:523)
    at com.databricks.rpc.HttpServletWithPatch.service(HttpServletWithPatch.scala:33)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:550)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:190)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:516)
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
    at com.databricks.rpc.InstrumentedQueuedThreadPool$$anon$1.run(InstrumentedQueuedThreadPool.scala:80)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.$anonfun$instantiatePlugin$1(AutoDiscoveryPluginRegistry.scala:73)
    at scala.util.Try$.apply(Try.scala:213)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.instantiatePlugin(AutoDiscoveryPluginRegistry.scala:63)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.$anonfun$allPlugins$1(AutoDiscoveryPluginRegistry.scala:53)
    ... 124 more
Caused by: java.lang.IncompatibleClassChangeError: class za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin$SyntheticDeltaRead has interface org.apache.spark.sql.catalyst.plans.logical.LeafNode as super class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at za.co.absa.spline.harvester.plugin.embedded.DeltaPlugin.<init>(DeltaPlugin.scala:43)
    ... 132 more
```
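
The decisive line is the final `Caused by`: the JVM refuses to define `DeltaPlugin$SyntheticDeltaRead` because its bytecode names `LeafNode` as its super *class*, while the `LeafNode` actually loaded on these runtimes is a JVM *interface* (a Scala trait). A schematic Scala sketch of that kind of class-vs-trait binary drift (illustration only, using stand-in types, not the real Spline or Spark sources):

```scala
// Schematic illustration of class-vs-trait drift; these are stand-in types,
// not the real org.apache.spark.sql.catalyst.plans.logical.LeafNode.

object CompileTimeSpark {
  abstract class LeafNode // a class: subclasses record it as their JVM super class
}

object RuntimeSpark {
  trait LeafNode // a trait: compiles to a JVM interface, not a class
}

// Bytecode compiled as `... extends CompileTimeSpark.LeafNode` cannot be
// defined against a runtime where that name resolves to an interface; the
// class loader fails in defineClass (see the trace above) with
// java.lang.IncompatibleClassChangeError.
```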

Please let me know if this can be handled by any other jar version. For context, we have created our own Producer REST API to fetch the lineage information, parse it, and store it.
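
For context, the failing setup is roughly the standard codeless init (the `spark.sql.queryExecutionListeners` route visible in the trace) with the HTTP dispatcher pointed at that endpoint. A minimal sketch, with property names as documented for the Spline agent; the app name and URL below are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: requires the spark_3_3 Spline agent bundle jar on the classpath.
// On Databricks, these properties go into the cluster's Spark config rather
// than the builder, since the session already exists there.
val spark = SparkSession.builder()
  .appName("lineage-sketch") // placeholder
  .config("spark.sql.queryExecutionListeners",
    "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener")
  .config("spark.spline.lineageDispatcher", "http")
  .config("spark.spline.lineageDispatcher.http.producer.url",
    "https://lineage.example.com/producer") // placeholder for the custom Producer API
  .getOrCreate()
```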

cerveada commented 1 year ago

Please use the latest fix version (that is 1.0.4) instead of 1.0.0.

This issue has already been fixed.
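
For anyone landing here: picking up the fix means swapping the 1.0.0 bundle for 1.0.4. A minimal sketch, assuming the Maven coordinates follow the usual Spline naming scheme, with the documented programmatic initialization shown as an alternative to codeless init:

```scala
// Attach the fixed bundle to the cluster, e.g. as a Maven library
// (coordinates assumed per the usual Spline release scheme):
//   za.co.absa.spline.agent.spark:spark-3.3-spline-agent-bundle_2.12:1.0.4
// With codeless init (spark.sql.queryExecutionListeners) nothing else changes.

// Programmatic initialization, per the Spline agent README:
import org.apache.spark.sql.SparkSession
import za.co.absa.spline.harvester.SparkLineageInitializer._ // brings enableLineageTracking into scope

val spark = SparkSession.builder().getOrCreate()
spark.enableLineageTracking() // dispatcher etc. still come from spark.spline.* properties
```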