astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

Running Dbt test command using DbtDag in Airflow #1330


sbutol01 commented 2 hours ago

Hello experts,

I'm pretty new to Cosmos and trying to run my dbt commands using Cosmos in Airflow.

I'm trying to run the dbt test command using Cosmos's DbtDag. But to my surprise it doesn't run dbt test; instead it uses the dbt run command. Is there a way to pass in an argument to force it to run the command I want? I have the code snippet below. I tried using operator_args, but it looks like it doesn't accept a dbt_command parameter.

dbt_dag = DbtDag(
    project_config=ProjectConfig(local_dir),
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={
        "dbt_command": "test",
        "dbt_args": ["--select", "model_name"],
    },
    dag_id="dbt_dag",
)
dbt_dag.test()

This gives me the error below:

airflow.exceptions.AirflowException: Invalid arguments were passed to DbtRunLocalOperator (task_id: run). Invalid arguments were: [2024-11-18, 15:43:54 ] {{process_utils.py:190}} INFO - kwargs: {'dbt_command': 'test', 'dbt_args': ['--select', 'model_name']}

Earlier I had used the code below and it did run, but with the command shown in this log line. It looks like the test() function isn't doing anything.

INFO - Trying to run dbtRunner with: [2024-11-15, 13:55:44 ] {{process_utils.py:190}} INFO - ['run', '--models', 'model_name'.....]

dbt_dag = DbtDag(
    project_config=ProjectConfig(local_dir),
    profile_config=profile_config,
    execution_config=execution_config,
    dag_id="dbt_dag",
)
dbt_dag.test()

I know there is DbtTestLocalOperator, but I want to run the test command through DbtDag.

Any help would be highly appreciated.

dosubot[bot] commented 2 hours ago

Hey @sbutol01! I'm here to help you with your Cosmos issue. Let's tackle this together!

To run the dbt test command in Cosmos, you need to use the operator designed for it. DbtDag itself does not directly support changing the dbt command through arguments like dbt_command. Instead, you should use the DbtTestLocalOperator if you want to execute the dbt test command specifically.

However, if you want to achieve this within the context of a DAG setup similar to DbtDag, you might need to create a custom DAG or task that uses the DbtTestLocalOperator to run the dbt test command. Here's an example of how you can use the DbtTestLocalOperator:

from cosmos.operators.local import DbtTestLocalOperator

dbt_test_task = DbtTestLocalOperator(
    task_id='dbt_test',
    profile_config=profile_config,  # your existing ProfileConfig
    project_dir=local_dir,          # path to your dbt project
    # Add any additional parameters you need here
)

If you are set on using DbtDag and want to customize it to run dbt test, you might need to look into customizing the DAG creation process to include tasks that specifically run the dbt test command using the appropriate operator. Unfortunately, DbtDag does not natively support switching commands like this through its configuration alone [1].
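That said, if the goal is to control how tests run inside a DbtDag, a minimal sketch using RenderConfig may help. This assumes profile_config, execution_config, and local_dir are defined as in your snippet; note that test_behavior tunes where tests run relative to models, it does not turn the DAG into a test-only run:

from cosmos import DbtDag, ProjectConfig, RenderConfig
from cosmos.constants import TestBehavior

dbt_dag = DbtDag(
    project_config=ProjectConfig(local_dir),
    profile_config=profile_config,
    execution_config=execution_config,
    render_config=RenderConfig(
        select=["model_name"],                 # render only this model (and its tests)
        test_behavior=TestBehavior.AFTER_ALL,  # run tests once, after all models
    ),
    dag_id="dbt_dag",
)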

To continue talking to Dosu, mention @dosu.

sbutol01 commented 2 hours ago

Thanks @dosu for your quick response. I also tried DbtTestLocalOperator, and it does work, but at the end it fails my DAG with an error like this:

ERROR - Error during DBT run: Operator <Task(DbtTestLocalOperator): test> has not been assigned to a DAG yet

I get this error after the tests have run successfully. I thought this would be a quick fix, but unfortunately I couldn't get it resolved. Below is the skeleton code I'm using. Also please note that I'm doing all of this within a virtualenv using @task.virtualenv.

The code below does run the DAG, but it fails with the error mentioned above. I tried modifying the code, but every time I get an error saying the operator has not been assigned to a DAG yet. Could there be a quick fix? Thanks.

import logging
from pathlib import Path

import pendulum
from airflow.decorators import dag, task

logger = logging.getLogger(__name__)

@dag(
    dag_id=Path(__file__).stem,
    start_date=pendulum.datetime(2024, 9, 12, tz="Pacific/Auckland"),
    tags=["test-tag"],
    # schedule="0 * * * *",
    max_active_runs=1,  # Prevent multiple runs of DAG, if previous run did not complete
    catchup=False,
)
def dbt_test_process():
    """
    The Airflow Dag to invoke dbt tests
    """
    @task.virtualenv(
        task_id="dbt_test_using_operator",
        requirements=[
            "dbt-snowflake==1.7.0",
            "astronomer-cosmos==1.7.0",
        ],
    )
    def run_dbt_using_operators():
        from cosmos import DbtTestLocalOperator, ProfileConfig

        test_operator = DbtTestLocalOperator(
            profile_config=profile_config,
            project_dir=local_dir,
            task_id="test",
            dbt_cmd_flags=["--models", "model_name"],
            # install_deps=True,
            append_env=True,
            # dag=dbt_test_process,
        )

        try:
            logger.info("Executing DBT test operator...")
            test_operator.execute(context={})
            logger.info("DBT run completed successfully.")
        except Exception as e:
            logger.error(f"Error during DBT run: {e}")
            raise

        return "DBT task completed successfully"
    run_dbt_using_operators()

dbt_test_process()

dosubot[bot] commented 2 hours ago

To resolve the error "Operator <Task(DbtTestLocalOperator): test> has not been assigned to a DAG yet," ensure that the DbtTestLocalOperator is properly associated with a DAG. This error typically occurs when an operator is instantiated without being added to a DAG context.

In your code, the DbtTestLocalOperator should be instantiated within the DAG context. Here's how you can modify your code to ensure the operator is correctly assigned to the DAG:

@dag(
    dag_id=Path(__file__).stem,
    start_date=pendulum.datetime(2024, 9, 12, tz="Pacific/Auckland"),
    tags=["test-tag"],
    max_active_runs=1,
    catchup=False,
)
def dbt_test_process():
    """
    The Airflow Dag to invoke dbt tests
    """
    from cosmos import DbtTestLocalOperator, ProfileConfig

    test_operator = DbtTestLocalOperator(
        profile_config=profile_config,
        project_dir=local_dir,
        task_id="test",
        dbt_cmd_flags=["--models", "model_name"],
        append_env=True,
    )

    @task.virtualenv(
        task_id="dbt_test_using_operator",
        requirements=[
            "dbt-snowflake==1.7.0",
            "astronomer-cosmos==1.7.0",
        ],
    )
    def run_dbt_using_operators():
        try:
            logger.info("Executing DBT test operator...")
            test_operator.execute(context={})
            logger.info("DBT run completed successfully.")
        except Exception as e:
            logger.error(f"Error during DBT run: {e}")
            raise

        return "DBT task completed successfully"

    run_dbt_using_operators()

dbt_test_process()

Ensure that the DbtTestLocalOperator is instantiated directly within the DAG function, not inside the @task.virtualenv function. This way, it is automatically associated with the DAG context [1].
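For reference, here is a minimal sketch of the standard pattern, assuming cosmos and dbt-snowflake are installed in the Airflow environment itself (so no virtualenv task is needed). Instantiating the operator inside the @dag function registers it as a task, and Airflow calls execute() with a real context at runtime, which avoids the manual-execution problem entirely. The DAG id here is a placeholder, and profile_config and local_dir are assumed to be defined as in your project:

import pendulum
from airflow.decorators import dag
from cosmos import DbtTestLocalOperator

@dag(
    dag_id="dbt_test_only",  # placeholder name
    start_date=pendulum.datetime(2024, 9, 12, tz="Pacific/Auckland"),
    catchup=False,
)
def dbt_test_only():
    # Created inside the @dag function, so Airflow assigns it to the DAG
    # and schedules it; no manual execute() call is required.
    DbtTestLocalOperator(
        task_id="test",
        profile_config=profile_config,  # as defined elsewhere in your project
        project_dir=local_dir,          # path to your dbt project
        dbt_cmd_flags=["--models", "model_name"],
        append_env=True,
    )

dbt_test_only()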

To continue talking to Dosu, mention @dosu.

sbutol01 commented 2 hours ago

This gets me an import error: ModuleNotFoundError: No module named 'cosmos'. It can't find the module because I'm installing it in the virtualenv at a later stage. @dosu, @tatiana

dosubot[bot] commented 2 hours ago

To resolve the ModuleNotFoundError: No module named 'cosmos', ensure that the Cosmos library is installed in the environment where your Airflow tasks are running. Since you're using a virtual environment, you need to make sure that Cosmos is included in the requirements list of your @task.virtualenv decorator. Here's how you can adjust your code:

  1. Include Cosmos in the Virtual Environment Requirements:

     Make sure that astronomer-cosmos is listed in the requirements parameter of the @task.virtualenv decorator:

     @task.virtualenv(
         task_id="dbt_test_using_operator",
         requirements=[
             "dbt-snowflake==1.7.0",
             "astronomer-cosmos==1.7.0",
         ],
     )

  2. Ensure Proper Import:

     Import the necessary modules inside the function decorated with @task.virtualenv to ensure they are available in the virtual environment:

     @task.virtualenv(
         task_id="dbt_test_using_operator",
         requirements=[
             "dbt-snowflake==1.7.0",
             "astronomer-cosmos==1.7.0",
         ],
     )
     def run_dbt_using_operators():
         from cosmos import DbtTestLocalOperator, ProfileConfig
         # Rest of your code...

By ensuring that astronomer-cosmos is included in the requirements list, it will be installed in the virtual environment, making the cosmos module available for import during task execution.
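As an aside, if the goal is simply to run dbt test from inside a virtualenv task, one hedged alternative (a sketch, not Cosmos's recommended path) is dbt's programmatic dbtRunner API (available in dbt 1.5+), which avoids instantiating an Airflow operator by hand and therefore sidesteps the DAG-assignment error. The project and profiles paths below are placeholders:

@task.virtualenv(
    task_id="dbt_test_via_runner",
    requirements=["dbt-snowflake==1.7.0"],
)
def run_dbt_tests():
    # Programmatic dbt invocation; no Airflow operator is created here,
    # so nothing needs to be assigned to a DAG.
    from dbt.cli.main import dbtRunner

    result = dbtRunner().invoke(
        [
            "test",
            "--select", "model_name",
            "--project-dir", "/path/to/dbt/project",  # placeholder
            "--profiles-dir", "/path/to/profiles",    # placeholder
        ]
    )
    if not result.success:
        raise RuntimeError(f"dbt test failed: {result.exception}")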

To continue talking to Dosu, mention @dosu.