MarquezProject / marquez

Collect, aggregate, and visualize a data ecosystem's metadata
https://marquezproject.ai
Apache License 2.0
1.78k stars, 320 forks

Column data for a dataset doesn't seem to be captured right #2968

Open Doehla opened 2 weeks ago

Doehla commented 2 weeks ago

I noticed something odd in the UI, and after looking at the endpoint response I think it points to a larger issue.

Steps to reproduce:
Observations:

Both the Option[T] and non-Option[T] data types appear to be associated with the same schema definition and columns; the second set was simply added to the first. I'm not sure whether this was intentional: [image]

Only one schema is reported: [image]

Looking at the JSON payload returned for the table, we get this:

{
  "id": {
    "namespace": "file",
    "name": "/opt/spark/data/sensor_data.json"
  },
  "type": "DB_TABLE",
  "name": "/opt/spark/data/sensor_data.json",
  "physicalName": "/opt/spark/data/sensor_data.json",
  "createdAt": "2024-11-01T19:38:51.440991Z",
  "updatedAt": "2024-11-01T19:39:43.043Z",
  "namespace": "file",
  "sourceName": "/opt/spark/data/sensor_data.json",
  "fields": [
    {
      "name": "id",
      "type": "Option[long]",
      "tags": [],
      "description": null
    },
    {
      "name": "location",
      "type": "Option[struct]",
      "tags": [],
      "description": null
    },
    {
      "name": "readings",
      "type": "Option[struct]",
      "tags": [],
      "description": null
    },
    {
      "name": "sensor_type",
      "type": "Option[string]",
      "tags": [],
      "description": null
    },
    {
      "name": "status",
      "type": "Option[string]",
      "tags": [],
      "description": null
    },
    {
      "name": "timestamp",
      "type": "Option[string]",
      "tags": [],
      "description": null
    },
    {
      "name": "id",
      "type": "long",
      "tags": [],
      "description": null
    },
    {
      "name": "location",
      "type": "struct",
      "tags": [],
      "description": null
    },
    {
      "name": "readings",
      "type": "struct",
      "tags": [],
      "description": null
    },
    {
      "name": "sensor_type",
      "type": "string",
      "tags": [],
      "description": null
    },
    {
      "name": "status",
      "type": "string",
      "tags": [],
      "description": null
    },
    {
      "name": "timestamp",
      "type": "string",
      "tags": [],
      "description": null
    }
  ],
  "tags": [],
  "lastModifiedAt": null,
  "lastLifecycleState": "CREATE",
  "description": null,
  "currentVersion": "6ef09594-6305-4a39-88f5-e6f48fff5f2c",
  "columnLineage": null,
  "facets": {
    "schema": {
      "fields": [
        {
          "name": "id",
          "type": "long"
        },
        {
          "name": "location",
          "type": "struct",
          "fields": [
            {
              "name": "altitude",
              "type": "double"
            },
            {
              "name": "latitude",
              "type": "double"
            },
            {
              "name": "longitude",
              "type": "double"
            }
          ]
        },
        {
          "name": "readings",
          "type": "struct",
          "fields": [
            {
              "name": "humidity",
              "type": "double"
            },
            {
              "name": "other_data",
              "type": "struct",
              "fields": [
                {
                  "name": "current",
                  "type": "double"
                },
                {
                  "name": "signal_strength",
                  "type": "long"
                },
                {
                  "name": "voltage",
                  "type": "double"
                }
              ]
            },
            {
              "name": "pressure",
              "type": "double"
            },
            {
              "name": "temperature",
              "type": "double"
            }
          ]
        },
        {
          "name": "sensor_type",
          "type": "string"
        },
        {
          "name": "status",
          "type": "string"
        },
        {
          "name": "timestamp",
          "type": "string"
        }
      ],
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
      "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet"
    },
    "dataSource": {
      "uri": "file",
      "name": "file",
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/integration/spark",
      "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet"
    },
    "lifecycleStateChange": {
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/client/python",
      "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet",
      "lifecycleStateChange": "CREATE"
    },
    "file": {
      "hash": "150cd27894500bc7b2ec5dae2a664512",
      "Creation Date": "2024-10-23T22:23:48.447890",
      "Modified Date": "2024-09-30T20:48:42.520602",
      "File Size (Mb)": 0.0795145034790039,
      "Last Access Date": "2024-10-25T17:14:03.251640"
    },
    "ownership": {
      "owners": [
        {
          "name": "XXX"
        }
      ],
      "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.23.0/client/python",
      "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/OwnershipDatasetFacet.json#/$defs/OwnershipDatasetFacet"
    }
  },
  "deleted": false
}

Here, the schema facet was updated from the first supplied value to the new one, while the other facets, which Spark does not send automatically, were kept intact. That is great, since we want to keep those. I can understand the schema being overwritten, as it was also provided by the Spark job. It would be great if the event emitted by OpenLineage captured the nullable flag in the data types; that is where the Option[T] types in the first call came from. However, the fields data has been added together, even though the same column names are used. Interestingly, the graph visual does not repeat these columns.
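The duplication can be checked directly against the response. Below is a minimal sketch in Python, assuming the payload has already been parsed into a dict. `duplicate_field_types` is a hypothetical helper written for this issue, not part of Marquez, and the sample is a trimmed copy of the payload above.

```python
from collections import defaultdict


def duplicate_field_types(dataset: dict) -> dict[str, list[str]]:
    """Map each column name that appears more than once in `fields` to its reported types."""
    types_by_name = defaultdict(list)
    for field in dataset.get("fields", []):
        types_by_name[field["name"]].append(field["type"])
    # Keep only the names that were reported more than once.
    return {name: types for name, types in types_by_name.items() if len(types) > 1}


# Trimmed copy of the payload above (two of the six columns).
dataset = {
    "fields": [
        {"name": "id", "type": "Option[long]"},
        {"name": "status", "type": "Option[string]"},
        {"name": "id", "type": "long"},
        {"name": "status", "type": "string"},
    ]
}

print(duplicate_field_types(dataset))
# {'id': ['Option[long]', 'long'], 'status': ['Option[string]', 'string']}
```

Run against the full payload, every one of the six columns would show up with both an `Option[T]` and a plain type.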

What I would expect:

Since the schema is being overwritten, I would expect the fields to be updated too rather than added together. With the data types changing, I could see this being captured as a versioned change. Perhaps, though, there is some desire for additive behavior in the code, so that schemas are merged when multiple events are emitted that each contain a partial schema definition?
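To make the two behaviors concrete, here is a rough Python sketch of "overwrite" versus "merge by column name". Both functions are hypothetical illustrations of the expectation described here, not how Marquez actually handles fields.

```python
def replace_fields(existing: list[dict], incoming: list[dict]) -> list[dict]:
    """Overwrite: the latest event's schema wins outright."""
    return list(incoming)


def merge_fields(existing: list[dict], incoming: list[dict]) -> list[dict]:
    """Merge keyed by column name: incoming values win, columns seen only earlier are kept."""
    merged = {field["name"]: field for field in existing}
    for field in incoming:
        merged[field["name"]] = {**merged.get(field["name"], {}), **field}
    return list(merged.values())


# First event declared two columns; the second event only re-declares `id`.
first = [
    {"name": "id", "type": "Option[long]"},
    {"name": "status", "type": "Option[string]"},
]
second = [{"name": "id", "type": "long"}]

print(replace_fields(first, second))
# [{'name': 'id', 'type': 'long'}]
print(merge_fields(first, second))
# [{'name': 'id', 'type': 'long'}, {'name': 'status', 'type': 'Option[string]'}]
```

Either way, a given column name ends up with a single entry, which is the part that differs from the payload shown in this issue.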

I think the correct path forward, given where things stand at present, is to ignore the nullable flag and not use Option[T] data types to represent it. It would still be nice to have the possibility of missing data captured in the schema.
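One way to keep the nullability information without changing the type name is to split it out before sending the event. The Python sketch below assumes the `Option[T]` string convention used in the first call from this issue. `split_nullable` and `normalize_field` are hypothetical helpers, and the `nullable` key is an assumption for illustration, not a field I know the schema facet to define.

```python
import re

# Matches the Option[T] convention from the first call, e.g. "Option[long]".
_OPTION = re.compile(r"^Option\[(.+)\]$")


def split_nullable(type_name: str) -> tuple[str, bool]:
    """Return (base_type, nullable) for a type name such as "Option[long]" or "long"."""
    match = _OPTION.match(type_name)
    if match:
        return match.group(1), True
    return type_name, False


def normalize_field(field: dict) -> dict:
    """Rewrite a field so `type` holds the base type and nullability sits in its own key."""
    base_type, nullable = split_nullable(field["type"])
    return {**field, "type": base_type, "nullable": nullable}


print(normalize_field({"name": "id", "type": "Option[long]"}))
# {'name': 'id', 'type': 'long', 'nullable': True}
print(normalize_field({"name": "id", "type": "long"}))
# {'name': 'id', 'type': 'long', 'nullable': False}
```

With the types normalized this way, both calls would report `long` for `id`, so the type names would at least agree between the two events.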
