astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

Airflow orphaning unreferenced Dataset URIs in Cosmos 1.1 #522

Open tatiana opened 11 months ago

tatiana commented 11 months ago

Context

While testing Cosmos 1.1, which implemented #485, I realized that the Dataset URIs were being deleted from the Airflow UI a few seconds after they had been added, and the scheduler would print messages like:

 scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_customers'
 scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_payments'
 scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_orders'
 scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.customers'

I had a conversation with @ashb, one of the original developers of the Dataset feature in Airflow, and he mentioned that Airflow, by design, does not allow setting Dataset URIs during task execution, and that they are overridden every time the scheduler parses the DAGs.
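A minimal pure-Python illustration (no Airflow imports; variable names are hypothetical) of why URIs registered at run time end up orphaned: on each parse pass, the scheduler only considers datasets referenced by the parsed DAG definitions, so anything registered outside of them is treated as unreferenced.

```python
# Datasets the scheduler sees in the parsed DAG files (inlets/outlets).
parsed_dag_outlets = {"postgres://0.0.0.0:5432/postgres.public.customers"}

# Datasets present in the metadata DB, including one a task registered
# during execution (which the parsed DAGs do not reference).
registered = parsed_dag_outlets | {
    "postgres://0.0.0.0:5432/postgres.public.stg_orders",  # set at run time
}

# On the next parse, anything not referenced by a DAG is orphaned.
orphaned = registered - parsed_dag_outlets
print(orphaned)  # → {'postgres://0.0.0.0:5432/postgres.public.stg_orders'}
```

This mirrors the "Orphaning unreferenced dataset" messages in the scheduler log above: the URIs Cosmos set at execution time were never part of the parsed DAG definitions.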

Ash agreed that it makes sense for Airflow to allow libraries to set Dataset URIs. In the past, there was a concern that people would create Dataset URIs specific to timestamps, which could lead to other issues in Airflow.

I've logged a ticket in Airflow to support setting Datasets during DAG task executions: https://github.com/apache/airflow/issues/34206

But even if this lands in Airflow 2.8, we need a strategy that works for users running Cosmos with older versions of Airflow.

Recommendation

Set inlets and outlets Dataset URIs in Cosmos during DAG parsing time when users use LoadMethod.DBT_LS or LoadMethod.DBT_MANIFEST. We should still use the OpenLineage Processor class with the dbt manifest, but we may need to change the code so it can generate events when the run_results.json file is not available (to be checked).

Nodes would have two new properties: inlets and outlets, with the correct Dataset URIs.
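As a sketch of the recommendation (function and parameter names are illustrative, not the Cosmos API): at parsing time, the connection details and the dbt node's database/schema/name are already known from the profile and the manifest, so the Dataset URIs seen in the scheduler log can be derived without running dbt.

```python
def dbt_node_to_dataset_uri(adapter: str, host: str, port: int,
                            database: str, schema: str, model: str) -> str:
    """Build an Airflow Dataset URI for a dbt model at DAG-parsing time,
    matching the shape seen in the scheduler logs above."""
    return f"{adapter}://{host}:{port}/{database}.{schema}.{model}"

uri = dbt_node_to_dataset_uri("postgres", "0.0.0.0", 5432,
                              "postgres", "public", "stg_customers")
print(uri)  # → postgres://0.0.0.0:5432/postgres.public.stg_customers
```

Because these URIs are computed while the DAG file is parsed, they would be attached to each node's new inlets/outlets properties and therefore referenced by the DAG definition, so the scheduler would no longer orphan them.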

tatiana commented 10 months ago

Follow-up with the community in #airflow-dbt:
https://apache-airflow.slack.com/archives/C059CC42E9W/p1696339581151849
https://apache-airflow.slack.com/archives/C059CC42E9W/p1696364828572859
https://apache-airflow.slack.com/archives/C059CC42E9W/p1696498945508569

tatiana commented 9 months ago

The work was started (and is unstable) in the branch set-inlets-during-runtime. Requires further work.

dosubot[bot] commented 4 months ago

Hi, @tatiana,

I'm helping the Cosmos team manage their backlog and am marking this issue as stale. From what I understand, the issue involves Dataset URIs being deleted from the Airflow UI shortly after being added, with the scheduler logging messages about orphaning unreferenced datasets. The recommendation is to set inlets and outlets Dataset URIs in Cosmos during DAG parsing time when users use LoadMethod.DBT_LS or LoadMethod.DBT_MANIFEST, and to modify the code to allow generating events when the run_results.json file is not available. I see that you've followed up with the community in #airflow-dbt and started work on the issue in the branch set-inlets-during-runtime, but it requires further work.

Could you please confirm if this issue is still relevant to the latest version of the Cosmos repository? If it is, please let the Cosmos team know by commenting on the issue. Otherwise, feel free to close the issue yourself or the issue will be automatically closed in 7 days.

Thank you for your understanding and cooperation. If you have any further questions or need assistance, feel free to reach out.

tatiana commented 2 months ago

Origin: #305
Related Airflow issue: https://github.com/apache/airflow/issues/34206

tatiana commented 1 month ago

After a few discussions with Airflow core developers over the past months, @uranusjr came up with the proposal for Dataset Aliases, which would allow us to set Datasets at task execution.

@Lee-W is working on this: https://github.com/apache/airflow/pull/40478

This feature will likely be available in Airflow 2.10, and we will be able to leverage it to overcome the current issues.
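A pure-Python sketch of the Dataset Alias idea behind apache/airflow#40478 (no Airflow imports; the class and dict below stand in for Airflow internals and are not its real API): the DAG references only a stable alias at parse time, and the task resolves that alias to concrete Dataset URIs at run time, so the scheduler never sees unreferenced URIs to orphan.

```python
class DatasetAlias:
    """Stand-in for an alias a DAG can declare at parse time."""
    def __init__(self, name: str) -> None:
        self.name = name

# Parse time: the DAG's outlets reference only the alias, which is stable
# across parses, so nothing becomes unreferenced between scheduler runs.
declared_outlets = [DatasetAlias("cosmos-dbt-models")]

# Run time: the task attaches the concrete URIs it actually produced.
alias_events: dict[str, list[str]] = {a.name: [] for a in declared_outlets}
alias_events["cosmos-dbt-models"].append(
    "postgres://0.0.0.0:5432/postgres.public.customers"
)
```

Under this model, Cosmos could keep emitting run-time Dataset URIs (as in 1.1) while the scheduler tracks only the alias declared in the DAG definition.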