AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Writing Delta in Spark: when Spline parses the Spark lineage, an error is reported #609

Closed. jinmu0410 closed this issue 1 year ago.

jinmu0410 commented 1 year ago

(screenshot of the exception)

Submit command:

./spark-submit --class com.hs.test \
--master spark://jinmudeMacBook-Pro.local:7077 \
--name test_015 \
--jars "/Users/jinmu/Downloads/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar" \
--conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
--conf spark.spline.lineageDispatcher=console \
/Users/jinmu/Downloads/hs/lakehouse/target/hs-lakehouse-1.0-SNAPSHOT.jar

The application code:

val spark = SparkSession.builder()
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.appName("test1")
.master("local[*]")
.getOrCreate()

val path = "/Users/jinmu/Downloads/111.csv"

val readerOptions = Map(
  "delimiter" ->",",
  "header" -> "true",
  "encoding" -> "utf-8",
  "escape" -> "\""
)
import spark.implicits._

spark.read.format("csv").options(readerOptions).load(path)
  .write.format("delta").mode("append")
  .save("/Users/jinmu/Downloads/delta/jinmu.db/test33")

Who can help me?

cerveada commented 1 year ago

Can you post the whole exception? Preferably as a text.

jinmu0410 commented 1 year ago

ok

23/02/21 17:43:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/21 17:43:38 INFO SparkContext: Running Spark version 3.1.2
23/02/21 17:43:39 INFO ResourceUtils: ==============================================================
23/02/21 17:43:39 INFO ResourceUtils: No custom resources configured for spark.driver.
23/02/21 17:43:39 INFO ResourceUtils: ==============================================================
23/02/21 17:43:39 INFO SparkContext: Submitted application: test1
23/02/21 17:43:39 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/02/21 17:43:39 INFO ResourceProfile: Limiting resource is cpu
23/02/21 17:43:39 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/02/21 17:43:39 INFO SecurityManager: Changing view acls to: jinmu
23/02/21 17:43:39 INFO SecurityManager: Changing modify acls to: jinmu
23/02/21 17:43:39 INFO SecurityManager: Changing view acls groups to: 
23/02/21 17:43:39 INFO SecurityManager: Changing modify acls groups to: 
23/02/21 17:43:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(jinmu); groups with view permissions: Set(); users  with modify permissions: Set(jinmu); groups with modify permissions: Set()
23/02/21 17:43:39 INFO Utils: Successfully started service 'sparkDriver' on port 56109.
23/02/21 17:43:39 INFO SparkEnv: Registering MapOutputTracker
23/02/21 17:43:39 INFO SparkEnv: Registering BlockManagerMaster
23/02/21 17:43:39 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/02/21 17:43:39 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/02/21 17:43:39 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/02/21 17:43:39 INFO DiskBlockManager: Created local directory at /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/blockmgr-315a09b6-cd7a-4b15-b807-99473f820d62
23/02/21 17:43:39 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
23/02/21 17:43:39 INFO SparkEnv: Registering OutputCommitCoordinator
23/02/21 17:43:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/02/21 17:43:39 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.218.8:4040
23/02/21 17:43:39 INFO SparkContext: Added JAR file:///Users/jinmu/Downloads/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar at spark://192.168.218.8:56109/jars/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar with timestamp 1676972618954
23/02/21 17:43:39 INFO SparkContext: Added JAR file:/Users/jinmu/Downloads/hs/lakehouse/target/hs-lakehouse-1.0-SNAPSHOT.jar at spark://192.168.218.8:56109/jars/hs-lakehouse-1.0-SNAPSHOT.jar with timestamp 1676972618954
23/02/21 17:43:40 INFO Executor: Starting executor ID driver on host 192.168.218.8
23/02/21 17:43:40 INFO Executor: Fetching spark://192.168.218.8:56109/jars/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar with timestamp 1676972618954
23/02/21 17:43:40 INFO TransportClientFactory: Successfully created connection to /192.168.218.8:56109 after 27 ms (0 ms spent in bootstraps)
23/02/21 17:43:40 INFO Utils: Fetching spark://192.168.218.8:56109/jars/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar to /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44/userFiles-6b25b9b2-0b90-4e58-86e7-1a527c6b54ba/fetchFileTemp2786412298596410544.tmp
23/02/21 17:43:40 INFO Executor: Adding file:/private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44/userFiles-6b25b9b2-0b90-4e58-86e7-1a527c6b54ba/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar to class loader
23/02/21 17:43:40 INFO Executor: Fetching spark://192.168.218.8:56109/jars/hs-lakehouse-1.0-SNAPSHOT.jar with timestamp 1676972618954
23/02/21 17:43:40 INFO Utils: Fetching spark://192.168.218.8:56109/jars/hs-lakehouse-1.0-SNAPSHOT.jar to /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44/userFiles-6b25b9b2-0b90-4e58-86e7-1a527c6b54ba/fetchFileTemp8602971486436238988.tmp
23/02/21 17:43:40 INFO Executor: Adding file:/private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44/userFiles-6b25b9b2-0b90-4e58-86e7-1a527c6b54ba/hs-lakehouse-1.0-SNAPSHOT.jar to class loader
23/02/21 17:43:40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56111.
23/02/21 17:43:40 INFO NettyBlockTransferService: Server created on 192.168.218.8:56111
23/02/21 17:43:40 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/02/21 17:43:40 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.218.8, 56111, None)
23/02/21 17:43:40 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.218.8:56111 with 366.3 MiB RAM, BlockManagerId(driver, 192.168.218.8, 56111, None)
23/02/21 17:43:40 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.218.8, 56111, None)
23/02/21 17:43:40 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.218.8, 56111, None)
23/02/21 17:43:40 INFO SingleEventLogFileWriter: Logging events to file:/Users/jinmu/Downloads/directory/local-1676972619872.inprogress
23/02/21 17:43:40 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/jinmu/Downloads/soft/spark-3.0.3-bin-hadoop2.7/bin/spark-warehouse').
23/02/21 17:43:40 INFO SharedState: Warehouse path is 'file:/Users/jinmu/Downloads/soft/spark-3.0.3-bin-hadoop2.7/bin/spark-warehouse'.
23/02/21 17:43:41 INFO SparkUI: Stopped Spark web UI at http://192.168.218.8:4040
23/02/21 17:43:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/02/21 17:43:41 INFO MemoryStore: MemoryStore cleared
23/02/21 17:43:41 INFO BlockManager: BlockManager stopped
23/02/21 17:43:41 INFO BlockManagerMaster: BlockManagerMaster stopped
23/02/21 17:43:41 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/02/21 17:43:41 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at io.delta.sql.parser.DeltaSqlParser.<init>(DeltaSqlParser.scala:71)
    at io.delta.sql.DeltaSparkSessionExtension.$anonfun$apply$1(DeltaSparkSessionExtension.scala:79)
    at org.apache.spark.sql.SparkSessionExtensions.$anonfun$buildParser$1(SparkSessionExtensions.scala:239)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
    at org.apache.spark.sql.SparkSessionExtensions.buildParser(SparkSessionExtensions.scala:238)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.sqlParser$lzycompute(BaseSessionStateBuilder.scala:124)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.sqlParser(BaseSessionStateBuilder.scala:123)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:341)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1145)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:159)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:997)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:658)
    at com.hs.test$.main(test.scala:36)
    at com.hs.test.main(test.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/02/21 17:43:41 INFO ShutdownHookManager: Shutdown hook called
23/02/21 17:43:41 INFO ShutdownHookManager: Deleting directory /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-22f428e3-b4f2-41d8-8ff4-684d30e6517f
23/02/21 17:43:41 INFO ShutdownHookManager: Deleting directory /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44
cerveada commented 1 year ago

From the log it seems this is caused by loading some class in io.delta.sql.parser.DeltaSqlParser, so it is not an issue caused by the Spline agent.

Can you try to run the same thing without Spline Agent?

I would also check that the Delta version in use actually matches your Spark version: https://docs.delta.io/latest/releases.html
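
For reference, a minimal sketch of an aligned submit (assuming Spark 3.1.x, for which Delta Lake 1.0.x is the matching release according to the Delta compatibility table; verify the exact coordinates for your setup):

./spark-submit --class com.hs.test \
--master spark://jinmudeMacBook-Pro.local:7077 \
--packages io.delta:delta-core_2.12:1.0.1 \
--jars "/Users/jinmu/Downloads/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar" \
--conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
--conf spark.spline.lineageDispatcher=console \
/Users/jinmu/Downloads/hs/lakehouse/target/hs-lakehouse-1.0-SNAPSHOT.jar

An IncompatibleClassChangeError raised while DeltaSqlParser is being constructed is typically a symptom of a Delta build compiled against a different Spark version, so aligning the two is the first thing to rule out.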

jinmu0410 commented 1 year ago

yes, thanks

jinmu0410 commented 1 year ago

@cerveada My Spark task failed, but the lineage still comes out. Is this normal?

jinmu0410 commented 1 year ago
23/02/22 16:06:53 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
23/02/22 16:06:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 339.9 KiB, free 365.6 MiB)
23/02/22 16:06:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 30.8 KiB, free 365.6 MiB)
23/02/22 16:06:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.218.12:51952 (size: 30.8 KiB, free: 366.2 MiB)
23/02/22 16:06:53 INFO SparkContext: Created broadcast 2 from load at test.scala:50
23/02/22 16:06:53 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/02/22 16:06:54 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/02/22 16:06:54 INFO SparkUI: Stopped Spark web UI at http://192.168.218.12:4040
ExecutionPlan (apiVersion: 1.2):
{"id":"4427d12c-d567-5ccd-b89a-a9d95f858ef2","name":"test123","operations":{"write":{"outputSource":"jdbc:mysql://192.168.110.40:3306:test.test_csv","append":true,"id":"op-0","name":"SaveIntoDataSourceCommand","childIds":["op-1"],"extra":{"destinationType":"jdbc"}},"reads":[{"inputSources":["hdfs://lake-node3:8020/jinmu/test/111.csv"],"id":"op-3","name":"LogicalRelation","output":["attr-0","attr-1","attr-2","attr-3","attr-4","attr-5","attr-6","attr-7","attr-8","attr-9","attr-10","attr-11","attr-12","attr-13","attr-14","attr-15","attr-16","attr-17","attr-18","attr-19","attr-20","attr-21","attr-22","attr-23","attr-24","attr-25","attr-26","attr-27","attr-28","attr-29","attr-30","attr-31","attr-32","attr-33","attr-34","attr-35","attr-36","attr-37","attr-38","attr-39","attr-40"],"params":{"path":"/jinmu/test/111.csv","encoding":"utf-8","escape":"\"","header":"true","delimiter":","},"extra":{"sourceType":"csv"}}],"other":[{"id":"op-2","name":"SubqueryAlias","childIds":["op-3"],"output":["attr-0","attr-1","attr-2","attr-3","attr-4","attr-5","attr-6","attr-7","attr-8","attr-9","attr-10","attr-11","attr-12","attr-13","attr-14","attr-15","attr-16","attr-17","attr-18","attr-19","attr-20","attr-21","attr-22","attr-23","attr-24","attr-25","attr-26","attr-27","attr-28","attr-29","attr-30","attr-31","attr-32","attr-33","attr-34","attr-35","attr-36","attr-37","attr-38","attr-39","attr-40"],"params":{"identifier":"test_scv_tmp"}},{"id":"op-1","name":"Project","childIds":["op-2"],"output":["attr-1","attr-4"],"params":{"projectList":[{"__attrId":"attr-1"},{"__attrId":"attr-4"}]}}]},"attributes":[{"id":"attr-0","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sbit"},{"id":"attr-1","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sbool"},{"id":"attr-2","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sbox"},{"id":"attr-3","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sbytea"},{"id":"attr-4","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"schar"},{"id":"attr-5","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"scidr"},{"id":"attr-6","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"scircle"},{"id":"attr-7","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sdate"},{"id":"attr-8","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sdecimal"},{"id":"attr-9","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sfloat4"},{"id":"attr-10","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sfloat8"},{"id":"attr-11","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sinet"},{"id":"attr-12","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sint2"},{"id":"attr-13","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sint4"},{"id":"attr-14","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sint8"},{"id":"attr-15","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sinterval"},{"id":"attr-16","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sjson"},{"id":"attr-17","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sjsonb"},{"id":"attr-18","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sline"},{"id":"attr-19","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"slseg"},{"id":"attr-20","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"smacaddr"},{"id":"attr-21","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"smonkey"},{"id":"attr-22","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"snumeric"},{"id":"attr-23","dataType":"e63adadc-648a-56a0-9424-32
89858cf0bb","name":"spath"},{"id":"attr-24","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"spoint"},{"id":"attr-25","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"spolygon"},{"id":"attr-26","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sserial2"},{"id":"attr-27","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sserial4"},{"id":"attr-28","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sserial8"},{"id":"attr-29","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stext"},{"id":"attr-30","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stime"},{"id":"attr-31","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stimestamp"},{"id":"attr-32","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stimestamptz"},{"id":"attr-33","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stimetz"},{"id":"attr-34","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stsquery"},{"id":"attr-35","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stsvector"},{"id":"attr-36","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stxid_snapshot"},{"id":"attr-37","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"svarbit"},{"id":"attr-38","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"suuid"},{"id":"attr-39","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"svarchar"},{"id":"attr-40","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sxml"}],"expressions":{},"systemInfo":{"name":"spark","version":"3.1.2"},"agentInfo":{"name":"spline","version":"1.0.4"},"extraInfo":{"appName":"test123","dataTypes":[{"_typeHint":"dt.Simple","id":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"string","nullable":true}]}}
ExecutionEvent (apiVersion: 1.2):
{"planId":"4427d12c-d567-5ccd-b89a-a9d95f858ef2","timestamp":1677053222119,"error":"java.sql.SQLException: No suitable driver\n\tat java.sql.DriverManager.getDriver(DriverManager.java:315)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)\n\tat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)\n\tat org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)\n\tat org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)\n\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)\n\tat com.hs.test$.main(test.scala:55)\n\tat com.hs.test.main(test.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\n\tat org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\n\tat org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\n\tat org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\n\tat org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\n\tat 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)\n\tat org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)\n\tat org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)\n","extra":{"appId":"local-1677053200739","user":"jinmu","readMetrics":{},"writeMetrics":{}}}
23/02/22 16:07:02 INFO AsyncEventQueue: Process of event SparkListenerSQLExecutionEnd(2,1677053214309) by listener ExecutionListenerBus took 8.060134333s.
23/02/22 16:07:02 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/02/22 16:07:02 INFO MemoryStore: MemoryStore cleared
23/02/22 16:07:02 INFO BlockManager: BlockManager stopped
23/02/22 16:07:02 INFO BlockManagerMaster: BlockManagerMaster stopped
23/02/22 16:07:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/02/22 16:07:02 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:315)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at com.hs.test$.main(test.scala:55)
    at com.hs.test.main(test.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
cerveada commented 1 year ago

Yes, Spline is able to capture the lineage of both successful and failed jobs.

jinmu0410 commented 1 year ago

@cerveada So I have another question, about lineage sent to Kafka:

23/02/22 17:19:48 INFO SharedState: Warehouse path is 'file:/Users/jinmu/Downloads/soft/spark-3.1.2-bin-hadoop3.2/bin/spark-warehouse'.
23/02/22 17:19:49 INFO SparkLineageInitializer: Initializing Spline Agent...
23/02/22 17:19:49 INFO SparkLineageInitializer: Spline Version: 1.0.4 (rev. 4266fca)
23/02/22 17:19:49 INFO SparkLineageInitializer: Init Type: AUTO (codeless)
23/02/22 17:19:49 INFO SparkLineageInitializer: Init Mode: ENABLED
23/02/22 17:19:49 INFO SplineRecordSenderFactory: Kafka topic: spline_test
23/02/22 17:19:49 ERROR SparkLineageInitializer: Spline initialization failed! Spark Lineage tracking is DISABLED.
org.apache.commons.configuration.ConversionException: 'max.in.flight.requests.per.connection' doesn't map to a List object: 1, a java.lang.Integer
    at org.apache.commons.configuration.AbstractConfiguration.getList(AbstractConfiguration.java:1144)
    at org.apache.commons.configuration.AbstractConfiguration.getList(AbstractConfiguration.java:1109)
    at org.apache.commons.configuration.ConfigurationConverter.getProperties(ConfigurationConverter.java:116)
    at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher$$anonfun$$lessinit$greater$1.$anonfun$new$1(KafkaLineageDispatcher.scala:47)
    at za.co.absa.spline.harvester.dispatcher.kafkadispatcher.SplineRecordSenderFactory.<init>(SplineRecordSenderFactory.scala:34)
    at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher$$anonfun$$lessinit$greater$1.apply(KafkaLineageDispatcher.scala:46)
    at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher$$anonfun$$lessinit$greater$1.apply(KafkaLineageDispatcher.scala:43)
    at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher.<init>(KafkaLineageDispatcher.scala:57)
    at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher.<init>(KafkaLineageDispatcher.scala:43)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at za.co.absa.commons.HierarchicalObjectFactory$$anonfun$instantiate$3.applyOrElse(HierarchicalObjectFactory.scala:47)
    at za.co.absa.commons.HierarchicalObjectFactory$$anonfun$instantiate$3.applyOrElse(HierarchicalObjectFactory.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recover(Try.scala:234)
    at za.co.absa.commons.HierarchicalObjectFactory.instantiate(HierarchicalObjectFactory.scala:44)
    at za.co.absa.spline.agent.AgentBOM$$anon$1.obtain(AgentBOM.scala:119)
    at za.co.absa.spline.agent.AgentBOM$$anon$1.$anonfun$lineageDispatcher$2(AgentBOM.scala:100)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at za.co.absa.spline.agent.AgentBOM$$anon$1.lineageDispatcher$lzycompute(AgentBOM.scala:100)
    at za.co.absa.spline.agent.AgentBOM$$anon$1.lineageDispatcher(AgentBOM.scala:91)
    at za.co.absa.spline.harvester.SparkLineageInitializer.$anonfun$createListener$8(SparkLineageInitializer.scala:146)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
cerveada commented 1 year ago

There could be some issue with the new YAML config. I will check that.

As a workaround, you can set the same property using --conf options for spark-submit:

--conf spark.spline.lineageDispatcher=kafka \
--conf spark.spline.lineageDispatcher.kafka.producer.max.in.flight.requests.per.connection=1 \

That should fix the issue for you.
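
For completeness, an illustrative full Kafka dispatcher configuration could look like the following; the broker address and topic name are placeholders, and the exact property names should be verified against the agent README for your version:

--conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
--conf spark.spline.lineageDispatcher=kafka \
--conf spark.spline.lineageDispatcher.kafka.topicName=spline_test \
--conf spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers=localhost:9092 \
--conf spark.spline.lineageDispatcher.kafka.producer.max.in.flight.requests.per.connection=1 \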

jinmu0410 commented 1 year ago

thanks

jinmu0410 commented 1 year ago
(screenshot: 2023-02-23 16 36 48)

Is there any documentation for the classes inside producer.model? There are no comments, and I don't understand what they mean.

jinmu0410 commented 1 year ago

@cerveada

cerveada commented 1 year ago

All classes in this package represent the lineage that the agent captured. This is the agent's internal model. It is later transformed into a similar "output" model defined by the producer API. As the last step, the "output" model is converted to JSON and sent to the server.

jinmu0410 commented 1 year ago

Take this JSON, for example: can you help me find out what the "expressions" model means? By the way, can the lineage parsed by spline-spark-agent be sent to a third-party system, like DataHub?

"expressions":{
        "constants":[
            {
                "id":"expr-0",
                "dataType":"75fe27b9-9a00-5c7d-966f-33ba32333133",
                "extra":{
                    "simpleClassName":"Literal",
                    "_typeHint":"expr.Literal"
                },
                "value":1
            },
            {
                "id":"expr-1",
                "dataType":"75fe27b9-9a00-5c7d-966f-33ba32333133",
                "extra":{
                    "simpleClassName":"Literal",
                    "_typeHint":"expr.Literal"
                },
                "value":1
            },
            {
                "id":"expr-2",
                "dataType":"75fe27b9-9a00-5c7d-966f-33ba32333133",
                "extra":{
                    "simpleClassName":"Literal",
                    "_typeHint":"expr.Literal"
                },
                "value":1
            },
            {
                "id":"expr-3",
                "dataType":"75fe27b9-9a00-5c7d-966f-33ba32333133",
                "extra":{
                    "simpleClassName":"Literal",
                    "_typeHint":"expr.Literal"
                },
                "value":1
            }
        ]
    }
jinmu0410 commented 1 year ago

I found an article, https://firststr.com/2021/04/26/spark-compute-lineage-to-datahub/, but do we have any examples?

cerveada commented 1 year ago

Take this JSON, for example: can you help me find out what the "expressions" model means?

Maybe this discussion will help https://github.com/AbsaOSS/spline-spark-agent/discussions/563

Can the parsed lineage be sent to a third-party system? like dataHub?

Yes, check the documentation: https://github.com/AbsaOSS/spline-spark-agent#dispatchers

I found an article, https://firststr.com/2021/04/26/spark-compute-lineage-to-datahub/, but do we have any examples?

We don't provide any support for external servers other than Spline, but using the dispatchers should allow you to send the data wherever you want. You can always create a custom dispatcher if the provided ones are not enough.
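
As an illustration only, a custom dispatcher is a class implementing the agent's LineageDispatcher trait and registered by class name. The sketch below assumes the 1.x agent API; the exact trait signature, model package, and constructor contract should be checked against the agent version you use, and DataHubDispatcher with its "endpoint" property is purely hypothetical.

package com.example.lineage

import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
// The model package path may differ between agent versions.
import za.co.absa.spline.producer.model.v1_2.{ExecutionEvent, ExecutionPlan}

// Hypothetical dispatcher forwarding lineage to an external system (e.g. DataHub).
// It would be registered via spark-submit, for example:
//   --conf spark.spline.lineageDispatcher=datahub
//   --conf spark.spline.lineageDispatcher.datahub.className=com.example.lineage.DataHubDispatcher
//   --conf spark.spline.lineageDispatcher.datahub.endpoint=http://datahub-host:8080
class DataHubDispatcher(conf: Configuration) extends LineageDispatcher {

  // Hypothetical custom property read from the dispatcher's own config namespace.
  private val endpoint = conf.getString("endpoint")

  // Human-readable dispatcher name (some agent versions declare this on the trait).
  def name: String = "DataHub"

  def send(plan: ExecutionPlan): Unit = {
    // Convert the plan to whatever format the target system expects and push it
    // to `endpoint` (serialization and HTTP client are omitted in this sketch).
  }

  def send(event: ExecutionEvent): Unit = {
    // Forward the corresponding execution event (timestamp, error, metrics).
  }
}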

jinmu0410 commented 1 year ago

Thank you very much for your reply

jinmu0410 commented 1 year ago

In Kafka, each lineage run produces 2 messages. Is there any concurrency or ordering problem when handling the executionPlan and executionEvent? I mean, if I parse the "executionEvent", is it only from the "executionPlan" that I can confirm whether the Spark task was successful?

jinmu0410 commented 1 year ago

I was wrong; it is necessary to judge whether the task was successful by checking whether there is error information in the "executionEvent".

wajda commented 1 year ago

it is necessary to judge whether the task was successful by checking whether there is error information in the "executionEvent"

That's right. Any non-null value in the "error" property means there was an error.
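
For example, a minimal consumer-side check could look like the sketch below; it assumes the ExecutionEvent JSON shape shown earlier in this thread and uses json4s (which ships with Spark) purely for illustration:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Returns true if the ExecutionEvent JSON carries a non-null "error" property,
// i.e. the corresponding Spark action failed.
def isFailedExecution(executionEventJson: String): Boolean =
  (parse(executionEventJson) \ "error") match {
    case JNothing | JNull => false
    case _                => true
  }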

jinmu0410 commented 1 year ago

@cerveada Hi, I have another question:

(screenshot: 2023-03-07 21 16 30)
jinmu0410 commented 1 year ago

Spark on YARN

wajda commented 1 year ago

What Spark and YARN versions are you using? We were never able to reproduce this error, but it does happen to other people sometimes. Basically, what happens is that the Spark application shutdown event fires before all the listeners have executed. This seems to be abnormal Spark driver behavior, but more investigation is needed.

I'm closing this issue now, as it turned into a thread where different kinds of issues are discussed. Please feel free to open a new issue with more details about how to reliably reproduce this issue and we will take a look.

jinmu0410 commented 1 year ago

@wajda Spark 3.1.2 and YARN 3.1