datahub-project / datahub

The Metadata Platform for your Data Stack
https://datahubproject.io
Apache License 2.0
9.76k stars 2.88k forks

Support spark lineage? #7347

Closed jinmu0410 closed 1 year ago

jinmu0410 commented 1 year ago

Describe the bug

I used Spark 2.4 and 3.1 to submit a Spark jar. Only a little metadata was collected, and no lineage.

Screenshot 2023-02-15 11:53:40

./spark-submit --class com.hs.test \
  --master local \
  --name test_003 \
  --jars "/Users/jinmu/Downloads/datahub-spark-lineage-0.10.0rc1.jar" \
  --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
  --conf "spark.datahub.rest.server=http://localhost:8080" \
  /Users/jinmu/Downloads/hs/lakehouse/target/hs-lakehouse-1.0-SNAPSHOT.jar

Submit log output:

23/02/15 11:39:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 23/02/15 11:39:55 INFO SparkContext: Running Spark version 3.0.3 23/02/15 11:39:55 INFO ResourceUtils: ============================================================== 23/02/15 11:39:55 INFO ResourceUtils: Resources for spark.driver:

23/02/15 11:39:55 INFO ResourceUtils: ============================================================== 23/02/15 11:39:55 INFO SparkContext: Submitted application: test_005 23/02/15 11:39:55 INFO SecurityManager: Changing view acls to: jinmu 23/02/15 11:39:55 INFO SecurityManager: Changing modify acls to: jinmu 23/02/15 11:39:55 INFO SecurityManager: Changing view acls groups to: 23/02/15 11:39:55 INFO SecurityManager: Changing modify acls groups to: 23/02/15 11:39:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jinmu); groups with view permissions: Set(); users with modify permissions: Set(jinmu); groups with modify permissions: Set() 23/02/15 11:39:55 INFO Utils: Successfully started service 'sparkDriver' on port 49993. 23/02/15 11:39:56 INFO SparkEnv: Registering MapOutputTracker 23/02/15 11:39:56 INFO SparkEnv: Registering BlockManagerMaster 23/02/15 11:39:56 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 23/02/15 11:39:56 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 23/02/15 11:39:56 INFO SparkEnv: Registering BlockManagerMasterHeartbeat 23/02/15 11:39:56 INFO DiskBlockManager: Created local directory at /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/blockmgr-80f940fd-87c5-4ab7-9dc5-e829dc456fbc 23/02/15 11:39:56 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB 23/02/15 11:39:56 INFO SparkEnv: Registering OutputCommitCoordinator 23/02/15 11:39:56 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
23/02/15 11:39:56 INFO SparkUI: Bound SparkUI to 127.0.0.1, and started at http://localhost:4040 23/02/15 11:39:56 INFO SparkContext: Added JAR /Users/jinmu/Downloads/datahub-spark-lineage-0.10.0rc1.jar at spark://localhost:49993/jars/datahub-spark-lineage-0.10.0rc1.jar with timestamp 1676432395541 23/02/15 11:39:56 INFO Executor: Starting executor ID driver on host localhost 23/02/15 11:39:56 INFO Executor: Fetching spark://localhost:49993/jars/datahub-spark-lineage-0.10.0rc1.jar with timestamp 1676432395541 23/02/15 11:39:56 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:49993 after 28 ms (0 ms spent in bootstraps) 23/02/15 11:39:56 INFO Utils: Fetching spark://localhost:49993/jars/datahub-spark-lineage-0.10.0rc1.jar to /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-d8419284-d3b1-44f9-8df9-081dccb96513/userFiles-5019790b-2569-48c6-a06d-2717c80eb075/fetchFileTemp4317236260463150066.tmp 23/02/15 11:39:57 INFO Executor: Adding file:/private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-d8419284-d3b1-44f9-8df9-081dccb96513/userFiles-5019790b-2569-48c6-a06d-2717c80eb075/datahub-spark-lineage-0.10.0rc1.jar to class loader 23/02/15 11:39:57 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49995. 
23/02/15 11:39:57 INFO NettyBlockTransferService: Server created on localhost:49995 23/02/15 11:39:57 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 23/02/15 11:39:57 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, localhost, 49995, None) 23/02/15 11:39:57 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49995 with 366.3 MiB RAM, BlockManagerId(driver, localhost, 49995, None) 23/02/15 11:39:57 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, localhost, 49995, None) 23/02/15 11:39:57 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, localhost, 49995, None) SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 23/02/15 11:39:57 INFO SparkContext: Registered listener datahub.spark.DatahubSparkListener SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 23/02/15 11:39:57 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/jinmu/Downloads/soft/spark-2.4.0-bin-hadoop2.7/bin/spark-warehouse'). 23/02/15 11:39:57 INFO SharedState: Warehouse path is 'file:/Users/jinmu/Downloads/soft/spark-2.4.0-bin-hadoop2.7/bin/spark-warehouse'. 23/02/15 11:39:58 INFO AsyncEventQueue: Process of event SparkListenerApplicationStart(test_005,Some(local-1676432396713),1676432395541,jinmu,None,None,None) by listener DatahubSparkListener took 1.620096s. 23/02/15 11:39:59 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 23/02/15 11:39:59 INFO InMemoryFileIndex: It took 90 ms to list leaf files for 1 paths. 
23/02/15 11:39:59 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 344.0 KiB, free 366.0 MiB) 23/02/15 11:40:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 29.9 KiB, free 365.9 MiB) 23/02/15 11:40:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:49995 (size: 29.9 KiB, free: 366.3 MiB) 23/02/15 11:40:00 INFO SparkContext: Created broadcast 0 from csv at test.scala:29 23/02/15 11:40:00 INFO FileInputFormat: Total input paths to process : 1 23/02/15 11:40:00 INFO FileInputFormat: Total input paths to process : 1 23/02/15 11:40:00 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 3, size left: 8707 23/02/15 11:40:00 INFO SparkContext: Starting job: csv at test.scala:29 23/02/15 11:40:00 INFO DAGScheduler: Got job 0 (csv at test.scala:29) with 1 output partitions 23/02/15 11:40:00 INFO DAGScheduler: Final stage: ResultStage 0 (csv at test.scala:29) 23/02/15 11:40:00 INFO DAGScheduler: Parents of final stage: List() 23/02/15 11:40:00 INFO DAGScheduler: Missing parents: List() 23/02/15 11:40:00 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at csv at test.scala:29), which has no missing parents 23/02/15 11:40:00 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.2 KiB, free 365.9 MiB) 23/02/15 11:40:00 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.6 KiB, free 365.9 MiB) 23/02/15 11:40:00 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:49995 (size: 3.6 KiB, free: 366.3 MiB) 23/02/15 11:40:00 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1223 23/02/15 11:40:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at csv at test.scala:29) (first 15 tasks are for partitions Vector(0)) 23/02/15 11:40:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 23/02/15 11:40:00 
INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7456 bytes) 23/02/15 11:40:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 23/02/15 11:40:01 INFO BinaryFileRDD: Input split: Paths:/jinmu/test/3.csv:0+8707 23/02/15 11:40:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1057 bytes result sent to driver 23/02/15 11:40:01 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 898 ms on localhost (executor driver) (1/1) 23/02/15 11:40:01 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 23/02/15 11:40:01 INFO DAGScheduler: ResultStage 0 (csv at test.scala:29) finished in 1.016 s 23/02/15 11:40:01 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job 23/02/15 11:40:01 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished 23/02/15 11:40:01 INFO DAGScheduler: Job 0 finished: csv at test.scala:29, took 1.111524 s 23/02/15 11:40:02 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:49995 in memory (size: 3.6 KiB, free: 366.3 MiB) 23/02/15 11:40:02 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:49995 in memory (size: 29.9 KiB, free: 366.3 MiB) 23/02/15 11:40:03 INFO FileSourceStrategy: Pushed Filters: 23/02/15 11:40:03 INFO FileSourceStrategy: Post-Scan Filters: 23/02/15 11:40:03 INFO FileSourceStrategy: Output Data Schema: struct 23/02/15 11:40:03 INFO ParquetFileFormat: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter 23/02/15 11:40:03 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 23/02/15 11:40:03 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 23/02/15 11:40:03 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 23/02/15 11:40:03 INFO SQLHadoopMapReduceCommitProtocol: Using output 
committer class org.apache.parquet.hadoop.ParquetOutputCommitter 23/02/15 11:40:03 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 343.6 KiB, free 366.0 MiB) 23/02/15 11:40:03 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 29.9 KiB, free 365.9 MiB) 23/02/15 11:40:03 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:49995 (size: 29.9 KiB, free: 366.3 MiB) 23/02/15 11:40:03 INFO SparkContext: Created broadcast 2 from parquet at test.scala:34 23/02/15 11:40:03 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes. 23/02/15 11:40:04 INFO SparkContext: Starting job: parquet at test.scala:34 23/02/15 11:40:04 INFO DAGScheduler: Got job 1 (parquet at test.scala:34) with 1 output partitions 23/02/15 11:40:04 INFO DAGScheduler: Final stage: ResultStage 1 (parquet at test.scala:34) 23/02/15 11:40:04 INFO DAGScheduler: Parents of final stage: List() 23/02/15 11:40:04 INFO DAGScheduler: Missing parents: List() 23/02/15 11:40:04 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at parquet at test.scala:34), which has no missing parents 23/02/15 11:40:04 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 170.1 KiB, free 365.8 MiB) 23/02/15 11:40:04 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 65.7 KiB, free 365.7 MiB) 23/02/15 11:40:04 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:49995 (size: 65.7 KiB, free: 366.2 MiB) 23/02/15 11:40:04 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1223 23/02/15 11:40:04 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at parquet at test.scala:34) (first 15 tasks are for partitions Vector(0)) 23/02/15 11:40:04 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 23/02/15 11:40:04 INFO TaskSetManager: 
Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 7736 bytes) 23/02/15 11:40:04 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 23/02/15 11:40:04 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 23/02/15 11:40:04 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 23/02/15 11:40:04 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 23/02/15 11:40:04 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 23/02/15 11:40:04 INFO CodecConfig: Compression: SNAPPY 23/02/15 11:40:04 INFO CodecConfig: Compression: SNAPPY 23/02/15 11:40:04 INFO ParquetOutputFormat: Parquet block size to 134217728 23/02/15 11:40:04 INFO ParquetOutputFormat: Parquet page size to 1048576 23/02/15 11:40:04 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576 23/02/15 11:40:04 INFO ParquetOutputFormat: Dictionary is on 23/02/15 11:40:04 INFO ParquetOutputFormat: Validation is off 23/02/15 11:40:04 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0 23/02/15 11:40:04 INFO ParquetOutputFormat: Maximum row group padding size is 8388608 bytes 23/02/15 11:40:04 INFO ParquetOutputFormat: Page size checking is: estimated 23/02/15 11:40:04 INFO ParquetOutputFormat: Min row count for page size check is: 100 23/02/15 11:40:04 INFO ParquetOutputFormat: Max row count for page size check is: 10000 23/02/15 11:40:04 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: { "type" : "struct", "fields" : [ { "name" : "PK\u0003\u0004", "type" : "string", "nullable" : true, "metadata" : { } } ] } and corresponding Parquet message type: message spark_schema { optional binary PK (UTF8); }

23/02/15 11:40:04 INFO CodecPool: Got brand-new compressor [.snappy] 23/02/15 11:40:04 INFO FileScanRDD: Reading File path: hdfs://lake-node3:8020/jinmu/test/3.csv, range: 0-8707, partition values: [empty row] 23/02/15 11:40:05 INFO CodeGenerator: Code generated in 261.802583 ms 23/02/15 11:40:05 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 7672 23/02/15 11:40:05 INFO FileOutputCommitter: Saved output of task 'attempt_202302151140035966139727189887635_0001_m_000000_1' to hdfs://lake-node3:8020/jinmu/test/2.parquet/_temporary/0/task_202302151140035966139727189887635_0001_m_000000 23/02/15 11:40:05 INFO SparkHadoopMapRedUtil: attempt_202302151140035966139727189887635_0001_m_000000_1: Committed 23/02/15 11:40:05 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2284 bytes result sent to driver 23/02/15 11:40:05 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1577 ms on localhost (executor driver) (1/1) 23/02/15 11:40:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 23/02/15 11:40:05 INFO DAGScheduler: ResultStage 1 (parquet at test.scala:34) finished in 1.676 s 23/02/15 11:40:05 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job 23/02/15 11:40:05 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished 23/02/15 11:40:05 INFO DAGScheduler: Job 1 finished: parquet at test.scala:34, took 1.712650 s 23/02/15 11:40:05 INFO FileFormatWriter: Write Job 388ff337-c292-43cf-bc78-b8eb88b9f7e2 committed. 23/02/15 11:40:05 INFO FileFormatWriter: Finished processing stats for write job 388ff337-c292-43cf-bc78-b8eb88b9f7e2. 23/02/15 11:40:05 INFO SparkContext: Invoking stop() from shutdown hook 23/02/15 11:40:05 INFO SparkUI: Stopped Spark web UI at http://localhost:4040 23/02/15 11:40:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
23/02/15 11:40:06 INFO MemoryStore: MemoryStore cleared 23/02/15 11:40:06 INFO BlockManager: BlockManager stopped 23/02/15 11:40:06 INFO BlockManagerMaster: BlockManagerMaster stopped 23/02/15 11:40:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 23/02/15 11:40:06 INFO SparkContext: Successfully stopped SparkContext 23/02/15 11:40:06 INFO ShutdownHookManager: Shutdown hook called 23/02/15 11:40:06 INFO ShutdownHookManager: Deleting directory /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-c134fad9-07fa-479d-b54d-8fd2657c247c 23/02/15 11:40:06 INFO ShutdownHookManager: Deleting directory /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-d8419284-d3b1-44f9-8df9-081dccb96513 jinmu@jinmudeMacBook-Pro bin %

DataHub

Screenshot 2023-02-15 11:57:07 · Screenshot 2023-02-15 11:57:15
jinmu0410 commented 1 year ago

I am a beginner; can anyone help me?

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

prarshah1 commented 1 year ago

I tested this with the current latest version, 0.10.1. The following is what I see...

Screenshot 2023-03-30 at 2 14 14 PM

When I run the same Spark operations with another data lineage tool, such as OpenLineage, I get information about the stages, such as joins.

I am new to DataHub; is my understanding correct?

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

github-actions[bot] commented 1 year ago

This issue was closed because it has been inactive for 30 days since being marked as stale.

chadlagore commented 1 year ago

I think you need to ensure that all dataset nodes actually exist with URNs: https://github.com/datahub-project/datahub/issues/7640#issuecomment-1515112287.
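To illustrate, here is a minimal sketch of how the dataset URNs for this job's input and output might look, so you can search for them in DataHub and confirm both endpoints of the expected lineage edge exist. The `make_dataset_urn` helper, the platform name `hdfs`, and the env `PROD` are assumptions for illustration; the actual names depend on how the listener registered the datasets.

```python
# Hypothetical helper that builds a DataHub dataset URN of the form
# urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>).
def make_dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"

# Input/output paths taken from the job log earlier in this thread.
source_urn = make_dataset_urn("hdfs", "lake-node3:8020/jinmu/test/3.csv")
sink_urn = make_dataset_urn("hdfs", "lake-node3:8020/jinmu/test/2.parquet")

print(source_urn)
print(sink_urn)
```

If either URN is missing in the DataHub UI, lineage between them has nothing to attach to, which matches the behavior described in the linked comment.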

lhtrong commented 1 year ago

@chadlagore How can we do that? Can you give some more detail, please? Thanks.