MarquezProject / marquez

Collect, aggregate, and visualize a data ecosystem's metadata
https://marquezproject.ai
Apache License 2.0

Spark Integration - Schema for text file and column level lineage is not captured #2800

Open TridevNS opened 3 months ago

TridevNS commented 3 months ago

Reference: https://github.com/OpenLineage/OpenLineage/issues/2573

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Attach the OpenLineage listener and send events to Marquez over HTTP.
val spark = SparkSession
  .builder()
  .appName("dataset_example")
  .master("local[*]")
  .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
  .config("spark.openlineage.transport.type", "http")
  .config("spark.openlineage.transport.url", "http://localhost:5000")
  .config("spark.openlineage.namespace", "spark_integration")
  .config("spark.openlineage.facets.disabled", "[spark.logicalPlan]")
  .config("spark.openlineage.debugFacet", "enabled")
  .getOrCreate()

// Read the CSV with a header row and an inferred schema.
val inputData = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/Users/tridev/Downloads/housing.csv")

// Derive a new column from two input columns.
val tempDF = inputData.withColumn("total_price", col("price") * col("lotsize"))

// Write the result as Parquet, overwriting any previous output.
tempDF.write.format("parquet").mode("overwrite").save("/Users/tridev/Downloads/new")

The lineage shown in Marquez contains no column-level lineage information like that described in the column lineage facet spec (https://openlineage.io/docs/spec/facets/dataset-facets/column_lineage_facet).

Spark version: 3.3.1

In the logs, however, I can see the column-level lineage:

[spark-listener-group-shared] DEBUG io.openlineage.spark.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager - Connection released: [id: 0][route: {}->http://localhost:5000][total available: 1; route allocated: 1 of 2; total allocated: 1 of 20] 2024-04-15 12:54:05,801 [spark-listener-group-shared] DEBUG io.openlineage.spark.agent.EventEmitter - Emitting lineage completed successfully: {"eventTime":"2024-04-15T07:24:04.365Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"068416bd-5416-48bf-89df-46f36449eac0","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"a6c8696b-8073-4d71-bcee-1bde4442842f"},"job":{"namespace":"spark_integration","name":"dataset_example"}},"debug":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","classpath":{"openLineageVersion":"1.9.1","sparkVersion":"3.3.1","scalaVersion":"2.12.15","jars":[],"classDetails":[{"className":"org.apache.spark.sql.delta.catalog.DeltaCatalog","packageVersion":"2.1.0","onClasspath":true},{"className":"org.apache.iceberg.catalog.Catalog","onClasspath":false},{"className":"com.google.cloud.spark.bigquery.BigQueryRelation","packageVersion":"0.27.0","onClasspath":true}]},"system":{"sparkDeployMode":"client","javaVersion":"1.8.0_181","javaVendor":"Oracle Corporation","osArch":"x86_64","osName":"Mac OS 
X","osVersion":"10.16","userLanguage":"en","userTimezone":"Asia/Kolkata"},"config":{"extraListeners":"io.openlineage.spark.agent.OpenLineageSparkListener","openLineageConfig":{"debugFacet":"enabled","facets.disabled":"[spark.logicalPlan]","namespace":"spark_integration","transport.type":"http","transport.url":"http://localhost:5000"},"catalogClass":"org.apache.spark.sql.internal.CatalogImpl"},"logicalPlan":{"nodes":[{"id":"InsertIntoHadoopFsRelationCommand@1508650397","desc":"InsertIntoHadoopFsRelationCommand file:/Users/tridev/Downloads/new, false, Parquet, [path=/Users/tridev/Downloads/new], Overwrite, [_c0, price, lotsize, bedrooms, bathrms, stories, driveway, recroom, fullbase, gashw, airco, garagepl, prefarea, total_price]\n+- Project [_c0#17, price#18, lotsize#19, bedrooms#20, bathrms#21, stories#22, driveway#23, recroom#24, fullbase#25, gashw#26, airco#27, garagepl#28, prefarea#29, (price#18 * cast(lotsize#19 as double)) AS total_price#43]\n +- Relation [_c0#17,price#18,lotsize#19,bedrooms#20,bathrms#21,stories#22,driveway#23,recroom#24,fullbase#25,gashw#26,airco#27,garagepl#28,prefarea#29] csv\n","children":["Project@-915422472"]},{"id":"Project@-915422472","desc":"Project [_c0#17, price#18, lotsize#19, bedrooms#20, bathrms#21, stories#22, driveway#23, recroom#24, fullbase#25, gashw#26, airco#27, garagepl#28, prefarea#29, (price#18 * cast(lotsize#19 as double)) AS total_price#43]\n+- Relation [_c0#17,price#18,lotsize#19,bedrooms#20,bathrms#21,stories#22,driveway#23,recroom#24,fullbase#25,gashw#26,airco#27,garagepl#28,prefarea#29] csv\n","children":["LogicalRelation@1693633821"]},{"id":"LogicalRelation@1693633821","desc":"Relation 
[_c0#17,price#18,lotsize#19,bedrooms#20,bathrms#21,stories#22,driveway#23,recroom#24,fullbase#25,gashw#26,airco#27,garagepl#28,prefarea#29] csv\n","children":[]}]}},"spark_version":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","spark-version":"3.3.1","openlineage-spark-version":"1.9.1"},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"3.3.1","name":"spark","openlineageAdapterVersion":"1.9.1"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"spark_integration","name":"dataset_example.execute_insert_into_hadoop_fs_relation_command.Downloads_new","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"SPARK","jobType":"JOB"}}},"inputs":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"_c0","type":"integer"},{"name":"price","type":"double"},{"name":"lotsize","type":"integer"},{"name":"bedrooms","type":"inte
ger"},{"name":"bathrms","type":"integer"},{"name":"stories","type":"integer"},{"name":"driveway","type":"string"},{"name":"recroom","type":"string"},{"name":"fullbase","type":"string"},{"name":"gashw","type":"string"},{"name":"airco","type":"string"},{"name":"garagepl","type":"integer"},{"name":"prefarea","type":"string"}]}},"inputFacets":{}}],"outputs":[{"namespace":"file","name":"/Users/tridev/Downloads/new","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"_c0","type":"integer"},{"name":"price","type":"double"},{"name":"lotsize","type":"integer"},{"name":"bedrooms","type":"integer"},{"name":"bathrms","type":"integer"},{"name":"stories","type":"integer"},{"name":"driveway","type":"string"},{"name":"recroom","type":"string"},{"name":"fullbase","type":"string"},{"name":"gashw","type":"string"},{"name":"airco","type":"string"},{"name":"garagepl","type":"integer"},{"name":"prefarea","type":"string"},{"name":"total_price","type":"double"}]},"columnLineage":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet","fields":{"_c0":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"_c0"}]},"price":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"price"}]},"lotsize":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"lotsize"}]},"bedrooms":{"inputFields":[{"namespace":"file","name":"/Users/tridev
/Downloads/housing.csv","field":"bedrooms"}]},"bathrms":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"bathrms"}]},"stories":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"stories"}]},"driveway":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"driveway"}]},"recroom":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"recroom"}]},"fullbase":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"fullbase"}]},"gashw":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"gashw"}]},"airco":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"airco"}]},"garagepl":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"garagepl"}]},"prefarea":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"prefarea"}]},"total_price":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"price"},{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"lotsize"}]}}},"lifecycleStateChange":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet","lifecycleStateChange":"OVERWRITE"}},"outputFacets":{}}]} 2024-04-15 12:54:05,802 [spark-listener-group-shared] DEBUG io.openlineage.client.OpenLineageClient - OpenLineageClient will emit lineage event: 
{"eventTime":"2024-04-15T07:24:04.374Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"a6c8696b-8073-4d71-bcee-1bde4442842f"},"job":{"namespace":"spark_integration","name":"dataset_example"},"inputs":[],"outputs":[]}

pawel-big-lebowski commented 1 month ago

The column level lineage facet is present within the event:

[{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.9.1/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet","fields":{"_c0":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"_c0"}]},"price":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"price"}]},"lotsize":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"lotsize"}]},"bedrooms":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"bedrooms"}]},"bathrms":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"bathrms"}]},"stories":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"stories"}]},"driveway":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"driveway"}]},"recroom":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"recroom"}]},"fullbase":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"fullbase"}]},"gashw":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"gashw"}]},"airco":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"airco"}]},"garagepl":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"garagepl"}]},"prefarea":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"prefarea"}]},"total_price":{"inputFields":[{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"price"},{"namespace":"file","name":"/Users/tridev/Downloads/housing.csv","field":"lotsize"}]}}},

and the schema is present in the events too.

Please check the lineage events tab in Marquez to confirm that the events were successfully received. Then you can experiment with manually created events (for example, with a single column instead of 20) to find out what makes your scenario different from one that works properly. Also make sure you're using the latest Marquez version.
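
The manual test suggested above could be sketched roughly as follows. This is only a minimal sketch, not an official client: it assumes Marquez is reachable at http://localhost:5000 (as in the Spark config in this issue) and accepts OpenLineage events via POST on /api/v1/lineage. The producer URL, job name, and dataset paths are hypothetical placeholders; the schema URLs are copied from the logged event.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.time.Instant
import java.util.UUID

object MinimalLineageEvent {
  // Hypothetical values for this sketch; they do not come from the original report.
  val Producer   = "https://example.com/manual-lineage-test"
  val InputName  = "/tmp/manual_test/in.csv"
  val OutputName = "/tmp/manual_test/out"

  // Schema URLs copied from the event logged in this issue.
  val RunEventSchema      = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
  val SchemaFacetSchema   = "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet"
  val ColumnLineageSchema = "https://openlineage.io/spec/facets/1-0-1/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet"

  // Schema facet with a single field, reused for the input and the output dataset.
  private def schemaFacet: String =
    s"""{"_producer": "$Producer", "_schemaURL": "$SchemaFacetSchema", "fields": [{"name": "price", "type": "double"}]}"""

  // Builds a COMPLETE run event with one input, one output, and one lineage column.
  def buildEvent(runId: String, eventTime: String): String =
    s"""{
       |  "eventType": "COMPLETE",
       |  "eventTime": "$eventTime",
       |  "producer": "$Producer",
       |  "schemaURL": "$RunEventSchema",
       |  "run": {"runId": "$runId"},
       |  "job": {"namespace": "spark_integration", "name": "manual_single_column_test"},
       |  "inputs": [{
       |    "namespace": "file", "name": "$InputName",
       |    "facets": {"schema": $schemaFacet}
       |  }],
       |  "outputs": [{
       |    "namespace": "file", "name": "$OutputName",
       |    "facets": {
       |      "schema": $schemaFacet,
       |      "columnLineage": {
       |        "_producer": "$Producer",
       |        "_schemaURL": "$ColumnLineageSchema",
       |        "fields": {"price": {"inputFields": [
       |          {"namespace": "file", "name": "$InputName", "field": "price"}
       |        ]}}
       |      }
       |    }
       |  }]
       |}""".stripMargin

  // Sends the JSON body with a plain HTTP POST and returns the response status code.
  def post(url: String, body: String): Int = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("POST")
      conn.setRequestProperty("Content-Type", "application/json")
      conn.setDoOutput(true)
      val out = conn.getOutputStream
      try out.write(body.getBytes(StandardCharsets.UTF_8)) finally out.close()
      conn.getResponseCode
    } finally conn.disconnect()
  }

  def main(args: Array[String]): Unit = {
    val event = buildEvent(UUID.randomUUID().toString, Instant.now().toString)
    // Assumed endpoint: Marquez on localhost:5000, as in the Spark config of this issue.
    val status = post("http://localhost:5000/api/v1/lineage", event)
    println(s"Marquez responded with HTTP $status")
  }
}
```

If the single-column event shows column lineage in the Marquez UI, columns can be added step by step to narrow down which part of the Spark-generated event behaves differently.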

TridevNS commented 6 days ago

@pawel-big-lebowski How can we pull the event info for a particular run from the API?

We are trying to use the metadata from a different system.