astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

[Feature] Resolve (some) templated fields during DAG Parsing #1289

Open tatiana opened 3 weeks ago

tatiana commented 3 weeks ago

Description

Allow users to configure DbtDag/DbtTaskGroup selectors via the Airflow UI (https://astronomer.github.io/astronomer-cosmos/configuration/selecting-excluding.html#selecting-excluding) and render the DAG based on this.

If we find a good solution for this, we could potentially extend it to other templated fields.

Use case/motivation

This request has popped up a few times, including by Emanuele in the #airflow-dbt channel on 29 October 2024:

Hello everyone! :sun_with_face: I am currently testing Cosmos on Airflow and I am having serious issues using the select parameter. What I'd like is to be able, from the Airflow UI, to run the dbt DAG with a specific select param so that only some models are run. Does someone have a working example that they could share? The docs say that the select template field cannot be templated via DbtDag and DbtTaskGroup because both need to select dbt nodes during DAG parsing (Docs), so I am not really sure how to do this. Thanks a lot for the help!

However, I believe we have not logged this request yet.

Possible solution

We could try something similar to the following, but it will likely require the DAG processor to make additional calls to the database. I would love thoughts and other ideas.

from datetime import datetime, timedelta, timezone

from airflow import DAG
from jinja2 import Template


class TemplatedDAG(DAG):
    template_fields = ['dag_param']  # Specify fields that should be templated

    def __init__(self, *args, dag_param=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.dag_param = dag_param
        self.parse_dag_template_fields()

    def parse_dag_template_fields(self):
        """Render template fields for the DAG itself at parse time."""
        # No DAG run exists at parse time, so build a placeholder context.
        # Any default date works; yesterday stands in for the execution date.
        execution_date = datetime.now(timezone.utc) - timedelta(days=1)
        context = {
            'ds': execution_date.strftime('%Y-%m-%d'),  # Same format as the Airflow `ds` macro
            'execution_date': execution_date,
        }

        # Render each template field defined in `template_fields`
        for field in self.template_fields:
            value = getattr(self, field, None)
            if isinstance(value, str):  # Only strings can hold a template
                rendered_value = Template(value).render(context)
                setattr(self, field, rendered_value)
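
To make the parse-time rendering idea concrete without Airflow installed, here is a minimal standard-library sketch. It is not the Cosmos or Airflow implementation: a small regex substitution stands in for Jinja, and the names `build_parse_time_context`, `render_at_parse_time`, and `resolve_select` are hypothetical. It also assumes the only values available at parse time are DAG-level param defaults, since no DAG run (and so no run-specific configuration) exists yet.

```python
import re
from datetime import datetime, timedelta, timezone

# Matches simple "{{ dotted.name }}" placeholders.
# This is a stand-in for Jinja, used only to keep the sketch dependency-free.
_PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def build_parse_time_context(params):
    """Build a placeholder context, since no DAG run exists at parse time."""
    logical_date = datetime.now(timezone.utc) - timedelta(days=1)
    return {"ds": logical_date.strftime("%Y-%m-%d"), "params": dict(params)}


def lookup(context, dotted_name):
    """Resolve names such as 'params.select' against nested dicts."""
    value = context
    for part in dotted_name.split("."):
        value = value[part]  # A KeyError signals an unknown placeholder
    return value


def render_at_parse_time(template, context):
    """Substitute every placeholder in `template` with its context value."""
    return _PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), template)


def resolve_select(template, params):
    """Render a templated selector string and split it on whitespace."""
    rendered = render_at_parse_time(template, build_parse_time_context(params))
    return rendered.split()


if __name__ == "__main__":
    # Hypothetical DAG-level default for a `select` param.
    defaults = {"select": "tag:daily path:models/staging"}
    print(resolve_select("{{ params.select }}", defaults))
```

The resulting list of selector strings could then be passed to whatever builds the dbt node graph. Note that a value chosen in the UI when triggering a run would still not be visible here, which is the part of the request that likely needs the extra database calls mentioned above.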

Related issues

No response

Are you willing to submit a PR?