AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application #478

Open Taurus-Le opened 2 years ago

Taurus-Le commented 2 years ago

Version:

Description: The app reads from multiple Delta Lake tables and writes the output of a join as a new Delta Lake table. Spline initialized successfully, but then an unexpected error occurred.
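
Roughly, the job does something like the following (a minimal sketch for illustration; the real job is assembled from a Waterdrop config, and the table and column names below are placeholders):

```scala
import org.apache.spark.sql.SparkSession

object Delta2Delta {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("delta2delta").getOrCreate()

    // Read several existing Delta Lake tables (placeholder names).
    val devices  = spark.read.table("dwd.t_device")
    val mappings = spark.read.table("dwd.t_device_mapping")

    // Join them and write the result as a new Delta table.
    // A create-or-replace write like this is what appears in the logical plan
    // as ReplaceTableAsSelect, the node mentioned in the error log below.
    devices.join(mappings, Seq("device_id"))
      .writeTo("dim.t_device_mapping_test")
      .using("delta")
      .createOrReplace()
  }
}
```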

Error logs: delta2delta-1.log, delta2delta-2.log, delta2delta-3.log

Error information:

22/07/20 15:16:38 ERROR LineageHarvester: Write extraction failed for: class org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect
22/07/20 15:16:39 ERROR LineageHarvester: 
****************************** OBJECT DUMP BEGIN ******************************
class org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect
  resolved: boolean = true
  catalog: org.apache.spark.sql.connector.catalog.TableCatalog 
    BestEffortStagedTable$module: org.apache.spark.sql.delta.catalog.DeltaCatalog$BestEffortStagedTable$ = null
    spark: org.apache.spark.sql.SparkSession 
      implicits$module: org.apache.spark.sql.SparkSession$implicits$ 
        $outer: org.apache.spark.sql.SparkSession ! Object was already logged
      org$apache$spark$sql$SparkSession$$creationSite: org.apache.spark.util.CallSite 
        shortForm: java.lang.String = getOrCreate at SparkEnvironment.scala:30
        longForm: java.lang.String = org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:958)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)

***** omitted for length limit

****************************** OBJECT DUMP END   ******************************
22/07/20 15:16:39 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: dim_t_device_mapping_test #application_1658293063172_0008
java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror0.jinvokeraw(JavaMirrors.scala:404)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvoke(JavaMirrors.scala:380)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror.apply(JavaMirrors.scala:396)
    at za.co.absa.commons.reflect.ReflectionUtils$.$anonfun$extractFieldValue$5(ReflectionUtils.scala:143)
    at scala.Option.map(Option.scala:230)
    at za.co.absa.commons.reflect.ReflectionUtils$.reflectClass$1(ReflectionUtils.scala:141)
    at za.co.absa.commons.reflect.ReflectionUtils$.reflectClassHierarchy$1(ReflectionUtils.scala:112)
    at za.co.absa.commons.reflect.ReflectionUtils$.extractFieldValue(ReflectionUtils.scala:169)
    at za.co.absa.commons.reflect.ReflectionUtils$.extractFieldValue(ReflectionUtils.scala:184)
    at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.extractSourceIdFromDeltaTableV2(DataSourceV2Plugin.scala:154)
    at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$extractSourceIdFromTable(DataSourceV2Plugin.scala:150)
    at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$processV2CreateTableCommand(DataSourceV2Plugin.scala:108)
    at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:77)
    at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:52)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
    at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:44)
    at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
    at scala.util.Try$.apply(Try.scala:213)
    at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
    at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:64)
    at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
    at scala.Option.foreach(Option.scala:407)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
    at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1404)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)

The currently active SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)

    at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1512)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$stateReconstruction$2(Snapshot.scala:107)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
    at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:58)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$stateReconstruction$1(Snapshot.scala:98)
    at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
    at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
    at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
    at org.apache.spark.sql.delta.Snapshot.stateReconstruction(Snapshot.scala:98)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$cachedState$1(Snapshot.scala:154)
    at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
    at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
    at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
    at org.apache.spark.sql.delta.Snapshot.cachedState$lzycompute(Snapshot.scala:154)
    at org.apache.spark.sql.delta.Snapshot.cachedState(Snapshot.scala:153)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$stateDF$1(Snapshot.scala:164)
    at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
    at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
    at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
    at org.apache.spark.sql.delta.Snapshot.stateDF(Snapshot.scala:164)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$3(Snapshot.scala:209)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
    at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:58)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$2(Snapshot.scala:205)
    at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
    at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
    at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$1(Snapshot.scala:205)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.Snapshot.withStatusCode(Snapshot.scala:58)
    at org.apache.spark.sql.delta.Snapshot.computedState$lzycompute(Snapshot.scala:204)
    at org.apache.spark.sql.delta.Snapshot.computedState(Snapshot.scala:202)
    at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:242)
    at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$init$(DataSkippingReader.scala:174)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:67)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$1(SnapshotManagement.scala:271)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:390)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:378)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:64)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:270)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:266)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:64)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$2(SnapshotManagement.scala:252)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:248)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
    at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:64)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:246)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:245)
    at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:64)
    at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:53)
    at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:69)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:567)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$2(DeltaLog.scala:567)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
    at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:437)
    at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperation$5(DeltaLogging.scala:114)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:437)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:113)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:98)
    at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:437)
    at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:566)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:577)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
    at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:577)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:589)
    at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:487)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:78)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:78)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$snapshot$3(DeltaTableV2.scala:107)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.snapshot$lzycompute(DeltaTableV2.scala:107)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.snapshot(DeltaTableV2.scala:95)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.properties(DeltaTableV2.scala:124)
    ... 53 more

Additional information:

  1. I've modified the pom.xml files under the root of spline-spark-agent and under spline-spark-agent/core:

    <!--
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-hadoop</artifactId>
                <version>7.6.0</version>
            </dependency>
    -->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-spark-30_2.12</artifactId>
                <version>7.13.4</version>
                <scope>provided</scope>
            </dependency>
  2. I've disabled Spark dynamic allocation on the suspicion that the SparkContext might be shut down due to a resource problem. Below is the content of my spark-defaults.conf.

    
    spark.master                                        yarn
    spark.eventLog.enabled                              true
    spark.eventLog.dir                                  hdfs://masters/spark/eventLog
    spark.serializer                                    org.apache.spark.serializer.KryoSerializer
    spark.delta.logStore.class                          org.apache.spark.sql.delta.storage.HDFSLogStore
    spark.executor.extraClassPath                       /home/hadoop/SW/extra-libs/*
    spark.driver.extraClassPath                         /home/hadoop/SW/extra-libs/*
    spark.hive.metastore.uris                           thrift://192.168.21.8:9083
    spark.sql.warehouse.dir                             hdfs://masters/
    spark.scheduler.listenerbus.eventqueue.capacity     100000
    spark.sql.extensions                                io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog                     org.apache.spark.sql.delta.catalog.DeltaCatalog
    # yarn
    spark.yarn.jars                                     hdfs://masters/spark-yarn-jars/*

    # dynamic allocation
    spark.shuffle.service.enabled                       true
    spark.dynamicAllocation.enabled                     true
    spark.dynamicAllocation.minExecutors                1
    spark.dynamicAllocation.maxExecutors
    spark.dynamicAllocation.schedulerBacklogTimeout     5
    # Lineage
    spark.sql.queryExecutionListeners                   za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
    spark.spline.mode                                   REQUIRED
    spark.spline.producer.url                           http://h8:9095/spline-rest/producer
    spark.spline.postProcessingFilter.composite.filters dsPasswordReplace
    spark.spline.lineageDispatcher                      console
    spark.spline.lineageDispatcher.console.className    za.co.absa.spline.harvester.dispatcher.ConsoleLineageDispatcher
    spark.spline.lineageDispatcher.console.stream       ERR
    spark.spline.lineageDispatcher.http.producer.url    http://h8:9095/spline-rest/producer
    # Kafka dispatcher
    spline.lineageDispatcher=kafka
    spline.lineageDispatcher.kafka.className=za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher
    # producer configs as defined by kafka (bootstrap.servers, key.serializer, etc); all kafka configs are supported
    spline.lineageDispatcher.kafka.producer.bootstrap.servers=h5:9092,h6:9092,h7:9092
    spline.lineageDispatcher.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
    spline.lineageDispatcher.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
    spline.lineageDispatcher.kafka.producer.max.in.flight.requests.per.connection=1
    # topic name for plans and events
    spline.lineageDispatcher.kafka.topic=spline-lineage



For the delta2delta application, there seems to be something wrong with `ReplaceTableAsSelect`: as you can see, write extraction failed. Could you kindly help? If I missed anything important, please let me know.
cerveada commented 2 years ago

There should be one ticket per issue; I will create a new one for the `java.util.NoSuchElementException: None.get` problem.

cerveada commented 2 years ago

@Taurus-Le I split the issue in two: this one is for Delta and #479 is for ES. Feel free to add any relevant info, or correct me if I split something incorrectly.

Taurus-Le commented 2 years ago

Hi @cerveada, sorry for the trouble. I meant to save you effort, not create more of it. Thanks for helping.

Taurus-Le commented 2 years ago

Versions of Apache Maven and the JDK used to build spline-spark-agent:

[hadoop@h8 spline-spark-agent]$ mvn -version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: /home/hadoop/SW/apache-maven-3.8.5
Java version: 1.8.0_331, vendor: Oracle Corporation, runtime: /home/hadoop/SW/jdk1.8.0_331/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"

And here is how I build the spline-spark-agent:

git clone https://github.com/AbsaOSS/spline-spark-agent.git
git checkout release/0.7.10
mvn scala-cross-build:change-version -Pscala-2.12
mvn clean package -Pscala-2.12,spark-3.2 -Dmaven.test.skip=true
cerveada commented 2 years ago

It seems the changes in Delta Lake 1.2.0 are causing the issue. The code was only tested against 1.1.0, so if you want a workaround, try switching to version 1.1.0.
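
For example, if you pull Delta in via Spark's package resolution, pinning the version could look like the line below (just an illustration; if you load the jar from your extra-libs classpath instead, swap the delta-core jar there):

```
spark.jars.packages    io.delta:delta-core_2.12:1.1.0
```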

Taurus-Le commented 2 years ago

Got it, thanks. If there's anything I can help with, please let me know. Also, I just noticed that Delta Lake 2.0.0 has been released.

Taurus-Le commented 2 years ago

Hi @cerveada, I've switched from Delta Lake 1.2.0 to Delta Lake 1.1.0, and this time I got the same error as #479.

22/07/22 09:42:46 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: dim_t_device_mapping_test #application_1658293063172_0023
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.toAttributeReferencesMap(ViewAttributeAddingFilter.scala:59)
    at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.$anonfun$addMissingAttributeLinks$1(ViewAttributeAddingFilter.scala:39)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.addMissingAttributeLinks(ViewAttributeAddingFilter.scala:39)
    at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.processExecutionPlan(ViewAttributeAddingFilter.scala:34)
    at za.co.absa.spline.harvester.postprocessing.PostProcessor.$anonfun$process$4(PostProcessor.scala:38)
    at za.co.absa.spline.harvester.postprocessing.PostProcessor.$anonfun$provideCtx$1(PostProcessor.scala:25)
    at scala.Function1.$anonfun$andThen$1(Function1.scala:57)
    at scala.Function1.$anonfun$andThen$1(Function1.scala:57)
    at za.co.absa.spline.harvester.postprocessing.PostProcessor.process(PostProcessor.scala:38)
    at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$4(LineageHarvester.scala:110)
    at scala.Option.flatMap(Option.scala:271)
    at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:64)
    at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
    at scala.Option.foreach(Option.scala:407)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
    at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1404)

So I think the problem might have something to do with the call to `createOrReplaceTempView` in my code, which internally goes through `createTempViewCommand` and ends up as a `CreateViewCommand`. According to the README of spline-spark-agent, `CreateViewCommand` is ignored.
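
For reference, the pattern in my code looks roughly like this (a simplified sketch as it would appear in spark-shell; the actual names differ):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// createOrReplaceTempView results in a CreateViewCommand, which the agent ignores.
spark.read.table("dwd.t_device")
  .join(spark.read.table("dwd.t_device_mapping"), Seq("device_id"))
  .createOrReplaceTempView("v_device_mapping")

// The final write then selects from that temp view.
spark.sql(
  """CREATE OR REPLACE TABLE dim.t_device_mapping_test
    |USING delta
    |AS SELECT * FROM v_device_mapping""".stripMargin)
```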

cerveada commented 2 years ago

Hmm, I read the logs again and there seems to be an issue with Spark stopping.

java.lang.reflect.InvocationTargetException
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

It looks like Spline is trying to call the `DeltaTableV2.properties` method, but the Spark context is already stopped. Can you try not stopping Spark, to see whether it changes the outcome?

Taurus-Le commented 2 years ago

Hi @cerveada, I did not stop Spark myself; I never called stop() on the SparkContext or SparkSession. Could it be because I'm running Spark on YARN? I suspected earlier that there might be a connection between Spark stopping and dynamic allocation, so I disabled dynamic allocation, but it did not help. And I'm truly sorry, I forgot to tell you that I got the same error as https://github.com/AbsaOSS/spline-spark-agent/issues/479#issuecomment-1196398326 after applying PR https://github.com/AbsaOSS/spline-spark-agent/pull/481

cerveada commented 2 years ago

I don't know, it might be.

On YARN do you run in local mode or cluster mode?

Do you still get the following errors?

java.lang.reflect.InvocationTargetException
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
cerveada commented 1 year ago

You can also try setting a higher hadoop.service.shutdown.timeout.
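
That property is read from the Hadoop configuration, so one place to raise it would be core-site.xml, for example (the value here is only an illustration; pick whatever gives the listener enough time to finish):

```xml
<property>
  <name>hadoop.service.shutdown.timeout</name>
  <value>60s</value>
</property>
```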