AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Incompatible version between Spline and Spark BigQuery connector #821

Closed: XavierZYXue closed this issue 2 months ago

XavierZYXue commented 3 months ago

I encountered the exception below when I tried to send Spark's lineage messages to a Kafka cluster. A minimal sketch of my setup follows, then the full stack trace.
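For context, this is roughly how the agent was wired up (a sketch only; the topic and broker names are placeholders, and the dispatcher keys follow the agent's documented Kafka dispatcher configuration):

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the setup (placeholder topic/broker names): Spline is attached
// via its query execution listener and dispatches lineage events to Kafka.
val spark = SparkSession.builder()
  .appName("lineage-to-kafka")
  .config("spark.sql.queryExecutionListeners",
    "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener")
  .config("spark.spline.lineageDispatcher", "kafka")
  .config("spark.spline.lineageDispatcher.kafka.topic", "spline-lineage")
  .config("spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers",
    "broker:9092")
  .getOrCreate()
```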

24/07/29 22:56:26 ERROR org.apache.spark.util.Utils: throw uncaught fatal error in thread spark-listener-group-shared
java.lang.ExceptionInInitializerError
    at za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin$$anonfun$relationProviderProcessor$1.bigQueryConfig$1(BigQueryPlugin.scala:68)
    at za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin$$anonfun$relationProviderProcessor$1.$anonfun$applyOrElse$7(BigQueryPlugin.scala:98)
    at scala.Option.orElse(Option.scala:447)
    at za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin$$anonfun$relationProviderProcessor$1.applyOrElse(BigQueryPlugin.scala:98)
    at za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin$$anonfun$relationProviderProcessor$1.applyOrElse(BigQueryPlugin.scala:62)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:172)
    at za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin$$anonfun$writeNodeProcessor$1.applyOrElse(SaveIntoDataSourceCommandPlugin.scala:49)
    at za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin$$anonfun$writeNodeProcessor$1.applyOrElse(SaveIntoDataSourceCommandPlugin.scala:45)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
    at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:45)
    at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
    at scala.util.Try$.apply(Try.scala:213)
    at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
    at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:61)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.$anonfun$handle$1(SplineAgent.scala:91)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.withErrorHandling(SplineAgent.scala:100)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.handle(SplineAgent.scala:72)
    at za.co.absa.spline.harvester.listener.QueryExecutionListenerDelegate.onSuccess(QueryExecutionListenerDelegate.scala:28)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:41)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1$adapted(SplineQueryExecutionListener.scala:41)
    at scala.Option.foreach(Option.scala:407)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:41)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:155)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:131)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:131)
    at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:135)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: java.lang.RuntimeException: Cannot find method `public static SparkBigQueryConfig from(... {7 args} ...)` in the class `class com.google.cloud.spark.bigquery.SparkBigQueryConfig`
    at scala.sys.package$.error(package.scala:30)
    at za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin$SparkBigQueryConfig$.$anonfun$methodFrom$2(BigQueryPlugin.scala:150)
    at scala.Option.getOrElse(Option.scala:189)
    at za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin$SparkBigQueryConfig$.<init>(BigQueryPlugin.scala:150)
    at za.co.absa.spline.harvester.plugin.embedded.BigQueryPlugin$SparkBigQueryConfig$.<clinit>(BigQueryPlugin.scala)
    ... 47 more
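The `Caused by` line points at a reflective method lookup inside the agent's BigQuery plugin. Simplified, the failing logic looks roughly like this (a sketch reconstructed from the stack trace, not the actual Spline source):

```scala
// Sketch: the plugin reflectively looks up a static `from` factory method
// with exactly 7 parameters on the connector's config class, and fails hard
// when no such overload exists (requires the connector jar on the classpath).
val clazz = Class.forName("com.google.cloud.spark.bigquery.SparkBigQueryConfig")
val methodFrom = clazz.getMethods
  .find(m => m.getName == "from" && m.getParameterTypes.length == 7)
  .getOrElse(sys.error(
    s"Cannot find method `public static SparkBigQueryConfig from(... {7 args} ...)` in the class `$clazz`"))
```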

Then I checked the commit history of BigQueryPlugin.scala and found that, ever since the initial commit, it has checked that the number of parameter types equals 7.

Hence, based on that hint, I went to SparkBigQueryConfig.java and checked its commit history for the following versions: v0.25.0 and v0.25.1.

Based on that commit history, the connector did provide a static `SparkBigQueryConfig.from` with 7 args up to version 0.25.0, but the signature was changed after 0.25.0. Therefore, you will encounter this error with any connector version newer than 0.25.0.
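One possible direction for a fix (just a sketch of the idea, not necessarily what a real fix would look like): resolve the static `from` overloads by name instead of hard-coding the arity, so both pre- and post-0.25.0 connectors resolve. Invoking the resolved method correctly would still need arity-specific argument handling.

```scala
import java.lang.reflect.{Method, Modifier}

// Sketch: collect every public static `from` overload rather than insisting
// on exactly 7 parameters. Preferring the smallest arity here is arbitrary;
// a real fix would dispatch on whichever signature it actually found.
val clazz = Class.forName("com.google.cloud.spark.bigquery.SparkBigQueryConfig")
val fromOverloads: Seq[Method] = clazz.getMethods.toSeq
  .filter(m => m.getName == "from" && Modifier.isStatic(m.getModifiers))
val methodFrom = fromOverloads
  .sortBy(_.getParameterTypes.length)
  .headOption
  .getOrElse(sys.error(s"No static `from` method found on `$clazz`"))
```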

XavierZYXue commented 3 months ago

It is due to this PR, which broke SparkBigQueryConfig into two classes. A sketch of how an agent could cope with that split follows below.
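Since the refactoring split the config logic across classes, the agent presumably has to probe which layout is on the classpath. A hypothetical helper (the second candidate class name below is a placeholder I made up, not the real relocated class):

```scala
import scala.util.Try

// Hypothetical helper: return the first of several candidate classes that is
// actually present on the classpath, so a plugin can branch per connector
// version.
def loadFirst(names: String*): Option[Class[_]] =
  names.view.flatMap(n => Try(Class.forName(n)).toOption).headOption

val configClass = loadFirst(
  "com.google.cloud.spark.bigquery.SparkBigQueryConfig", // pre-split class
  "com.example.RelocatedSparkBigQueryConfig"             // placeholder name
)
```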

wajda commented 3 months ago

Amazing! Unfortunately, due to the team's zero capacity, I cannot promise any ETA for the fix. But if you could create a PR, we would happily accept it and release an updated version of the agent.

XavierZYXue commented 3 months ago

@wajda Hi, I have tested this in my local environment and it passes. Could you please review #823 for this bugfix?