OpenLineage / OpenLineage

An Open Standard for lineage metadata collection
http://openlineage.io
Apache License 2.0

Spark Integration - Schema for text file and column level lineage is not captured #2573

Open Anitha-KS opened 3 months ago

Anitha-KS commented 3 months ago

We tried the following code for a POC:

from pyspark.sql import SparkSession

spark = (SparkSession.builder.master('local').appName('openlineage_spark_test')
.getOrCreate())
spark.sparkContext.setLogLevel("TRACE")

empDF = spark.read.option("header",True).csv("file:///emp.1.txt") #.selectExpr("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
empDF.printSchema()
empDF.count()

deptDF = spark.read.option("header",True).csv("file:///dept.1.txt") #.selectExpr("dept_name","dept_id")
deptDF.printSchema()

finalDF = empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"inner")
finalDF.printSchema()
finalDF.write.options(header='True', delimiter=',').mode("overwrite").csv("ol_test_output")

spark.read.csv("ol_test_output").count()

Execute: spark-submit --master yarn --deploy-mode client --queue default --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" --conf "spark.openlineage.transport.type=http" --conf "spark.openlineage.transport.url=<>" --jars "openlineage-spark_2.13-1.10.2.jar" --conf "spark.openlineage.namespace=pyspark_integration" ol-test.py

This code produces job-level lineage, but the schema for the text files and the column-level lineage are not captured.
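Editorial sketch: the spark-submit invocation above packs every OpenLineage setting into one long line, which makes a mistyped key easy to miss. The helper below is a minimal illustration of building the same argument list from a dictionary. The function name `build_spark_submit` is hypothetical, and the transport URL is a placeholder because the original command leaves it blank.

```python
import shlex


def build_spark_submit(script, jar, transport_url, namespace):
    """Return a spark-submit argument list with the OpenLineage listener enabled."""
    # Configuration keys copied from the command in the issue.
    conf = {
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.transport.type": "http",
        "spark.openlineage.transport.url": transport_url,
        "spark.openlineage.namespace": namespace,
    }
    args = ["spark-submit", "--master", "yarn", "--deploy-mode", "client",
            "--queue", "default"]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args += ["--jars", jar, script]
    return args


# Placeholder URL: the original command does not show the real one.
cmd = build_spark_submit(
    script="ol-test.py",
    jar="openlineage-spark_2.13-1.10.2.jar",
    transport_url="http://localhost:5000",
    namespace="pyspark_integration",
)
print(shlex.join(cmd))
```

Printing the joined command before running it gives a quick way to confirm that each `--conf` pair is spelled as intended.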

boring-cyborg[bot] commented 3 months ago

Thanks for creating your first OpenLineage issue! Your feedback is valuable and improves the project. If you haven't already, please be sure to follow the issue template!

mobuchowski commented 3 months ago

@Anitha-KS what's the Spark version you're using?

TridevNS commented 3 months ago

I faced a similar issue, @mobuchowski and @Anitha-KS:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession
  .builder()
  .appName("dataset_example")
  .master("local[*]")
  .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
  .config("spark.openlineage.transport.type", "http")
  .config("spark.openlineage.transport.url", "http://localhost:5000")
  .config("spark.openlineage.namespace", "spark_integration")
  .config("spark.openlineage.facets.disabled", "[spark.logicalPlan]")
  .config("spark.openlineage.debugFacet", "enabled")
  .getOrCreate()

// Read the CSV with a header row and inferred column types
val inputData = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/Users/tridev/Downloads/housing.csv")

// Derive a new column from two input columns
val tempDF = inputData.withColumn("total_price", col("price") * col("lotsize"))

// Write the result as Parquet, replacing any previous output
tempDF.write.format("parquet").mode("overwrite").save("/Users/tridev/Downloads/new")

In the lineage there is no column lineage info like the one described at https://openlineage.io/docs/spec/facets/dataset-facets/column_lineage_facet


Spark version: 3.3.1

In the logs I can see the column-level lineage:

[spark-listener-group-shared] DEBUG io.openlineage.spark.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager - Connection released: [id: 0][route: {}->http://localhost:5000][total available: 1; route allocated: 1 of 2; total allocated: 1 of 20] 2024-04-15 12:54:05,801 [spark-listener-group-shared] DEBUG io.openlineage.spark.agent.EventEmitter - Emitting lineage completed successfully: {"eventTime":"2024-04-15T07:24:04.365Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"068416bd-5416-48bf-89df-46f36449eac0","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"a6c8696b-8073-4d71-bcee-1bde4442842f"},"job":{"namespace":"spark_integration","name":"dataset_example"}},"debug":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","classpath":{"openLineageVersion":"1.9.1","sparkVersion":"3.3.1","scalaVersion":"2.12.15","jars":[],"classDetails":[{"className":"org.apache.spark.sql.delta.catalog.DeltaCatalog","packageVersion":"2.1.0","onClasspath":true},{"className":"org.apache.iceberg.catalog.Catalog","onClasspath":false},{"className":"com.google.cloud.spark.bigquery.BigQueryRelation","packageVersion":"0.27.0","onClasspath":true}]},"system":{"sparkDeployMode":"client","javaVersion":"1.8.0_181","javaVendor":"Oracle Corporation","osArch":"x86_64","osName":"Mac OS 
X","osVersion":"10.16","userLanguage":"en","userTimezone":"Asia/Kolkata"},"config":{"extraListeners":"io.openlineage.spark.agent.OpenLineageSparkListener","openLineageConfig":{"debugFacet":"enabled","facets.disabled":"[spark.logicalPlan]","namespace":"spark_integration","transport.type":"http","transport.url":"http://localhost:5000"},"catalogClass":"org.apache.spark.sql.internal.CatalogImpl"},"logicalPlan":{"nodes":[{"id":"InsertIntoHadoopFsRelationCommand@1508650397","desc":"InsertIntoHadoopFsRelationCommand file:/Users/tridev/Downloads/new, false, Parquet, [path=/Users/tridev/Downloads/new], Overwrite, [_c0, price, lotsize, bedrooms, bathrms, stories, driveway, recroom, fullbase, gashw, airco, garagepl, prefarea, total_price]\n+- Project [_c0#17, price#18, lotsize#19, bedrooms#20, bathrms#21, stories#22, driveway#23, recroom#24, fullbase#25, gashw#26, airco#27, garagepl#28, prefarea#29, (price#18 cast(lotsize#19 as double)) AS total_price#43]\n +- Relation [_c0#17,price#18,lotsize#19,bedrooms#20,bathrms#21,stories#22,driveway#23,recroom#24,fullbase#25,gashw#26,airco#27,garagepl#28,prefarea#29] csv\n","children":["Project@-915422472"]},{"id":"Project@-915422472","desc":"Project [_c0#17, price#18, lotsize#19, bedrooms#20, bathrms#21, stories#22, driveway#23, recroom#24, fullbase#25, gashw#26, airco#27, garagepl#28, prefarea#29, (price#18 cast(lotsize#19 as double)) AS total_price#43]\n+- Relation [_c0#17,price#18,lotsize#19,bedrooms#20,bathrms#21,stories#22,driveway#23,recroom#24,fullbase#25,gashw#26,airco#27,garagepl#28,prefarea#29] csv\n","children":["LogicalRelation@1693633821"]},{"id":"LogicalRelation@1693633821","desc":"Relation [_c0#17,price#18,lotsize#19,bedrooms#20,bathrms#21,stories#22,driveway#23,recroom#24,fullbase#25,gashw#26,airco#27,garagepl#28,prefarea#29] 
csv\n","children":[]}]}},"spark_version":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","spark-version":"3.3.1","openlineage-spark-version":"1.9.1"},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"3.3.1","name":"spark","openlineageAdapterVersion":"1.9.1"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"spark_integration","name":"dataset_example.execute_insert_into_hadoop_fs_relation_command.Downloads_new","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"SPARK","jobType":"JOB"}}},"inputs":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"_c0","type":"integer"},{"name":"price","type":"double"},{"name":"lotsize","type":"integer"},{"name":"bedrooms","type":"integer"},{"name":"bathrms","type":"integer"},{"name":"stories","type":"integer"},{"name":"driveway","type":"string"},{"name":"recroom","type":"
string"},{"name":"fullbase","type":"string"},{"name":"gashw","type":"string"},{"name":"airco","type":"string"},{"name":"garagepl","type":"integer"},{"name":"prefarea","type":"string"}]}},"inputFacets":{}}],"outputs":[{"namespace":"file","name":"/Users/tridev/Downloads/new","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"_c0","type":"integer"},{"name":"price","type":"double"},{"name":"lotsize","type":"integer"},{"name":"bedrooms","type":"integer"},{"name":"bathrms","type":"integer"},{"name":"stories","type":"integer"},{"name":"driveway","type":"string"},{"name":"recroom","type":"string"},{"name":"fullbase","type":"string"},{"name":"gashw","type":"string"},{"name":"airco","type":"string"},{"name":"garagepl","type":"integer"},{"name":"prefarea","type":"string"},{"name":"total_price","type":"double"}]},"columnLineage":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet","fields":{"_c0":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"_c0"}]},"price":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"price"}]},"lotsize":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"lotsize"}]},"bedrooms":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"bedrooms"}]},"bathrms":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","fi
eld":"bathrms"}]},"stories":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"stories"}]},"driveway":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"driveway"}]},"recroom":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"recroom"}]},"fullbase":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"fullbase"}]},"gashw":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"gashw"}]},"airco":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"airco"}]},"garagepl":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"garagepl"}]},"prefarea":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"prefarea"}]},"total_price":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"price"},{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"lotsize"}]}}},"lifecycleStateChange":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet","lifecycleStateChange":"OVERWRITE"}},"outputFacets":{}}]} 2024-04-15 12:54:05,802 [spark-listener-group-shared] DEBUG io.openlineage.client.OpenLineageClient - OpenLineageClient will emit lineage event: {"eventTime":"2024-04-15T07:24:04.374Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"a6c8696b-8073-4d71-bcee-1bde4442842f"},"job":{"namespace":"spark_integration","name":"dataset_example"},"inputs":[],"outputs":[]}
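Editorial sketch: a DEBUG log like the one above is hard to read by eye, so it can help to pull the emitted events out as JSON. The snippet below is a minimal illustration that assumes each event follows one of the two log messages seen above and is not broken by hard line wraps. The function name `extract_events` and the abbreviated sample log are made up for the example.

```python
import json
import re

# Log messages that precede an emitted event, as seen in the DEBUG output above.
MARKER = re.compile(
    r"(Emitting lineage completed successfully|will emit lineage event): "
)


def extract_events(log_text):
    """Return every JSON event that directly follows a known marker, in log order."""
    decoder = json.JSONDecoder()
    events = []
    for match in MARKER.finditer(log_text):
        try:
            # raw_decode parses one JSON value and ignores the text after it.
            event, _ = decoder.raw_decode(log_text, match.end())
        except json.JSONDecodeError:
            continue  # Marker was not followed by valid JSON; skip it.
        events.append(event)
    return events


# Abbreviated, made-up sample in the same shape as the real log.
sample = (
    "2024-04-15 12:54:05,801 DEBUG io.openlineage.spark.agent.EventEmitter - "
    "Emitting lineage completed successfully: "
    '{"eventType":"COMPLETE","outputs":[{"name":"/tmp/out",'
    '"facets":{"columnLineage":{"fields":{}}}}]} '
    "2024-04-15 12:54:05,802 DEBUG io.openlineage.client.OpenLineageClient - "
    "OpenLineageClient will emit lineage event: "
    '{"eventType":"COMPLETE","inputs":[],"outputs":[]}'
)
events = extract_events(sample)
print(len(events))
```

If the log was hard-wrapped when it was copied, the pieces would need to be joined back into a single line before parsing.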

pawel-big-lebowski commented 3 months ago

What version of Marquez are you using? Please note that column lineage support in Marquez was added in 0.45 (https://github.com/MarquezProject/marquez/blob/main/CHANGELOG.md#0450---2024-03-07), which is relatively recent.

TridevNS commented 3 months ago

> What version of Marquez are you using? Please note that column lineage support in Marquez was added in 0.45 (https://github.com/MarquezProject/marquez/blob/main/CHANGELOG.md#0450---2024-03-07), which is relatively recent.

I am using version 0.46 and still not seeing the column lineage.

pawel-big-lebowski commented 3 months ago

There are two possible problems: (1) the OpenLineage producer does not produce a proper column lineage facet, or (2) a proper column lineage facet is produced but not shown in the Marquez UI.

Are you able to find out whether the issue is related to (1) or (2)? If it is (2), would you mind opening a Marquez issue?

dolfinus commented 3 months ago

The JSON from https://github.com/OpenLineage/OpenLineage/issues/2573#issuecomment-2055967107 contains the columnLineage field, so this is not (1).
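Editorial sketch: the same check can be done programmatically on a parsed event. The snippet below is a minimal illustration that lists, for each output dataset, which input fields feed each output column. The helper name `column_lineage_by_output` is hypothetical, and the event is a heavily trimmed copy of the one in the log, keeping only two of its columns.

```python
def column_lineage_by_output(event):
    """Map each output dataset name to {output column: [input field names]}."""
    result = {}
    for output in event.get("outputs", []):
        facet = output.get("facets", {}).get("columnLineage", {})
        fields = facet.get("fields", {})
        result[output["name"]] = {
            column: [src["field"] for src in spec.get("inputFields", [])]
            for column, spec in fields.items()
        }
    return result


# Trimmed copy of the COMPLETE event from the log (most facets and columns omitted).
source = {"namespace": "file", "name": "/Users/tridev/Downloads/housing.csv"}
event = {
    "eventType": "COMPLETE",
    "outputs": [{
        "namespace": "file",
        "name": "/Users/tridev/Downloads/new",
        "facets": {"columnLineage": {"fields": {
            "price": {"inputFields": [{**source, "field": "price"}]},
            "total_price": {"inputFields": [
                {**source, "field": "price"},
                {**source, "field": "lotsize"},
            ]},
        }}},
    }],
}
print(column_lineage_by_output(event))
```

An empty mapping for an output dataset would point to problem (1), the producer; a populated one, as here, suggests the facet is emitted and the gap is on the consumer side.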

TridevNS commented 3 months ago

@dolfinus Thanks.

@pawel-big-lebowski I will create the issue in Marquez.