astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

Make `select`, `exclude`, `selector`, and `models` pseudo-parametrizable via skipping and `trigger_rule="none_failed"` #857

Open dwreeves opened 8 months ago

dwreeves commented 8 months ago

Right now it is impossible to pass a `Param()` into a `DbtDag()` or `DbtTaskGroup()` that selects and de-selects models.

The reason is pretty obvious when you think about it: these params are used to render the tasks in the `DbtDag`, and rendering does not have access to Jinja templating (only tasks have that access, while they are being executed).

That said, it could be possible to expose some mechanism that raises `AirflowSkipException` when a task is excluded, via a different mechanism than the DAG rendering. In combination with `trigger_rule="none_failed"`, this could provide a way to run subsets of tasks in a DAG via Params, while still having each dbt node correspond to a single Airflow operator.
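A minimal sketch of what that skip check could look like, assuming a hypothetical `node_is_selected` helper and a `dbt_node_name` attribute on each generated operator (neither exists in Cosmos today; the helper is a deliberately naive stand-in for dbt's real selection syntax, which also supports graph operators like `+model` and `tag:` selectors):

```python
from __future__ import annotations


def node_is_selected(node_name: str, select: set[str], exclude: set[str]) -> bool:
    """Naive selection check: run the node unless it is deselected by params."""
    if select and node_name not in select:
        return False
    return node_name not in exclude


# Inside each generated operator's execute(), the check would become a skip:
#
#     from airflow.exceptions import AirflowSkipException
#
#     def execute(self, context):
#         params = context["params"]
#         if not node_is_selected(
#             self.dbt_node_name,
#             set(params.get("select", [])),
#             set(params.get("exclude", [])),
#         ):
#             raise AirflowSkipException(f"{self.dbt_node_name} deselected")
#         ...  # run the dbt node as usual
```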

(The other way to do this, which is by far the easiest way right now and still has many advantages over this proposal, is to run every selected dbt task inside a single Airflow operator.)
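The single-operator approach works because only executing tasks can render Jinja, so a templated command can consume Params at run time. A sketch, using a hypothetical `build_dbt_command` helper to assemble the CLI flags:

```python
from __future__ import annotations


def build_dbt_command(select: str = "", exclude: str = "", selector: str = "") -> str:
    """Assemble a dbt CLI invocation from run-time selection parameters."""
    parts = ["dbt", "run"]
    if select:
        parts += ["--select", select]
    if exclude:
        parts += ["--exclude", exclude]
    if selector:
        parts += ["--selector", selector]
    return " ".join(parts)


# In Airflow, the same idea could back a single templated BashOperator, e.g.:
#
#     BashOperator(
#         task_id="dbt_run",
#         bash_command="dbt run --select {{ params.select }}",
#     )
```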

Because of Airflow's internals, the selection of tasks in a DAG cannot be determined via Params, except via dynamic task mapping, which does not support task dependencies. However, because skipping plus `trigger_rule="none_failed"` achieves similar behavior, I refer to this as making the `DbtDag` "pseudo-parametrizable".
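The reason skips can stand in for true deselection is the `none_failed` semantics: a task runs unless some upstream failed, and skipped upstreams do not block it. A minimal model of that rule (a simplification; Airflow's actual trigger-rule evaluation considers more states):

```python
# "none_failed": run unless any upstream task failed or was upstream_failed;
# skipped upstreams do NOT block the downstream task from running.
def none_failed_allows_run(upstream_states: list[str]) -> bool:
    return not any(s in ("failed", "upstream_failed") for s in upstream_states)
```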

There are some real challenges here; this isn't as simple as just passing some args into the operators, but it's entirely feasible and easy to imagine this existing as a feature.


Addendum + notes:

dosubot[bot] commented 8 months ago

🤖

Given the limitations of Airflow's internals and the inability to determine the selection of tasks in a DAG via Params, making `select`, `exclude`, `selector`, and `models` pseudo-parametrizable in a `DbtDag()` or `DbtTaskGroup()` is indeed a challenging task.

One possible workaround could be to use Airflow's Variables or XComs to store and retrieve these parameters dynamically. Here's an example of how you can use Airflow's Variables:

```python
from airflow.models import Variable
from cosmos import DbtDag  # import added for completeness

# Set the parameters as Airflow Variables
Variable.set("select", "your_select_value")
Variable.set("exclude", "your_exclude_value")
Variable.set("selector", "your_selector_value")
Variable.set("models", "your_models_value")

# Retrieve the parameters from Airflow Variables
select = Variable.get("select")
exclude = Variable.get("exclude")
selector = Variable.get("selector")
models = Variable.get("models")

# Use the parameters in your DbtDag() or DbtTaskGroup()
dag = DbtDag(
    select=select,
    exclude=exclude,
    selector=selector,
    models=models,
    ...
)
```

However, this approach has some limitations.

As an alternative, you can use Airflow's XComs to pass these parameters between tasks. However, this approach also has its own limitations and complexities, and it may not be suitable for all use cases.

Regarding your idea of raising `AirflowSkipException` when a task is excluded via a different mechanism than the DAG rendering: it could be a viable solution, but it would require careful handling to ensure that the DAG execution logic is not disrupted.

I hope this helps! If you have any further questions or need more clarification, feel free to ask.


#### About Dosu This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research. Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant. If you want to continue the conversation, start your reply with @dosu-bot.