astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0
767 stars 170 forks source link

[Feature] Automate the dependency resolution between `DbtDag`s that represent sub-parts of a dbt project #1321

Open tatiana opened 1 week ago

tatiana commented 1 week ago

Description

Automatically create cross-DAG scheduling dependencies when the user splits a dbt project into multiple DbtDag instances.

Use case

Cosmos 1.7 allows users to map a dbt project into an Airflow DAG or TaskGroup. It also allows users to easily convert a subset of dbt nodes into an Airflow DAG or Task Group using selectors. In both cases, it maps 1:1 dbt resources, such as models, into Airflow tasks and resolves their dependencies within the same Task Group or DAG.

However, the use case that Cosmos does not cover yet is to resolve the dependency of a dbt project split into multiple Airflow DAGs, affecting how they are triggered/scheduled. This feature aims to handle this use case by creating cross-DAG dependency on behalf of the user.

This feature assumes the DAGs are running within the same Airflow installation (deployment, in the case of Astro).

By resolving dependencies cross-DAGs automatically, it would also be great if we also allow users to easily:

While delegating the responsibility to define the dbt models dependency between those DAGs to Cosmos.

Current approach

Users can currently handle cross-DAG dependency manually with Cosmos 1.7. One approach is to leverage the Airflow Datasets generated by Cosmos and Airflow's Data-Aware scheduling.

We've included below the use-case with Airflow Datasets using the Jaffle shop:

screenshot

Let's say a team is responsible for maintaining the upstream seeds (raw_customers, raw_orders, raw_payments), and another team is responsible for maintaining the downstream transformations (the other downstream nodes). Seeds are expected to be processed hourly, and once they are ready, the transformations are supposed to be triggered. The organisation wants to have separate DAGs per team for accountability.

This use-case could be accomplished by the following DAGs, adapted from the Cosmos Demo project:

(1) Upstream seeds DAG

This DAG only executes the project's dbt seeds:

from datetime import datetime

from cosmos import DbtDag, ProjectConfig, RenderConfig

from include.profiles import airflow_db
from include.constants import jaffle_shop_path, venv_execution_config

only_seeds = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,

    # only select seed nodes
    render_config=RenderConfig(
        select=["path:seeds"],
    ),

    # Schedule on an hourly basis
    schedulel="@hourly",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="only_seeds",
    tags=["filtering"],
)

As of Cosmos 1.7, this first DAG emits three Airflow Datasets once the respective tasks are executed, depending on the Airflw connection (or dbt profile) and dbt models configuration:

(2) Downstream transformations DAG

This second DAG will only be triggered once all the following datasets are updated:

from datetime import datetime

from cosmos import DbtDag, ProjectConfig, RenderConfig

from include.profiles import airflow_db
from include.constants import jaffle_shop_path, venv_execution_config

only_models = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,

    # only select model nodes
    render_config=RenderConfig(
        select=["path:models"],
    ),

    #  this DAG is scheduled to run after all Datasets in the list have received at least one update:
    schedule=[
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers"),
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders"),
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")
    ),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="only_models",
    tags=["filtering"],
)

An alternative schedule for the downstream DAG could be to be run if any of the upstream Datasets was updated using a conditional dataset expression:

    schedule=(
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers")) | \
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders") |  \
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")
    ),

Or a combined time-based schedule with dataset expression (Airflow 2.9+), for triggering the downstream DAG either every day during midnight or when the dataset condition is met:

    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(
            Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers")) | \
            Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders") |  \
            Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")
    ),

Challenges with current approach

While users can accomplish what they need with the current state, it is fragile. Users are expected to understand the syntax of Cosmos-generated datasets URIs. If the DbtDag RenderConfig.select clause changes, the schedules will likely have to change. If the dbt project changes, the schedule must be updated. It can be painful for end-users to manage this.

Additionally, as of Cosmos 1.7, the datasets can be specific to each environment, since they rely on properties of the Airflow connection or dbt profile.

Proposed solution

Allow users to use Cosmos to inject Dataset dependencies automatically. This has to be opt-in in Cosmos 1.x, so it is backwards-compatible.

A possible syntax could be:

    render_config=RenderConfig(
        select=["path:models"],
        auto_schedule=DbtUpstreamUpdated.AND, # only triggers the DAG when all upstream dbt models are updated
    ),

Where DbtUpstreamUpdated is a Python enum.

With this, the downstream DAG previously illustrated in (2) could be represented by:

only_models = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,

    render_config=RenderConfig(
        select=["path:models"],
        auto_schedule=DbtUpstreamUpdated.AND,
    ),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="only_models",
    tags=["filtering"],
)

With this, Cosmos would append to the Dag schedule:

     (
            Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers")) | \
            Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders") |  \
            Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")
    ),

Extending the user-defined schedule if the user also had a set schedule beforehand.

Limitations with the proposed solution

The current proposal does not solve the following scenario:

Airflow 2.x does not support natively this use case since there is no AND equivalent for DatasetOrTimeSchedule.

However, users could overcome this limitation by creating a DAG that runs daily and emits a "daily" dataset. The DbtDags that rely on this could be set to schedule=Dataset("daily") - in addition to the Cosmos-generated dbt Datasets.

We may need to also allow users to have the ability to say how Cosmos dependencies relate to other user-defined scheduled datasets (AND or OR).

This approach does not handle the "freshness" of previous sources.

Alternative implementations

(a) Use TriggerDagRun operators

Another alternative would be to use TriggerDagRun operators. This approach seems less flexible and would not leverage conditional dataset scheduling, that can be very valuable.

The other challenge is that Cosmos would need to be aware of how the users define their DAGs and their scope, and it would have to analyse the DAG topology and dependency resolution more complexly.

(b) Introduce a DbtDagGroup or DbtDagFamily class

We could introduce a new concept for creating a group or family of DAGs for a specific dbt project.

Users would instantiate:

DbtDagGroup(
    project_config=,
    profile_config=,
   ...
)

And Cosmos would magically create those DAGs.

There are a few ways of accomplishing this.

(i) Leveraging dbt tags

Let's say the dbt_project.yml contains something like:

version: 2
models:

    staging:
      +tags:
        - "hourly"

    marts:
      +tags:
        - "hourly"
        - "published"

    metrics:
      +tags:
        - "daily"
        - "published"

Cosmos could try to automatically split based on "daily" and "hourly" into separate DAGs. Some of the challenges may be:

(ii) Explicitly configure dbt models using a Cosmos config

Let's say users were willing to annotate their models with a dedicated Cosmos configuration:

version: 2
models:
  - name: model_a
    config:
      cosmos:
        group_id:
        schedule:

Like approach (i), Cosmos would decide how to group things using this Cosmos-specific config. Some of the challenges:

(iii) Allow users to configure multiple DAGs within a bigger DAGGrouper:

Using Python, users would define their groups, and Cosmos would resolve the dependencies between them.

cosmos_dag_group = DbtDagGroup(
   DbtDag(
     select="tag:daily",
     schedule="@daily"
   ],
   DbtDag(
     select="tag:hourly",
     schedule=DbtUpstreamUpdated.OR
   ]
)

Are you willing to submit a PR?

uranusjr commented 4 days ago

Random thoughts below… text probably does not make sense unless you have context on this.

The DatasetAndTimeSchedule time can be emulated by putting a custom operator as the first task in the generated Airflow DAG. The task sets the depended assets as inlets, and use logis similar to dags_needing_dagruns to determine if the asset event conditions have been fulfilled. Skip all the tasks if the conditions are not satisfied.

Currently dags_needing_dagruns relies on AssetDagRunQueue so the task can’t just use it (it won’t be properly populated anyway). It needs to process the events directly, and call evaluate on its own. Fortunately it’s not too complicated…

(This is pseudo code)

def asset_condition_satisfied():
    def _has_events_in_period(events):
        last_event = next(iter(reversed(events)))
        return self._is_later_than_cutoff(last_event.timestamp)  # TODO: Need to implement this.

inlet_statuses = {
    inlet.uri: _has_events_in_period(inlet_events[inlet])
    for inlet in self.inlets
    if isinstance(inlet, BaseAsset)
}
if not self._condition.evaluate(inlet_statuses):
    # Skip all tasks...
    return self.skip(dag_run, ...)
DetermineAssetCondition(
    task_id="cosmos__determine_asset_condition",
    inlets=list(dbt_dag.asset_condition.iter_assets()),  # Gets a list of assets from the condition.
    cutoff=timedelta(hours=2),  # This can also be "schedule" to infer from the DAG schedule.
)
tatiana commented 4 days ago

That's very helpful, thanks a lot, @uranusjr!

Some notes from our discussion earlier today involving also:

It would be great if Apache Airflow itself expose this sort of interface:

    schedule=DatasetAndTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(
            Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers"))
    ),

With the following behaviour:

A bonus would be:

Summarising the feature using other words:

While Airflow does not expose this feature, as @uranusjr detailed, Cosmos could use a branch operator at the beginning of the downstream DAG DatasetAndTimeSchedule. We'll need to check how much of Dataset Events are currently exposed in the Airflow interface.