datahub-project / datahub

The Metadata Platform for your Data Stack
https://datahubproject.io
Apache License 2.0
9.63k stars 2.85k forks

Failed to create a lineage between dataset and non-dataset using Python SDK #10856

Closed nephtyws closed 2 months ago

nephtyws commented 2 months ago

Describe the bug When I try to create a lineage between an Airflow task (urn:li:dataJob:(urn:li:dataFlow:()) and any dataset (e.g. urn:li:dataset:(urn:li:dataPlatform:mysql,database.table,prod)), the code raises the error below.

datahub.configuration.common.OperationalError: ('Unable to emit metadata to DataHub GMS: INTERNAL SERVER ERROR', {'exceptionClass': 'com.linkedin.restli.server.RestLiServiceException', 'message': 'INTERNAL SERVER ERROR', 'status': 500})

I can verify that the same lineage can be created directly from the console, since the console uses a GraphQL call to create lineage, as shown below. Note that I changed the original DAG and task names to dummy values.

{"operationName":"updateLineage","variables":{"input":{"edgesToAdd":[{"upstreamUrn":"urn:li:dataset:(urn:li:dataPlatform:mysql,database.table,prod)","downstreamUrn":"urn:li:dataJob:(urn:li:dataFlow:(airflow,dag,prod),task)"}],"edgesToRemove":[]}},"query":"mutation updateLineage($input: UpdateLineageInput!) {\n  updateLineage(input: $input)\n}\n"}
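For reference, the console's request above can be replayed from Python. This is a hypothetical sketch, not official SDK usage: the endpoint path (`/api/graphql` on the frontend) and the bearer-token header are assumptions that may differ per deployment, and the URNs are the dummy values from the request above.

```python
import json

# The mutation text, copied verbatim from the console's request.
UPDATE_LINEAGE_QUERY = """\
mutation updateLineage($input: UpdateLineageInput!) {
  updateLineage(input: $input)
}
"""

def build_update_lineage_payload(upstream_urn: str, downstream_urn: str) -> dict:
    """Build the same JSON body the DataHub UI sends for updateLineage."""
    return {
        "operationName": "updateLineage",
        "variables": {
            "input": {
                "edgesToAdd": [
                    {"upstreamUrn": upstream_urn, "downstreamUrn": downstream_urn}
                ],
                "edgesToRemove": [],
            }
        },
        "query": UPDATE_LINEAGE_QUERY,
    }

payload = build_update_lineage_payload(
    "urn:li:dataset:(urn:li:dataPlatform:mysql,database.table,prod)",
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag,prod),task)",
)

# Sending it would look roughly like this (requires a reachable DataHub
# frontend and a valid access token, hence commented out):
#   import requests
#   requests.post("http://localhost:9002/api/graphql", json=payload,
#                 headers={"Authorization": "Bearer <token>"})
print(json.dumps(payload)["operationName" in payload and 0:40])
```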

The GMS logs say:

Caused by: com.linkedin.data.template.TemplateOutputCastException: Invalid URN syntax: Urn entity type should be 'dataset'.: urn:li:dataJob:(urn:li:dataFlow:(airflow,dag,prod),task)
Caused by: java.net.URISyntaxException: Urn entity type should be 'dataset'.: urn:li:dataJob:(urn:li:dataFlow:(airflow,dag,prod),task)

Is this intended behavior or a bug, or am I doing something wrong? I don't see any clue in the API documentation.

To Reproduce Steps to reproduce the behavior:

from datahub.emitter.mce_builder import (make_data_flow_urn, make_data_job_urn_with_flow, make_dataset_urn, make_lineage_mce)
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://localhost:8080")  # GMS endpoint

flow_urn = make_data_flow_urn("airflow", "TEST")
job_id = "TEST"
target_airflow_task = make_data_job_urn_with_flow(flow_urn, job_id)
upstream_entity_urns = [make_dataset_urn("mysql", "TEST")]

emitter.emit_mce(make_lineage_mce(upstream_entity_urns, target_airflow_task)) # Boom!

Expected behavior Creating lineage between an Airflow task and any dataset should succeed via the Python SDK, without error.

Additional context

nephtyws commented 2 months ago

I found the following comment in the docstring of make_lineage_mce:

Note: this function only supports lineage for dataset aspects. It will not update lineage for any other aspect types.

Is this intended, or could it be improved to support other aspect types?

hsheth2 commented 2 months ago

For datajobs, lineage is always assigned to the datajob itself, which points at its upstream/downstream datasets. We have some code samples listed here: https://datahubproject.io/docs/api/tutorials/lineage/#add-lineage-to-non-dataset-entities

It will still show up in the UI as you'd expect.

nephtyws commented 2 months ago

Aha. Thanks for your answer, @hsheth2 !