astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0
514 stars 130 forks

[feature] support for dbt clone or dbt build? DbtCloneOperator #878

Open SPTKL opened 4 months ago

SPTKL commented 4 months ago

Hi there, our CI/CD runs a "write-audit-publish" pattern: 1) a dbt build command runs and tests the models in staging, then 2) a dbt clone command clones the models created in step 1 into the production environment. Can we implement a DbtCloneOperator? If so, where should it live, and do you have any suggestions on how to implement it?

Also, given the flexibility of Cosmos, suppose we want to change our process so that, for every model and following the order of the DAG, we 1) dbt run the model in staging, 2) dbt test the model in staging, and 3) if the tests pass, dbt clone the model from staging to production.

Ideally, in the DAG each model would have its own task group with four tasks: run, test, snapshot, clone. This isn't possible with a single dbt CLI invocation because clone is a separate command (dbt build only covers run, test, and snapshot). What's the recommended path for implementing something like this? I know it's tricky because, in our current setup, the environments are configured as different targets in different dbt profiles.
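
For illustration, here is a minimal sketch of the per-model layout described above, using placeholder operators rather than the real Cosmos API:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG("per_model_write_audit_publish", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # One task group per model; each task would wrap the corresponding dbt command
    # scoped to that model, e.g. "dbt run --select my_model".
    with TaskGroup(group_id="my_model") as my_model:
        run = EmptyOperator(task_id="run")            # dbt run, staging target
        test = EmptyOperator(task_id="test")          # dbt test, staging target
        snapshot = EmptyOperator(task_id="snapshot")  # dbt snapshot, staging target
        clone = EmptyOperator(task_id="clone")        # dbt clone, staging -> production

        run >> test >> snapshot >> clone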

dosubot[bot] commented 4 months ago

Hey @SPTKL! Great to see you back with another interesting challenge! Hope you've been doing well.

To implement a DbtCloneOperator in the Cosmos repository, you can follow the pattern established by the existing operators in the cosmos/operators/kubernetes.py file. Note that dbt clone is a native dbt subcommand as of dbt Core 1.6, so the operator only needs to build and execute that command via the dbt CLI.

Here's a step-by-step guide to creating the DbtCloneOperator:

  1. Define the DbtCloneMixin: This mixin contains the logic for building the dbt clone command and any extra flags it needs (for example, --state).

  2. Implement the DbtCloneKubernetesOperator: This operator will use the DbtCloneMixin and DbtKubernetesBaseOperator to execute the clone command in a Kubernetes Pod.

Here's an example implementation based on the provided context:

from __future__ import annotations

from typing import Any, Sequence

from airflow.utils.context import Context

from cosmos.operators.kubernetes import DbtKubernetesBaseOperator


class DbtCloneMixin:
    """
    Mixin for the dbt clone command (a native dbt subcommand since dbt Core 1.6).
    """
    template_fields: Sequence[str] = ("clone_flags",)

    def build_clone_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> tuple[list[str], dict[str, Any]]:
        """
        Builds the dbt clone command and the environment variables it should run with.
        """
        cmd = [self.dbt_executable_path, "clone"]
        env_vars = self.build_env_vars(context)  # assumes the base operator exposes build_env_vars
        if cmd_flags:
            cmd.extend(cmd_flags)
        return cmd, env_vars


class DbtCloneKubernetesOperator(DbtCloneMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt clone command in a Kubernetes Pod.
    """
    template_fields: Sequence[str] = DbtKubernetesBaseOperator.template_fields + DbtCloneMixin.template_fields  # type: ignore[operator]

    def execute(self, context: Context) -> None:
        # self.cmd_flags is assumed to hold any extra flags configured on the operator.
        cmd, env_vars = self.build_clone_cmd(context=context, cmd_flags=self.cmd_flags)
        # Hand the command off to the base operator; depending on its API you may instead
        # pass only the flags and let build_and_run_cmd assemble the full dbt command.
        self.build_and_run_cmd(context=context, cmd_flags=cmd)

To modify your process to sequentially run, test, and clone each model, you can define an Airflow DAG that uses the DbtRunKubernetesOperator, DbtTestKubernetesOperator, and the newly created DbtCloneKubernetesOperator. Ensure you configure each operator with the appropriate profile_config to target the correct dbt profile for staging and production environments.

Here's a simplified example of how you might set up the DAG:

from datetime import datetime

from airflow import DAG
from cosmos import ProfileConfig
from cosmos.operators.kubernetes import DbtRunKubernetesOperator, DbtTestKubernetesOperator, DbtCloneKubernetesOperator  # DbtCloneKubernetesOperator is the new operator defined above

with DAG("dbt_workflow", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    run_model = DbtRunKubernetesOperator(
        task_id="run_model",
        profile_config=ProfileConfig(profile_name="staging"),  # plus target_name and profiles_yml_filepath/profile_mapping in a real setup
        # Additional parameters as needed
    )

    test_model = DbtTestKubernetesOperator(
        task_id="test_model",
        profile_config=ProfileConfig(profile_name="staging"),
        # Additional parameters as needed
    )

    clone_model = DbtCloneKubernetesOperator(
        task_id="clone_model",
        profile_config=ProfileConfig(profile_name="production"),
        # Additional parameters as needed
    )

    run_model >> test_model >> clone_model

This example assumes that you have a ProfileConfig object or similar configuration management in place to handle dbt profiles and targets. Adjust the implementation details based on your actual command for cloning and your environment setup.
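
For example, a pair of ProfileConfig objects pointing at different targets in the same profiles.yml might look like the sketch below (the profile name, target names, and profiles.yml path are assumptions about your project layout):

from pathlib import Path

from cosmos import ProfileConfig

PROFILES_YML = Path("/usr/local/airflow/dbt/profiles.yml")  # hypothetical location inside the Airflow image

# Staging target: used by the run/test/snapshot tasks.
staging_profile = ProfileConfig(
    profile_name="my_project",
    target_name="staging",
    profiles_yml_filepath=PROFILES_YML,
)

# Production target: used by the clone task, which writes the clones into the prod schema.
production_profile = ProfileConfig(
    profile_name="my_project",
    target_name="prod",
    profiles_yml_filepath=PROFILES_YML,
)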

For more details, you can refer to the existing operators in the cosmos/operators/kubernetes.py file.


#### About Dosu

This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research. Help me be more useful! Please leave a šŸ‘ if this is helpful and šŸ‘Ž if it is irrelevant. If you want to continue the conversation, start your reply with @dosu-bot.

tatiana commented 1 month ago

Hi @SPTKL, this is a very useful feature. I'm still learning about the dbt clone command.

Your feature request has similarities with #903.

One of the challenges is that we may need to introduce Cosmos intermediate storage so that independent tasks, potentially running on different worker nodes, containers, or VMs, can share the build artifacts needed by dbt clone. This is being discussed in a separate ticket: #894
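
For context on why shared artifacts matter here: dbt clone resolves what to clone from the manifest of a previous run, passed via --state, so that directory has to be reachable from whichever worker executes the clone task. A rough sketch of the invocation, with hypothetical paths and target names:

# Hypothetical: the staging build's target/ directory has been copied to shared storage.
state_dir = "/shared/artifacts/staging_build"  # must contain manifest.json from the staging run

clone_cmd = [
    "dbt", "clone",
    "--state", state_dir,  # tells dbt clone which relations the staging build produced
    "--target", "prod",    # profile target pointing at the production schema
]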

Would you be interested in contributing to Cosmos in this area?

SPTKL commented 1 month ago

Yes, I'm interested!