MarquezProject / marquez

Collect, aggregate, and visualize a data ecosystem's metadata
https://marquezproject.ai
Apache License 2.0

No Columns or ability to add field tags when using Job Event static lineage #2843

Open davidsharp7 opened 5 months ago

davidsharp7 commented 5 months ago

Given the following static lineage POST request:

curl -X POST http://localhost:8080/api/v1/lineage \
  -i -H 'Content-Type: application/json' \
  -d '{
        "eventTime": "2024-12-28T20:52:00.001+10:00",
        "job": {
          "namespace": "my-namespace",
          "name": "newtestfoobarmeeeepppppppppp"
        },
        "outputs": [{
          "namespace": "my-namespace",
          "name": "pppppspooky",
          "facets": {
            "schema": {
              "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
              "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
              "fields": [
                { "name": "a", "type": "VARCHAR"},
                { "name": "b", "type": "VARCHAR"}
              ]
            }
          }
        }],
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
        "schemaURL": "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent"
      }'

the columns for the dataset do not render in the UI, and field-level tags cannot be added.

Upon investigation, it looks like this is because the current dataset version is not being updated in the OpenLineageDao when handling a Job Event:

    if (event.getInputs() != null) {
      for (Dataset dataset : event.getInputs()) {
        DatasetRecord record = upsertLineageDataset(daos, dataset, now, null, true);
        datasetInputs.add(record);
        insertDatasetFacets(daos, dataset, record, null, null, now);
        insertInputDatasetFacets(daos, dataset, record, null, null, now);
      }
    }

By adding the following, the current version is updated in the datasets table:

        daos.getDatasetDao()
            .updateVersion(
                record.getDatasetVersionRow().getDatasetUuid(),
                Instant.now(),
                record.getDatasetVersionRow().getUuid());

which resolves the columns not being displayed.
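
As a rough illustration of why a missing current-version pointer would hide the columns, here is a toy in-memory sketch. It is not Marquez code: `Store`, `DatasetRow`, `upsertVersion`, and `columnsFor` are hypothetical names, and the assumption that the UI resolves fields only through the dataset's current version is my reading of the behaviour, not something confirmed against the source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Toy model only; every name here is hypothetical, not Marquez's schema or DAO API. */
public class CurrentVersionSketch {

  /** Stand-in for a row in a datasets table that points at its current version. */
  static final class DatasetRow {
    UUID currentVersionUuid; // stays null until a version is promoted
  }

  static final class Store {
    final Map<String, DatasetRow> datasets = new HashMap<>();
    final Map<UUID, List<String>> versionFields = new HashMap<>();

    /** Writes a new dataset version and its fields without touching the pointer. */
    UUID upsertVersion(String datasetName, List<String> fields) {
      datasets.computeIfAbsent(datasetName, n -> new DatasetRow());
      UUID versionUuid = UUID.randomUUID();
      versionFields.put(versionUuid, new ArrayList<>(fields));
      return versionUuid;
    }

    /** Plays the role of the updateVersion call above: promote a version to current. */
    void updateVersion(String datasetName, UUID versionUuid) {
      datasets.get(datasetName).currentVersionUuid = versionUuid;
    }

    /** Assumed UI-style read: fields are resolved through the current version only. */
    List<String> columnsFor(String datasetName) {
      DatasetRow row = datasets.get(datasetName);
      if (row == null || row.currentVersionUuid == null) {
        return List.of();
      }
      return versionFields.getOrDefault(row.currentVersionUuid, List.of());
    }
  }

  public static void main(String[] args) {
    Store store = new Store();
    UUID version = store.upsertVersion("pppppspooky", List.of("a", "b"));
    System.out.println("before: " + store.columnsFor("pppppspooky"));
    store.updateVersion("pppppspooky", version);
    System.out.println("after: " + store.columnsFor("pppppspooky"));
  }
}
```

In this model the version and its fields exist after the upsert, but a read that goes through the pointer sees nothing until the version is promoted, which matches the symptom described above.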

There is a subsequent step where we would need to propagate the tags that are linked to the dataset version fields. It looks like we can use the DAOs for this:

        List<Field> dsvTags =
            daos.getDatasetFieldDao().findByDatasetVersion(record.getDatasetVersionRow().getUuid());
        daos.getDatasetVersionDao()
            .updateFields(
                record.getDatasetVersionRow().getUuid(),
                daos.getDatasetVersionDao().toPgObjectFields(dsvTags));

@wslulciuc does that sound like a fair way of doing it?