datahub-project / datahub

The Metadata Platform for your Data Stack
https://datahubproject.io
Apache License 2.0

Apache Spark agent is unable to push metadata due to unknown reason #10838

Closed Lockdain closed 2 months ago

Lockdain commented 3 months ago

Describe the bug I've tried to set up a test Spark example according to the article: https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta

Datahub 0.13.1 is running in Docker.

I use spark-submit locally to execute the PySpark job listed below:

spark-submit --packages io.acryl:acryl-spark-lineage:0.2.11 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" test.py

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
import os
import sys

def main():
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    # Build a local session with the DataHub Spark listener attached
    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("test-application")
        .config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.11")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        .enableHiveSupport()
        .getOrCreate()
    )
    # Create a small in-memory DataFrame and query it via Spark SQL
    df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "label"])
    df.createOrReplaceTempView("stock_price1")
    sqlDF = spark.sql("SELECT * FROM stock_price1")
    sqlDF.printSchema()

if __name__ == "__main__":
    main()

The Spark application finishes as expected, and the logs show some details about the Spark agent. However, nothing appears in DataHub; it seems the agent doesn't even try to publish metadata events to GMS:

C:\App\Spark\bin>spark-submit --packages io.acryl:acryl-spark-lineage:0.2.11 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" test.py
:: loading settings :: url = jar:file:/C:/App/Spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\corp-user\.ivy2\cache
The jars for the packages stored in: C:\Users\corp-user\.ivy2\jars
io.acryl#acryl-spark-lineage added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5c2b449d-2797-4ea5-9e7e-478da13c56be;1.0
        confs: [default]
        found io.acryl#acryl-spark-lineage;0.2.11 in central
:: resolution report :: resolve 184ms :: artifacts dl 5ms
        :: modules in use:
        io.acryl#acryl-spark-lineage;0.2.11 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-5c2b449d-2797-4ea5-9e7e-478da13c56be
        confs: [default]
        0 artifacts copied, 1 already retrieved (0kB/8ms)
24/07/03 13:52:21 INFO SparkContext: Running Spark version 3.5.1
24/07/03 13:52:21 INFO SparkContext: OS info Windows 10, 10.0, amd64
24/07/03 13:52:21 INFO SparkContext: Java version 1.8.0_411
24/07/03 13:52:21 INFO ResourceUtils: ==============================================================
24/07/03 13:52:21 INFO ResourceUtils: No custom resources configured for spark.driver.
24/07/03 13:52:21 INFO ResourceUtils: ==============================================================
24/07/03 13:52:21 INFO SparkContext: Submitted application: test-application
24/07/03 13:52:21 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/07/03 13:52:21 INFO ResourceProfile: Limiting resource is cpu
24/07/03 13:52:21 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/07/03 13:52:22 INFO SecurityManager: Changing view acls to: corp-user
24/07/03 13:52:22 INFO SecurityManager: Changing modify acls to: corp-user
24/07/03 13:52:22 INFO SecurityManager: Changing view acls groups to:
24/07/03 13:52:22 INFO SecurityManager: Changing modify acls groups to:
24/07/03 13:52:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: corp-user; groups with view permissions: EMPTY; users with modify permissions: corp-user; groups with modify permissions: EMPTY
24/07/03 13:52:23 INFO Utils: Successfully started service 'sparkDriver' on port 56245.
24/07/03 13:52:23 INFO SparkEnv: Registering MapOutputTracker
24/07/03 13:52:23 INFO SparkEnv: Registering BlockManagerMaster
24/07/03 13:52:23 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
24/07/03 13:52:23 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
24/07/03 13:52:23 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/07/03 13:52:23 INFO DiskBlockManager: Created local directory at C:\Users\corp-user\AppData\Local\Temp\blockmgr-cc24e426-5fbe-4ad3-a7c7-fe596d24e1db
24/07/03 13:52:23 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
24/07/03 13:52:23 INFO SparkEnv: Registering OutputCommitCoordinator
24/07/03 13:52:24 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
24/07/03 13:52:24 INFO Utils: Successfully started service 'SparkUI' on port 4040.
24/07/03 13:52:24 INFO SparkContext: Added JAR file:///C:/Users/corp-user/.ivy2/jars/io.acryl_acryl-spark-lineage-0.2.11.jar at spark://host.docker.internal:56245/jars/io.acryl_acryl-spark-lineage-0.2.11.jar with timestamp 1720003941735
24/07/03 13:52:24 INFO SparkContext: Added file file:///C:/Users/corp-user/.ivy2/jars/io.acryl_acryl-spark-lineage-0.2.11.jar at file:///C:/Users/corp-user/.ivy2/jars/io.acryl_acryl-spark-lineage-0.2.11.jar with timestamp 1720003941735
24/07/03 13:52:24 INFO Utils: Copying C:\Users\corp-user\.ivy2\jars\io.acryl_acryl-spark-lineage-0.2.11.jar to C:\Users\corp-user\AppData\Local\Temp\spark-bd8845e0-8c64-4fdc-8816-8f0368409c05\userFiles-008b1250-27e0-4f3d-b2cd-c39a634142fa\io.acryl_acryl-spark-lineage-0.2.11.jar
24/07/03 13:52:24 INFO Executor: Starting executor ID driver on host host.docker.internal
24/07/03 13:52:24 INFO Executor: OS info Windows 10, 10.0, amd64
24/07/03 13:52:24 INFO Executor: Java version 1.8.0_411
24/07/03 13:52:24 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
24/07/03 13:52:24 INFO Executor: Created or updated repl class loader org.apache.spark.util.MutableURLClassLoader@57088cff for default.
24/07/03 13:52:24 INFO Executor: Fetching file:///C:/Users/corp-user/.ivy2/jars/io.acryl_acryl-spark-lineage-0.2.11.jar with timestamp 1720003941735
24/07/03 13:52:25 INFO Utils: C:\Users\corp-user\.ivy2\jars\io.acryl_acryl-spark-lineage-0.2.11.jar has been previously copied to C:\Users\corp-user\AppData\Local\Temp\spark-bd8845e0-8c64-4fdc-8816-8f0368409c05\userFiles-008b1250-27e0-4f3d-b2cd-c39a634142fa\io.acryl_acryl-spark-lineage-0.2.11.jar
24/07/03 13:52:25 INFO Executor: Fetching spark://host.docker.internal:56245/jars/io.acryl_acryl-spark-lineage-0.2.11.jar with timestamp 1720003941735
24/07/03 13:52:25 INFO TransportClientFactory: Successfully created connection to host.docker.internal/192.168.1.134:56245 after 59 ms (0 ms spent in bootstraps)
24/07/03 13:52:25 INFO Utils: Fetching spark://host.docker.internal:56245/jars/io.acryl_acryl-spark-lineage-0.2.11.jar to C:\Users\corp-user\AppData\Local\Temp\spark-bd8845e0-8c64-4fdc-8816-8f0368409c05\userFiles-008b1250-27e0-4f3d-b2cd-c39a634142fa\fetchFileTemp2387115782614911163.tmp
24/07/03 13:52:26 INFO Utils: C:\Users\corp-user\AppData\Local\Temp\spark-bd8845e0-8c64-4fdc-8816-8f0368409c05\userFiles-008b1250-27e0-4f3d-b2cd-c39a634142fa\fetchFileTemp2387115782614911163.tmp has been previously copied to C:\Users\corp-user\AppData\Local\Temp\spark-bd8845e0-8c64-4fdc-8816-8f0368409c05\userFiles-008b1250-27e0-4f3d-b2cd-c39a634142fa\io.acryl_acryl-spark-lineage-0.2.11.jar
24/07/03 13:52:26 INFO Executor: Adding file:/C:/Users/corp-user/AppData/Local/Temp/spark-bd8845e0-8c64-4fdc-8816-8f0368409c05/userFiles-008b1250-27e0-4f3d-b2cd-c39a634142fa/io.acryl_acryl-spark-lineage-0.2.11.jar to class loader default
24/07/03 13:52:26 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56300.
24/07/03 13:52:26 INFO NettyBlockTransferService: Server created on host.docker.internal:56300
24/07/03 13:52:26 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
24/07/03 13:52:26 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, host.docker.internal, 56300, None)
24/07/03 13:52:26 INFO BlockManagerMasterEndpoint: Registering block manager host.docker.internal:56300 with 366.3 MiB RAM, BlockManagerId(driver, host.docker.internal, 56300, None)
24/07/03 13:52:26 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, host.docker.internal, 56300, None)
24/07/03 13:52:26 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, host.docker.internal, 56300, None)
24/07/03 13:52:27 INFO SparkContext: Registered listener datahub.spark.DatahubSparkListener
24/07/03 13:52:27 INFO DatahubSparkListener: Application start called
24/07/03 13:52:28 INFO ArgumentParser: Couldn't log config from file, will read it from SparkConf
24/07/03 13:52:29 INFO RedshiftVendor: Checking if Redshift classes are available
24/07/03 13:52:29 INFO RedshiftVendor: Checking if Redshift classes are available
24/07/03 13:52:29 INFO RedshiftVendor: Checking if Redshift classes are available
24/07/03 13:52:29 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
24/07/03 13:52:29 INFO SharedState: Warehouse path is 'file:/C:/App/Spark/bin/spark-warehouse'.
24/07/03 13:52:32 INFO ConsoleTransport: {"eventTime":"2024-07-03T10:52:21.735Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"START","run":{"runId":"01907839-71fe-7545-88cd-174f02242fea","facets":{"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"test-application"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"3.5.1","name":"spark"},"spark_applicationDetails":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","master":"local[*]","appName":"test-application","applicationId":"local-1720003944734","deployMode":"client","driverHost":"host.docker.internal","userName":"corp-user","uiWebUrl":"http://host.docker.internal:4040"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"default","name":"test_application","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"NONE","integration":"SPARK","jobType":"APPLICATION"}}},"inputs":[],"outputs":[]}
24/07/03 13:52:32 INFO DatahubSparkListener: onApplicationStart completed successfully in 4518 ms
24/07/03 13:52:32 INFO AsyncEventQueue: Process of event SparkListenerApplicationStart(test-application,Some(local-1720003944734),1720003941735,corp-user,None,None,None) by listener DatahubSparkListener took 4.5174741s.
24/07/03 13:52:35 ERROR ContextFactory: Query execution is null: can't emit event for executionId 0
24/07/03 13:52:35 INFO RedshiftVendor: Checking if Redshift classes are available
24/07/03 13:52:35 INFO SparkSQLExecutionContext: OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionEnd
root
 |-- id: long (nullable = true)
 |-- label: string (nullable = true)

24/07/03 13:52:36 INFO SparkContext: Invoking stop() from shutdown hook
24/07/03 13:52:36 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/07/03 13:52:36 INFO RedshiftVendor: Checking if Redshift classes are available
24/07/03 13:52:36 INFO ConsoleTransport: {"eventTime":"2024-07-03T10:52:36.081Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"01907839-71fe-7545-88cd-174f02242fea","facets":{"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"test-application"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"3.5.1","name":"spark"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"default","name":"test_application","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"NONE","integration":"SPARK","jobType":"APPLICATION"}}},"inputs":[],"outputs":[]}
24/07/03 13:52:36 WARN DatahubSparkListener: Emitter is not initialized, unable to emit coalesced events
24/07/03 13:52:36 INFO SparkUI: Stopped Spark web UI at http://host.docker.internal:4040
24/07/03 13:52:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/07/03 13:52:36 INFO MemoryStore: MemoryStore cleared
24/07/03 13:52:36 INFO BlockManager: BlockManager stopped
24/07/03 13:52:36 INFO BlockManagerMaster: BlockManagerMaster stopped
24/07/03 13:52:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/07/03 13:52:36 INFO SparkContext: Successfully stopped SparkContext
24/07/03 13:52:36 INFO ShutdownHookManager: Shutdown hook called
24/07/03 13:52:36 INFO ShutdownHookManager: Deleting directory C:\Users\corp-user\AppData\Local\Temp\spark-67ecc045-e663-43b4-bcb3-e8408da8af00
24/07/03 13:52:36 INFO ShutdownHookManager: Deleting directory C:\Users\corp-user\AppData\Local\Temp\spark-bd8845e0-8c64-4fdc-8816-8f0368409c05\pyspark-820c2f9e-c5a0-4f02-993c-3ded6415dbf4
24/07/03 13:52:36 INFO ShutdownHookManager: Deleting directory C:\Users\corp-user\AppData\Local\Temp\spark-bd8845e0-8c64-4fdc-8816-8f0368409c05

To Reproduce

  1. Run the Spark job using the given spark-submit command
  2. Wait until it finishes

Expected behavior DataHub shows the metadata extracted from the Spark application.


Desktop (please complete the following information):

Python 3.12.4 (tags/v3.12.4:8e8a4ba, Jun  6 2024, 19:30:16)
Spark version 3.5.1 Using Scala version 2.12.18, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_411
Java version "22.0.1" 2024-04-16
Java(TM) SE Runtime Environment (build 22.0.1+8-16)
Java HotSpot(TM) 64-Bit Server VM (build 22.0.1+8-16, mixed mode, sharing)
DataHub 0.13.1 (in Docker)
Docker Desktop 4.31.1 (153621)
treff7es commented 3 months ago

Can you try writing the contents of the dataframe somewhere and check whether metadata gets sent in that case? In the meantime, I will check and try to fix it using your example. It seems we initialize the DataHub emitter in an event that only gets triggered if you read from or write to somewhere.
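For example, a minimal variation of your test job along these lines (the output path is just a placeholder I made up) should give the run an output dataset and let the emitter initialize:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test-application")
    .config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.11")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://localhost:8080")
    .getOrCreate()
)

df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "label"])

# Writing the DataFrame out gives the run an actual output dataset; the
# read/write path is what triggers the lineage events the emitter needs.
# The output directory is only a placeholder for this sketch.
df.write.mode("overwrite").parquet("file:///C:/tmp/stock_price1_parquet")

spark.stop()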

Lockdain commented 3 months ago

Hi @treff7es,

Thanks for your reply. I've updated the Spark job so it saves the data frame to a PostgreSQL table and the agent has started to push metadata to DataHub.
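The relevant change boils down to roughly the following inside main() (the connection URL, credentials, and table name here are placeholders, not my real values; it also needs the PostgreSQL JDBC driver on the classpath, e.g. via --packages org.postgresql:postgresql:42.7.3):

# Write the DataFrame to PostgreSQL instead of only querying it;
# having a real output dataset makes the agent emit lineage to DataHub.
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/testdb") \
    .option("dbtable", "public.stock_price1") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()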

Now it seems to me that this behavior of the Spark agent is actually correct, since the agent's main goal is to provide lineage, which is based on a "source -> transformation -> destination" chain.

Am I right?

treff7es commented 3 months ago

Yes, it is not really useful to use the agent to capture a Spark job that has no inputs or outputs.

treff7es commented 2 months ago

I have fixed this issue as well; now you should be able to see the Spark job even without inputs.