Hi team! I am using the Astronomer-Cosmos integration to run dbt models in my Airflow setup. Recently, we decided to integrate with DataHub, and I'm currently working on implementing this.
The issue I'm facing is that lineage metadata is not being exported to DataHub. According to the documentation, the v2 plugin I'm using relies on OpenLineage extraction, which should work with the Cosmos integration.
From the Cosmos code, I've confirmed that it uses OpenLineage's processor to extract information about inlets and outlets and emit them at the end of the `execute()` method. It also calculates OpenLineage events and makes them available to extractors via the `get_openlineage_facets_on_complete` method.
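To illustrate the pattern described above, here is a minimal self-contained sketch (no real Airflow or OpenLineage imports; `OperatorLineage` and `DbtLikeOperator` are hypothetical stand-ins): the operator populates its lineage while `execute()` runs, and an extractor only gets meaningful data if it reads from the task *after* execution.

```python
# Simplified sketch of the "compute lineage in execute(), expose it via
# get_openlineage_facets_on_complete()" pattern. All class names here are
# stand-ins, not the real Cosmos/OpenLineage provider classes.
from dataclasses import dataclass, field
from typing import List


@dataclass
class OperatorLineage:
    """Stand-in for the OpenLineage lineage container."""
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)


class DbtLikeOperator:
    """Toy operator that computes lineage while running, as Cosmos does."""

    def __init__(self):
        self._lineage = OperatorLineage()

    def execute(self, context=None):
        # In Cosmos, this is where the OpenLineage processor parses dbt
        # artifacts and fills in inlets/outlets.
        self._lineage.inputs.append("source_table")
        self._lineage.outputs.append("dbt_model")
        return "done"

    def get_openlineage_facets_on_complete(self, task_instance=None):
        # Meant to be called by an extractor AFTER execute() has finished;
        # only then is the lineage populated.
        return self._lineage


op = DbtLikeOperator()
op.execute()
lineage = op.get_openlineage_facets_on_complete()
print(lineage.inputs, lineage.outputs)
```

The key point is that the lineage lives on the *same* operator instance that ran `execute()`; reading it from any other copy of the task yields empty results.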
In the debugger, I can see that all this metadata is available to the DataHub listener when `on_task_instance_finished` is triggered. The problem is that the listener reads it from a copy of the task that was made earlier, when `on_task_instance_running` fired. That copy is stored in the `TaskHolder` and never updated during runtime. As a result, the `DefaultExtractor` is bound to the stale, pre-execution task, and neither the operator's inlets/outlets nor the OpenLineage events are visible to it. Consequently, at the end of the task execution the listener exports empty results to the server.
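The behavior I'm observing can be reproduced with a minimal sketch (hypothetical names; this is not the actual plugin code): a registry deep-copies the task at `on_task_instance_running`, so anything the real task computes during `execute()` never reaches the copy that `on_task_instance_finished` reads.

```python
# Minimal reproduction of the stale-copy behavior described above.
import copy


class Task:
    def __init__(self):
        self.outlets = []          # populated during execution

    def execute(self):
        self.outlets.append("dbt_model")


class TaskHolder:
    """Stand-in for the listener's task registry."""

    def __init__(self):
        self._tasks = {}

    def on_task_instance_running(self, key, task):
        # A copy is taken BEFORE execution and never refreshed.
        self._tasks[key] = copy.deepcopy(task)

    def on_task_instance_finished(self, key):
        return self._tasks[key]


holder = TaskHolder()
task = Task()
holder.on_task_instance_running("t1", task)
task.execute()                                  # real task now has outlets
stale = holder.on_task_instance_finished("t1")
print(task.outlets)   # ['dbt_model']
print(stale.outlets)  # []  <- the listener sees the pre-execution copy
```

If the real listener behaves like this sketch, the empty export would follow directly from reading the stale copy rather than the live task.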
Any ideas on what might be wrong? Is this a bug, or have I misconfigured something?
Airflow: 2.6.3
Cosmos: 1.3.1
acryl-datahub-airflow-plugin[plugin-v2]: 0.13.3.3
DataHub: v0.13.3rc1
Slack Message