MarquezProject / marquez

Collect, aggregate, and visualize a data ecosystem's metadata
https://marquezproject.ai
Apache License 2.0

Marquez does not take the input dataset version facet into account when creating lineage (it always uses the latest version of input datasets) #2733

Open williamsia opened 8 months ago

williamsia commented 8 months ago

These are the steps to reproduce the issue reported in this Slack discussion.

TL;DR: When emitting a lineage event for a run whose input datasets carry an older version (set via the version facet), Marquez always uses the latest version of those input datasets, and it overwrites the input dataset's version facet with the older version.

The steps to reproduce are below.

Creating datasetA version 1

Payload is below

1.create_run1_datasetA.json

    "outputs": [
        {
            "namespace": "usa.california.sanjose",
            "name": "datasetA",
            "facets": {
                "version": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json",
                    "datasetVersion": "1"
                }
            },
            "ownership": {
                "_producer": "https://some.producer.com/version/1.0",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipJobFacet.json",
                "owners": [
                    {
                        "name": "/usa/california/sanjose",
                        "type": "GROUP"
                    }
                ]
            }
        }

Creating datasetB version 1

2.create_run1_datasetB.json

Note that this run explicitly uses version 1 of datasetA as its input.

    "inputs": [
        {
            "namespace": "usa.california.sanjose",
            "name": "datasetA",
            "facets": {
                "version": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json",
                    "datasetVersion": "1"
                }
            }
        }
    ],

Updating datasetA to version 2

3.create_run2_datasetA.json

    "outputs": [
        {
            "namespace": "usa.california.sanjose",
            "name": "datasetA",
            "facets": {
                "version": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json",
                    "datasetVersion": "2"
                }
            },
            "ownership": {
                "_producer": "https://some.producer.com/version/1.0",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipJobFacet.json",
                "owners": [
                    {
                        "name": "/usa/california/sanjose",
                        "type": "GROUP"
                    }
                ]
            }
        }
    ],

Updating datasetB to version 2 while still using version 1 of datasetA

4.create_run2_datasetB.json
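
For anyone who wants to replay the sequence, here is a minimal sketch of the four events built in Python. It is not the original payloads: the job names, run IDs, event schema URL, `base_url` and the helpers `dataset`, `run_event` and `post_event` are assumptions for illustration, and the ownership block and any other inputs from the real files are left out. Only the namespace, dataset names, version facet and `datasetVersion` values come from the snippets above.

```python
import json
import uuid
import urllib.request
from datetime import datetime, timezone

PRODUCER = "https://some.producer.com/version/1.0"
VERSION_FACET_SCHEMA = (
    "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json"
)
NAMESPACE = "usa.california.sanjose"


def dataset(name, dataset_version):
    """Dataset entry carrying the version facet used in the payloads above."""
    return {
        "namespace": NAMESPACE,
        "name": name,
        "facets": {
            "version": {
                "_producer": PRODUCER,
                "_schemaURL": VERSION_FACET_SCHEMA,
                "datasetVersion": dataset_version,
            }
        },
    }


def run_event(job_name, inputs, outputs):
    """Minimal COMPLETE run event; job name, run ID and schemaURL are assumed."""
    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": PRODUCER,
        "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": NAMESPACE, "name": job_name},
        "inputs": inputs,
        "outputs": outputs,
    }


def post_event(event, base_url="http://localhost:3000"):
    """POST one event to /api/v1/lineage; base_url depends on your deployment."""
    request = urllib.request.Request(
        f"{base_url}/api/v1/lineage",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


# Steps 1 to 4: datasetB's second run still declares datasetA version "1".
events = [
    run_event("create_datasetA", [], [dataset("datasetA", "1")]),
    run_event("create_datasetB", [dataset("datasetA", "1")], [dataset("datasetB", "1")]),
    run_event("create_datasetA", [], [dataset("datasetA", "2")]),
    run_event("create_datasetB", [dataset("datasetA", "1")], [dataset("datasetB", "2")]),
]
```

Calling `post_event(e)` for each entry of `events`, in order, should replay the scenario against a local instance.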

Querying datasetA versions

URL: http://localhost:3000/api/v1/namespaces/usa.california.sanjose/datasets/datasetA/versions
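
The same query can be scripted. This is a small sketch using only the Python standard library. The helper names `versions_url` and `fetch_versions` are made up here, and the default `base_url` mirrors the URL above, so adjust it to your deployment. Percent-encoding the path segments matters for namespaces such as `glue://...` that contain slashes.

```python
import json
import urllib.request
from urllib.parse import quote


def versions_url(namespace, dataset, base_url="http://localhost:3000"):
    """Build the dataset-versions URL, percent-encoding both path segments."""
    return (
        f"{base_url}/api/v1/namespaces/{quote(namespace, safe='')}"
        f"/datasets/{quote(dataset, safe='')}/versions"
    )


def fetch_versions(namespace, dataset, base_url="http://localhost:3000"):
    """GET the versions listing and decode the JSON body (needs a running server)."""
    with urllib.request.urlopen(versions_url(namespace, dataset, base_url)) as response:
        return json.load(response)
```

For example, `fetch_versions("usa.california.sanjose", "datasetA")` requests the URL shown above.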

I removed some fields from the response payload so as not to distract from the important part. Notice that the version facet on datasetA's second version now reads 1 instead of 2.

{
  "versions": [
    {
      "createdAt": "2024-01-11T22:18:00.001Z",
      "version": "12805e64-6a48-3bba-80c4-9ce513fdb069",
      "namespace": "usa.california.sanjose",
      "createdByRun": {
        "inputDatasetVersions": [
          {
            "datasetVersionId": {
              "namespace": "glue://glue.ap-southeast-2.amazonaws.com",
              "name": "ghg_emission_factor.egrid_data",
              "version": "3a7c4759-785b-3e05-a684-17ba703690ff"
            },
            "facets": {}
          }
        ],
        "outputDatasetVersions": [
          {
            "datasetVersionId": {
              "namespace": "usa.california.sanjose",
              "name": "datasetA",
              "version": "12805e64-6a48-3bba-80c4-9ce513fdb069"
            },
            "facets": {}
          }
        ],
        "facets": {}
      },
      "facets": {
        "version": {
          "_producer": "https://some.producer.com/version/1.0",
          "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json",
          "datasetVersion": "1"
        }
      }
    },
    {
      "createdAt": "2024-01-11T01:18:00.001Z",
      "version": "0eda9f90-f79e-3686-8747-efaa09d1a404",
      "createdByRun": {
        "inputDatasetVersions": [
          {
            "datasetVersionId": {
              "namespace": "glue://glue.ap-southeast-2.amazonaws.com",
              "name": "ghg_emission_factor.egrid_dataV2",
              "version": "6340cce2-867b-3d53-96d5-a35de41fd9a1"
            },
            "facets": {}
          }
        ],
        "outputDatasetVersions": [
          {
            "datasetVersionId": {
              "namespace": "usa.california.sanjose",
              "name": "datasetA",
              "version": "0eda9f90-f79e-3686-8747-efaa09d1a404"
            },
            "facets": {}
          }
        ],
        "facets": {}
      },
      "facets": {
        "version": {
          "_producer": "https://some.producer.com/version/1.0",
          "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json",
          "datasetVersion": "1"
        }
      }
    }
  ]
}
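
To make the symptom easier to spot, here is a rough Python sketch that lists the declared `datasetVersion` for each Marquez version UUID and flags values that appear more than once. `RESPONSE` is a hand-trimmed copy of the payload above, and the helper names are hypothetical.

```python
from collections import Counter

# Trimmed copy of the datasetA response above (only the fields used here).
RESPONSE = {
    "versions": [
        {
            "createdAt": "2024-01-11T22:18:00.001Z",
            "version": "12805e64-6a48-3bba-80c4-9ce513fdb069",
            "facets": {"version": {"datasetVersion": "1"}},
        },
        {
            "createdAt": "2024-01-11T01:18:00.001Z",
            "version": "0eda9f90-f79e-3686-8747-efaa09d1a404",
            "facets": {"version": {"datasetVersion": "1"}},
        },
    ]
}


def declared_versions(payload):
    """Pair each Marquez version UUID with the datasetVersion in its facet."""
    rows = []
    for entry in payload["versions"]:
        facet = entry.get("facets", {}).get("version", {})
        rows.append((entry["version"], facet.get("datasetVersion")))
    return rows


def duplicated_declarations(payload):
    """Declared versions that show up on more than one Marquez version."""
    counts = Counter(declared for _, declared in declared_versions(payload))
    return sorted(v for v, n in counts.items() if v is not None and n > 1)


for marquez_version, declared in declared_versions(RESPONSE):
    print(marquez_version, "declares datasetVersion", declared)
print("duplicated:", duplicated_declarations(RESPONSE))
```

Both UUIDs report `datasetVersion` 1, although one of them was emitted with 2.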

Querying the datasetB versions

Notice how the input dataset version for the second run points to the latest version of datasetA, even though we explicitly specified version 1 when emitting the lineage event.

{
  "versions": [
    {
      "createdAt": "2024-01-12T23:18:00.001Z",
      "version": "0c70a3fc-c242-3e6f-96fd-dc711fbea05f",
      "createdByRun": {
        "inputDatasetVersions": [
          {
            "datasetVersionId": {
              "namespace": "usa.california.sanjose",
              "name": "datasetA",
              "version": "0eda9f90-f79e-3686-8747-efaa09d1a404"
            },
            "facets": {}
          }
        ],
        "outputDatasetVersions": [
          {
            "datasetVersionId": {
              "namespace": "usa.california.sanjose",
              "name": "datasetB",
              "version": "0c70a3fc-c242-3e6f-96fd-dc711fbea05f"
            },
            "facets": {}
          }
        ],
        "facets": {}
      },
      "facets": {
        "version": {
          "_producer": "https://some.producer.com/version/1.0",
          "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json",
          "datasetVersion": "2"
        }
      }
    },
    {
      "createdAt": "2024-01-11T23:18:00.001Z",
      "version": "491ac309-ed7d-3c3e-a11d-51fab7ce84f6",
      "createdByRun": {
        "inputDatasetVersions": [
          {
            "datasetVersionId": {
              "namespace": "usa.california.sanjose",
              "name": "datasetA",
              "version": "12805e64-6a48-3bba-80c4-9ce513fdb069"
            },
            "facets": {}
          }
        ],
        "outputDatasetVersions": [
          {
            "datasetVersionId": {
              "namespace": "usa.california.sanjose",
              "name": "datasetB",
              "version": "491ac309-ed7d-3c3e-a11d-51fab7ce84f6"
            },
            "facets": {}
          }
        ],
        "facets": {}
      },
      "facets": {
        "version": {
          "_producer": "https://some.producer.com/version/1.0",
          "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json",
          "datasetVersion": "1"
        }
      }
    }
  ]
}
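
A quick way to check this from the response is sketched below in Python. `DATASET_B_VERSIONS` is a trimmed copy of the payload above and `resolved_inputs` is a hypothetical helper. Both datasetB runs declared datasetA version 1 as input, so one would expect them to resolve to the same datasetA version UUID.

```python
# Trimmed copy of the datasetB response above (only the fields used here).
DATASET_B_VERSIONS = {
    "versions": [
        {
            "version": "0c70a3fc-c242-3e6f-96fd-dc711fbea05f",
            "createdByRun": {
                "inputDatasetVersions": [
                    {
                        "datasetVersionId": {
                            "namespace": "usa.california.sanjose",
                            "name": "datasetA",
                            "version": "0eda9f90-f79e-3686-8747-efaa09d1a404",
                        }
                    }
                ]
            },
            "facets": {"version": {"datasetVersion": "2"}},
        },
        {
            "version": "491ac309-ed7d-3c3e-a11d-51fab7ce84f6",
            "createdByRun": {
                "inputDatasetVersions": [
                    {
                        "datasetVersionId": {
                            "namespace": "usa.california.sanjose",
                            "name": "datasetA",
                            "version": "12805e64-6a48-3bba-80c4-9ce513fdb069",
                        }
                    }
                ]
            },
            "facets": {"version": {"datasetVersion": "1"}},
        },
    ]
}


def resolved_inputs(payload, input_name):
    """Map each declared output version to the input UUID Marquez resolved."""
    resolved = {}
    for entry in payload["versions"]:
        declared = entry["facets"]["version"]["datasetVersion"]
        for item in entry["createdByRun"]["inputDatasetVersions"]:
            version_id = item["datasetVersionId"]
            if version_id["name"] == input_name:
                resolved[declared] = version_id["version"]
    return resolved


resolved = resolved_inputs(DATASET_B_VERSIONS, "datasetA")
# Same declared input version on both runs, so one distinct UUID is expected.
print("distinct datasetA versions resolved:", len(set(resolved.values())))
```

With the data above the two runs resolve to two different datasetA UUIDs, which is the mismatch this issue describes.
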

dolfinus commented 6 months ago

Probably related: #2764