astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0

[Bug] Error when using ExecutionMode.VIRTUALENV #1248

Open nm419 opened 1 month ago

nm419 commented 1 month ago

Astronomer Cosmos Version

Other Astronomer Cosmos version (please specify below)

If "Other Astronomer Cosmos version" selected, which one?

1.7.0

dbt-core version

1.8.5

Versions of dbt adapters

dbt-athena-community==1.8.3

LoadMode

AUTOMATIC

ExecutionMode

VIRTUALENV

InvocationMode

None

airflow version

2.10.1

Operating System

Ubuntu 20.04.6 LTS

If you think it's a UI issue, which browsers are you seeing the problem on?

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

What happened?

We upgraded our MWAA environment to 2.10.1 and reviewed our requirements.txt to bump the relevant packages, including astronomer-cosmos from 1.5.1 to 1.7.0. We then attempted to follow the example DAG at https://github.com/astronomer/astronomer-cosmos/blob/main/dev/dags/example_virtualenv.py, since we were already using ExecutionMode.VIRTUALENV and its behaviour appears to have changed since 1.6.0.

Relevant log output

[2024-10-09, 07:21:22 UTC] {taskinstance.py:2612} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: aca.cached-venv-group.road_links.run manual__2024-10-09T07:21:06.879266+00:00 [queued]>
[2024-10-09, 07:21:22 UTC] {taskinstance.py:2612} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: aca.cached-venv-group.road_links.run manual__2024-10-09T07:21:06.879266+00:00 [queued]>
[2024-10-09, 07:21:22 UTC] {taskinstance.py:2865} INFO - Starting attempt 1 of 1
[2024-10-09, 07:21:22 UTC] {taskinstance.py:2888} INFO - Executing <Task(DbtRunVirtualenvOperator): cached-venv-group.road_links.run> on 2024-10-09 07:21:06.879266+00:00
[2024-10-09, 07:21:22 UTC] {standard_task_runner.py:72} INFO - Started process 1253 to run task
[2024-10-09, 07:21:22 UTC] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'aca', 'cached-venv-group.road_links.run', 'manual__2024-10-09T07:21:06.879266+00:00', '--job-id', '199', '--raw', '--subdir', 'DAGS_FOLDER/aca/dag.py', '--cfg-path', '/tmp/tmppy92y3u7']
[2024-10-09, 07:21:22 UTC] {standard_task_runner.py:105} INFO - Job 199: Subtask cached-venv-group.road_links.run
[2024-10-09, 07:21:22 UTC] {task_command.py:467} INFO - Running <TaskInstance: aca.cached-venv-group.road_links.run manual__2024-10-09T07:21:06.879266+00:00 [running]> on host ip-10-36-106-148.ap-southeast-2.compute.internal
[2024-10-09, 07:21:22 UTC] {taskinstance.py:3131} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='' AIRFLOW_CTX_DAG_OWNER='TAIM' AIRFLOW_CTX_DAG_ID='aca' AIRFLOW_CTX_TASK_ID='cached-venv-group.road_links.run' AIRFLOW_CTX_EXECUTION_DATE='2024-10-09T07:21:06.879266+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-10-09T07:21:06.879266+00:00'
[2024-10-09, 07:21:22 UTC] {taskinstance.py:731} ▲▲▲ Log group end
[2024-10-09, 07:21:22 UTC] {baseoperator.py:405} WARNING - DbtRunVirtualenvOperator.execute cannot be called outside TaskInstance!
[2024-10-09, 07:21:22 UTC] {virtualenv.py:114} INFO - Checking if the virtualenv lock /tmp/persistent-venv2/cosmos_virtualenv.lock exists
[2024-10-09, 07:21:22 UTC] {virtualenv.py:120} INFO - Acquiring the virtualenv lock
[2024-10-09, 07:21:22 UTC] {virtualenv.py:189} INFO - Acquiring lock at /tmp/persistent-venv2/cosmos_virtualenv.lock with pid 1253
[2024-10-09, 07:21:22 UTC] {virtualenv.py:147} INFO - Creating or updating the virtualenv at `/tmp/persistent-venv2
[2024-10-09, 07:21:22 UTC] {process_utils.py:186} INFO - Executing cmd: python3 /usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv /tmp/persistent-venv2 --python=python3
[2024-10-09, 07:21:22 UTC] {process_utils.py:190} INFO - Output:
[2024-10-09, 07:21:22 UTC] {process_utils.py:194} INFO - python3: can't open file '/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv': [Errno 2] No such file or directory
[2024-10-09, 07:21:22 UTC] {virtualenv.py:125} INFO - Releasing virtualenv lock
[2024-10-09, 07:21:22 UTC] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/cosmos/operators/virtualenv.py", line 138, in execute
    output = super().execute(context)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/cosmos/operators/base.py", line 268, in execute
    self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/cosmos/operators/local.py", line 645, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/cosmos/operators/virtualenv.py", line 122, in run_command
    self._py_bin = self._prepare_virtualenv()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/cosmos/operators/virtualenv.py", line 148, in _prepare_virtualenv
    py_bin = prepare_virtualenv(
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/python_virtualenv.py", line 105, in prepare_virtualenv
    execute_in_subprocess(virtualenv_cmd)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 175, in execute_in_subprocess
    execute_in_subprocess_with_kwargs(cmd, cwd=cwd, env=env)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 198, in execute_in_subprocess_with_kwargs
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['python3', '/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', '/tmp/persistent-venv2', '--python=python3']' returned non-zero exit status 2.
[2024-10-09, 07:21:22 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=aca, task_id=cached-venv-group.road_links.run, run_id=manual__2024-10-09T07:21:06.879266+00:00, execution_date=20241009T072106, start_date=20241009T072122, end_date=20241009T072122
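For context on the failure above: the executed command hard-codes `/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv`, a path that does not exist on a Python 3.11 image, hence the `Errno 2`. The sketch below (stdlib `venv`, not Cosmos or Airflow code, purely illustrative) shows the difference between invoking an environment-creation module resolved from the running interpreter and invoking a baked-in file path:

```python
import os
import subprocess
import sys
import tempfile

# The failing log line runs a hard-coded path:
#   python3 /usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv ...
# Resolving the module via `sys.executable -m` instead always uses the
# interpreter that is actually running the task, whatever its version.
venv_dir = os.path.join(tempfile.mkdtemp(), "cosmos_venv")
subprocess.run(
    [sys.executable, "-m", "venv", "--without-pip", venv_dir],
    check=True,
)

# The new environment ships its own python binary.
py_bin = os.path.join(venv_dir, "bin", "python")
print("created interpreter exists:", os.path.exists(py_bin))
```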

How to reproduce

  1. Use https://github.com/aws/aws-mwaa-local-runner
  2. Set environment variables in docker/config/.env.localrunner:

```shell
export AIRFLOW_HOME="/usr/local/airflow"
export DBT_VENV_PATH="${AIRFLOW_HOME}/dbt_venv"
export PIP_USER=false
export PIP_USER=true
export DAGS_HOME="${AIRFLOW_HOME}/dags"
export DBT_EXECUTABLE_PATH="${DBT_VENV_PATH}/bin/dbt"
export AWS_DEFAULT_REGION='ap-southeast-2'
export AIRFLOW__COSMOS__DBT_DOCS_CONN_ID="aws_default"
```

  3. Add the following to requirements.txt:

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.1/constraints-3.11.txt"

airflow-dbt-python==2.1.0
airflow-provider-great-expectations==0.2.7
apache-airflow-providers-amazon==8.28.0
apache-airflow-providers-apprise==1.4.0
apache-airflow-providers-atlassian-jira==2.7.0
apache-airflow-providers-microsoft-mssql==3.9.0
apache-airflow-providers-openlineage==1.11.0
apache-airflow-providers-oracle==3.11.0
apache-airflow-providers-postgres==5.12.0
apache-airflow-providers-sftp==4.11.0
apache-airflow-providers-tableau==4.6.0
astronomer-cosmos==1.7.0
awswrangler[openpyxl]==3.9.0
dbt-athena-community==1.8.4
dbt-core==1.8.7
dbt-postgres==1.8.2
email-validator==2.2.0
geopandas==1.0.1
geoalchemy2==0.14.1
jira==3.6.0
loguru==0.7.2
Office365-REST-Python-Client==2.5.9
openpyxl==3.1.5
pyarrow==14.0.2
pydantic==2.8.2
pydantic-settings==2.1.0
pyogrio==0.7.2
pytest==8.3.2
python-benedict==0.33.1
schema==0.7.5
virtualenv==20.26.3
xxhash==3.4.1
protobuf<5.26.0

  4. Set up the example DAG: https://github.com/astronomer/astronomer-cosmos/blob/main/dev/dags/example_virtualenv.py

Anything else :)?

Not sure if I have something incorrectly set up, but the code below seems to work when running with ExecutionMode.LOCAL. Not sure if I should just go with that, or whether, since we're running on MWAA, we should continue to use VIRTUALENV.

My dag.py file:

```python
"""
ACA ingestion and breakdown to 10m level
"""

import os
import sys
from pathlib import Path

import pendulum
import virtualenv
from aca.version import environment, version_str
from airflow import Dataset
from airflow.decorators import dag, task

from cosmos import (
    DbtTaskGroup,
    ExecutionConfig,
    ProfileConfig,
    ProjectConfig,
    RenderConfig,
)
from cosmos.constants import ExecutionMode
from loguru import logger
from utils.callback.jira import failure_callback, success_callback

id = "aca"
dag_path = f"{ os.getenv('DAGS_HOME') }/{ id }"

profile_config = ProfileConfig(
    profile_name="athena",
    target_name=environment,
    profiles_yml_filepath=f"{ dag_path }/profiles.yml",
)


@dag(
    dag_id=id,
    schedule=[Dataset("rcs_bronze")],
    start_date=pendulum.datetime(2021, 1, 1, tz="Australia/Adelaide"),
    catchup=False,
    max_active_runs=1,
    max_active_tasks=5,
    dagrun_timeout=pendulum.duration(minutes=60),
    on_success_callback=success_callback,
    tags=["silver", "TAIM", version_str, environment],
    default_args={
        "owner": "TAIM",
        "depends_on_past": False,
        "email": [""],
        "email_on_failure": True,
        "email_on_retry": False,
        "retries": 0,
        "email_on_success": False,
        "on_failure_callback": failure_callback,
    },
    doc_md=__doc__,
)
def aca():
    cached_venv_task_group = DbtTaskGroup(
        group_id="cached-venv-group",
        # dbt/cosmos-specific parameters
        project_config=ProjectConfig(dag_path),
        profile_config=profile_config,
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.VIRTUALENV,
            virtualenv_dir=Path("/tmp/virtual_env"),
        ),
        operator_args={
            "py_system_site_packages": False,
            "py_requirements": ["dbt-athena-community==1.8.3"],
            "install_deps": True,
        },
    )
    cached_venv_task_group


aca()
```

Are you willing to submit PR?

Contact Details

nickmalandris@gmail.com

dosubot[bot] commented 1 month ago

Hello, @nm419! I'm here to help you with your issue. Let's work together to get it sorted out.

To resolve the error with DbtRunVirtualenvOperator when using ExecutionMode.VIRTUALENV in Astronomer Cosmos 1.7.0 on MWAA, ensure the following:

  1. Ensure virtualenv is Installed: The error indicates that the virtualenv package might not be found. Make sure virtualenv==20.26.3 is included in your requirements.txt and properly installed in your Airflow environment.

  2. Set py_requirements: Ensure you specify the required Python packages in py_requirements. For example:

    operator_args={
       "py_system_site_packages": False,
       "py_requirements": ["dbt-athena-community==1.8.3"],
       "install_deps": True,
    }
  3. Check virtualenv_dir: Ensure the virtualenv_dir is correctly set and accessible. If it's temporary, ensure it is created and managed properly.

  4. Verify Python Path: The error log shows an attempt to execute a command using Python 3.7. Ensure that the correct Python version is being used and that the path to the virtualenv script is correct.

  5. Lock Handling: Ensure that the virtual environment lock is managed correctly to avoid conflicts.

If these steps do not resolve the issue, further investigation into the specific error message and logs would be necessary. The error suggests a potential misconfiguration or missing dependencies in your setup [1][2].
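Steps 1 and 4 above can be sanity-checked directly on a worker with a short snippet (illustrative only, not part of Cosmos or Airflow):

```python
import importlib.util
import sys

# Step 4: which interpreter is this process actually running under?
# (The traceback shows python3.11 site-packages paths, yet the failing
# virtualenv command points at a python3.7 directory.)
print("interpreter:", sys.version_info[:2])

# Step 1: is the `virtualenv` package importable, and from where?
spec = importlib.util.find_spec("virtualenv")
print("virtualenv found at:", spec.origin if spec else None)
```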

To continue talking to Dosu, mention @dosu.

pankajkoti commented 4 weeks ago

Hi @nm419, just wanted to check whether this was due to a change introduced in the latest Cosmos 1.7 release. Could you please try once with an earlier version, astronomer-cosmos==1.6.0, and see if you still get that error?

nm419 commented 3 weeks ago

Hi @pankajkoti, I have updated my requirements with astronomer-cosmos==1.6.0 and the same error occurs.