astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

[feature] Support operator_arguments injection at a node level #881

Open linchun3 opened 4 months ago

linchun3 commented 4 months ago

Context

Currently, we can use operator_args to pass in task-level arguments to all tasks in a DbtTaskGroup or DbtDAG.

However, this applies across all tasks in the DAG/TaskGroup. Sometimes, we'd like granular control over what is passed into each model's operator.

Use case(s)

Potential Solution

Currently, we can achieve this by doing the following:

1. Define Airflow operator args in the model's YAML file under config (or config.meta):

   ```yaml
   version: 2
   models:
     - name: model_a
       config:
         alias: model_a
         operator_args:
           retry_delay: 500
           retries: 10
   ```
2. Leverage node_converters:

```python
from __future__ import annotations

from typing import Any

from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup
from cosmos.airflow.graph import generate_task_or_group
from cosmos.dbt.graph import DbtNode


def inject_operator_args(
    dag: DAG,
    task_group: TaskGroup | None,
    node: DbtNode,
    **kwargs: Any,
) -> BaseOperator | TaskGroup | None:
    """
    Return a task or group after injecting operator args
    if present in a model/test config.
    """
    # operator_args from the node's dbt config (if any) override the
    # DAG/TaskGroup-level task_args for this node only.
    operator_args = node.config.get("operator_args")
    if operator_args:
        kwargs["task_args"] = {**kwargs.get("task_args", {}), **operator_args}

    # Delegate to Cosmos' default converter with the updated task_args.
    return generate_task_or_group(
        dag=dag,
        task_group=task_group,
        node=node,
        **kwargs,
    )
```
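The only logic the converter adds is a shallow dict merge in which node-level values win. As a minimal sketch, here is that step pulled out as plain Python so it can be checked without Airflow or Cosmos installed. merge_task_args is a hypothetical helper name, and it assumes node.config is a plain dict shaped like the YAML in step 1.

```python
from __future__ import annotations

from typing import Any, Mapping


def merge_task_args(
    task_args: Mapping[str, Any],
    node_config: Mapping[str, Any],
) -> dict[str, Any]:
    """Shallow-merge a node's operator_args over the group-level task_args.

    Hypothetical helper: keys in operator_args replace the same keys in
    task_args; all other keys are kept. Neither input is mutated.
    """
    operator_args = node_config.get("operator_args") or {}
    return {**task_args, **operator_args}


# Group-level args shared by every task, plus one model's dbt config.
group_args = {"retries": 2, "pool": "dbt_pool"}
model_a_config = {
    "alias": "model_a",
    "operator_args": {"retry_delay": 500, "retries": 10},
}

print(merge_task_args(group_args, model_a_config))
# {'retries': 10, 'pool': 'dbt_pool', 'retry_delay': 500}
print(merge_task_args(group_args, {"alias": "model_b"}))
# {'retries': 2, 'pool': 'dbt_pool'}
```

Because the merge is shallow, a nested value in operator_args replaces the group-level value wholesale rather than being merged into it.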

Conclude

The above solution works. However, this feels like it should be default behaviour in Cosmos.

dosubot[bot] commented 4 months ago

That sounds like a great idea! Your proposed solution seems well thought out and could definitely add value to the project. We encourage you to go ahead and create a pull request with your changes. Your contribution would be much appreciated by the community. Thank you for your willingness to improve the project!

dwreeves commented 4 months ago

If Astronomer were to commit to an API that seeps further into the dbt schema (note that, with your suggestion, this would be the first time Cosmos directly dictates how dbt code is written), I think placing all Cosmos configuration inside config.cosmos rather than config.operator_args would be a safer bet. So something like this:

```yaml
version: 2
models:
  - name: model_a
    config:
      alias: model_a
      cosmos:
        operator_args:
          retry_delay: 500
          retries: 10
```

It is not unreasonable to think of a world where Cosmos requires more keys than just operator_args, and keeping them nice and tidy inside a cosmos: mapping is probably the safer option.
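With the namespaced layout, a converter would read one level deeper, under cosmos.operator_args. A minimal sketch of that lookup, assuming node.config is a plain dict mirroring the YAML above; cosmos_operator_args is a hypothetical helper, not an existing Cosmos function:

```python
from __future__ import annotations

from typing import Any, Mapping


def cosmos_operator_args(node_config: Mapping[str, Any]) -> dict[str, Any]:
    """Return config.cosmos.operator_args, or {} if either key is missing.

    Hypothetical helper: nodes with no Cosmos-specific config (or with an
    empty/null cosmos block) yield an empty dict, so nothing is injected.
    """
    cosmos_config = node_config.get("cosmos") or {}
    return dict(cosmos_config.get("operator_args") or {})


model_a_config = {
    "alias": "model_a",
    "cosmos": {"operator_args": {"retry_delay": 500, "retries": 10}},
}

print(cosmos_operator_args(model_a_config))
# {'retry_delay': 500, 'retries': 10}
print(cosmos_operator_args({"alias": "model_b"}))
# {}
```

Any future Cosmos keys would sit next to operator_args under cosmos and could be read the same way, without colliding with other config keys.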

With that slight change in the API I would fully endorse this feature. It comes up super often that you need to do stuff like this. (I actually have a related but separate issue I was intending on opening in the near future relating to providing easier access to customization of how Cosmos runs...)

linchun3 commented 4 months ago

Hey @dwreeves,

Thanks for reviewing this issue quickly.

I agree entirely with your proposed change in API 😀

Per the dbt docs, most configurations are "clobbered" when applied hierarchically: whenever a more specific value is available, it completely replaces the less specific value. A few configs have different merge behavior; see https://docs.getdbt.com/reference/configs-and-properties#combining-configs for details.

A user may expect the cosmos key/value(s) to be merged instead of overwritten entirely if they define it in both dbt_project.yml and model.yml.

We could either accept this as the default behaviour, consistent with what dbt-core does for keys it does not define itself, or make merging the default behaviour. WDYT?
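To make the two options concrete, the sketch below resolves a cosmos block set in both dbt_project.yml and a model's YAML file. clobber mimics the replace-the-whole-key behaviour the dbt docs describe for most configs; deep_merge is one possible merge strategy. Both are hypothetical helpers operating on plain dicts, not how dbt or Cosmos actually resolve configs.

```python
from __future__ import annotations

import copy
from typing import Any, Mapping


def clobber(project_cfg: Mapping[str, Any], model_cfg: Mapping[str, Any]) -> dict[str, Any]:
    """Replace each top-level key wholesale with the more specific value."""
    return {**project_cfg, **model_cfg}


def deep_merge(project_cfg: Mapping[str, Any], model_cfg: Mapping[str, Any]) -> dict[str, Any]:
    """Recursively merge nested mappings; the more specific leaf value wins."""
    merged = copy.deepcopy(dict(project_cfg))
    for key, value in model_cfg.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Set in dbt_project.yml (less specific) and in the model's YAML (more specific).
project_cfg = {"cosmos": {"operator_args": {"retries": 3, "pool": "dbt"}}}
model_cfg = {"cosmos": {"operator_args": {"retries": 10}}}

print(clobber(project_cfg, model_cfg))
# {'cosmos': {'operator_args': {'retries': 10}}}  <- pool is lost
print(deep_merge(project_cfg, model_cfg))
# {'cosmos': {'operator_args': {'retries': 10, 'pool': 'dbt'}}}
```

With clobbering, a user has to repeat every project-level Cosmos setting in the model; with merging, only the overrides need to be stated.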

Either way, we could document this behaviour in the Cosmos docs.

Should I make an attempt to contribute this feature with your proposed API change?

dwreeves commented 4 months ago

I'm not a maintainer of this repo, so I won't be the right person to ask that. I was just putting in an API suggestion and a yes vote for the feature.

tatiana commented 3 months ago

@linchun3 this is an excellent proposal - including the suggestion by @dwreeves. We'd love to have this feature! Please, feel free to work on this!