gocardless / airflow-dbt

Apache Airflow integration for dbt
https://pypi.org/project/airflow-dbt/
MIT License

Add the functionality to pass the env parameter to the Airflow worker #60

Closed RafalSiwek closed 2 years ago

RafalSiwek commented 2 years ago

When working with dbt projects, the need arises to pass environment variables to the worker context in which a given operator is executed.

This pull request introduces a solution similar to the BashOperator's: an additional kwarg parameter called env, which allows passing env variables to that context through the existing operators:

import os
...

dbt_run = DbtRunOperator(
  task_id='dbt_run',
  env={
    'DBT_ENV_SECRET_DATABASE': '<DATABASE>',
    'DBT_ENV_SECRET_PASSWORD': '<PASSWORD>',
    'DBT_ENV_SECRET_SCHEMA': '<SCHEMA>',
    'USER_NAME': '<USER_NAME>',
    'DBT_THREADS': os.getenv('<DBT_THREADS_ENV_VARIABLE_NAME>'),
    'ENV_NAME': os.getenv('ENV_NAME')
  }
)
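Under the hood the idea is the same as the BashOperator's env handling: the supplied dict ends up in the environment of the dbt subprocess. A rough sketch of how a hook might forward it (illustrative only, not the actual PR code; whether the dict replaces or is merged over the worker's environment depends on the real implementation):

import os
import subprocess

def run_dbt(command, env=None):
    # Illustrative helper: merge the task-supplied variables over the worker's
    # environment so dbt sees both, then run the requested dbt subcommand.
    full_env = {**os.environ, **(env or {})}
    process = subprocess.Popen(['dbt', command], env=full_env)
    process.wait()
    return process.returncode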
RafalSiwek commented 2 years ago

This PR should solve Issue #36

mariotaddeucci commented 2 years ago

Hey @RafalSiwek, I love this, it will really help me. I have a suggestion for this feature: can you add env to the template fields? Adding template_fields = ("env",) as a static class attribute would allow us to access the Airflow secrets manager by writing an expression like "DBT_HOST": "{{ conn.my_dbt_secret.host }}". With this parameter it's possible to solve #39 too.
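For context, a minimal sketch of what this suggestion could look like in the operator code; it is illustrative rather than the actual PR diff, and assumes a base operator that already accepts the env kwarg:

from airflow.models import BaseOperator

class DbtBaseOperator(BaseOperator):
    # Declaring "env" as a template field makes Airflow render Jinja
    # expressions inside the dict before the task executes, so a value like
    # "{{ conn.my_dbt_secret.host }}" is resolved from the stored connection.
    template_fields = ("env",)

    def __init__(self, env=None, **kwargs):
        super().__init__(**kwargs)
        self.env = env or {}

With that in place, secrets never have to be hard-coded in the DAG file; the templated values are rendered from Airflow connections at runtime.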

RafalSiwek commented 2 years ago

Hey @RafalSiwek, I love this, it will really help me. I have a suggestion for this feature: can you add env to the template fields? Adding template_fields = ("env",) as a static class attribute would allow us to access the Airflow secrets manager by writing an expression like "DBT_HOST": "{{ conn.my_dbt_secret.host }}". With this parameter it's possible to solve #39 too.

Hey @mariotaddeucci, thank you for the suggestion, I've added the env parameter to the template fields list - feel free to check the last commit.

dinigo commented 2 years ago

@RafalSiwek, nice indeed

cvlendistry commented 2 years ago

@andrewrjones is there an ETA for the release with this PR included?

dinigo commented 2 years ago

@cvlendistry, it should be supported now since it was merged into the main branch

AndyBys commented 2 years ago

Hello,

Unfortunately, I am getting airflow.exceptions.AirflowException: Invalid arguments were passed to DbtRunOperator (task_id: dbt_run). Invalid arguments were: **kwargs: {'env': {here dict of my credentials}}. It's a bit strange; I'm just trying to reproduce the example from the docs.

airflow-dbt==0.4.0 apache-airflow==2.2.0

Has anyone encountered the same behaviour?

Not sure if this is a good place to discuss this; let me know if there is a more appropriate place to talk about it.

Thank you in advance!

dinigo commented 2 years ago

@AndyBys, it hasn't been released yet. I just opened PR #67 to try to work with gocardless on a more agile release process.

lrahal commented 1 year ago

Hi! Do you have an ETA for when the env parameter will be released? Its absence prevents me from using the package for now...

emily-flambe commented 1 year ago

Hello,

Unfortunately, I am getting airflow.exceptions.AirflowException: Invalid arguments were passed to DbtRunOperator (task_id: dbt_run). Invalid arguments were: **kwargs: {'env': {here dict of my credentials}}. It's a bit strange; I'm just trying to reproduce the example from the docs.

airflow-dbt==0.4.0 apache-airflow==2.2.0

Has anyone encountered the same behaviour?

Not sure if this is a good place to discuss this; let me know if there is a more appropriate place to talk about it.

Thank you in advance!

I am running into this exact same issue right now - is there a known workaround?

RafalSiwek commented 1 year ago

Hello, Unfortunately, I am getting airflow.exceptions.AirflowException: Invalid arguments were passed to DbtRunOperator (task_id: dbt_run). Invalid arguments were: **kwargs: {'env': {here dict of my credentials}}. It's a bit strange; I'm just trying to reproduce the example from the docs. airflow-dbt==0.4.0 apache-airflow==2.2.0 Has anyone encountered the same behaviour? Not sure if this is a good place to discuss this; let me know if there is a more appropriate place to talk about it. Thank you in advance!

I am running into this exact same issue right now - is there a known workaround?

Hi, this issue still appears to be related to the outdated airflow-dbt package on PyPI. As a workaround, you can always inject the plugin code manually into your Airflow instance.
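For a concrete shape of that workaround, here is one hedged sketch: copy the patched operator and hook modules from the main branch next to your DAGs (the airflow_dbt_local package name and paths below are illustrative) and import them instead of the PyPI package:

# dags/dbt_run_with_env.py -- illustrative; assumes the patched files were
# copied to dags/airflow_dbt_local/{hooks,operators}/
from datetime import datetime

from airflow import DAG
from airflow_dbt_local.operators.dbt_operator import DbtRunOperator

with DAG(
    dag_id='dbt_run_with_env',
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:
    dbt_run = DbtRunOperator(
        task_id='dbt_run',
        # Works because the vendored copy includes this PR's env support.
        env={'ENV_NAME': 'prod'},
    )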

emily-flambe commented 1 year ago

you can always inject the plugin code manually into your Airflow instance

Would be curious to see if anyone has an example of this - I can save the files and import them as local dependencies, but then I get this error:


[2022-10-13, 17:21:34 UTC] {taskinstance.py:1911} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/gcs/dags/cta-dags/dbt/dependencies/airflow_dbt_local/operators/dbt_operator.py", line 154, in execute
    self.create_hook().run_cli('deps')
  File "/home/airflow/gcs/dags/cta-dags/dbt/dependencies/airflow_dbt_local/hooks/dbt_hook.py", line 130, in run_cli
    sp = subprocess.Popen(
  File "/opt/python3.8/lib/python3.8/subprocess.py", line 858, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/opt/python3.8/lib/python3.8/subprocess.py", line 1704, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'dbt'

qgaborit-ledger commented 1 year ago

The latest official airflow-dbt release dates back to September 2021, so indeed the env parameter proposed in this PR is not included yet. Does anyone have a workaround that still relies on DbtRunOperator and env vars? Or an ETA for this PR?

kubaracek commented 1 year ago

@andrewrjones Can you release this to pip please?

RafalSiwek commented 1 year ago

you can always inject the plugin code manually into your Airflow instance

Would be curious to see if anyone has an example of this - I can save the files and import them as local dependencies, but then I get this error:

[2022-10-13, 17:21:34 UTC] {taskinstance.py:1911} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/gcs/dags/cta-dags/dbt/dependencies/airflow_dbt_local/operators/dbt_operator.py", line 154, in execute
    self.create_hook().run_cli('deps')
  File "/home/airflow/gcs/dags/cta-dags/dbt/dependencies/airflow_dbt_local/hooks/dbt_hook.py", line 130, in run_cli
    sp = subprocess.Popen(
  File "/opt/python3.8/lib/python3.8/subprocess.py", line 858, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/opt/python3.8/lib/python3.8/subprocess.py", line 1704, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'dbt'

To inject the custom operator into the Airflow setup I used the tips at https://docs.astronomer.io/learn/airflow-importing-custom-hooks-operators. For AWS's MWAA it required uploading the files into the S3 bucket MWAA is linked to: https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-dag-folder.html

The FileNotFoundError: [Errno 2] No such file or directory: 'dbt' issue appears to be related to not specifying the full executable path in the injected command, as recommended in the subprocess.Popen documentation. In our MWAA implementation, we specified the dbt_bin parameter as an absolute path to the dbt executable:

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    env=env_dict,
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
    profiles_dir=RESOURCES_PATH,
    dir=RESOURCES_PATH
)
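If the absolute path to dbt is not known in advance, one option (an assumption on my side, not something the package provides, and it presumes the scheduler and workers share the same environment) is to resolve the executable with shutil.which:

import shutil

# Resolve the dbt executable from PATH; shutil.which returns None when the
# binary cannot be found, so fall back to the plain command name.
dbt_path = shutil.which('dbt') or 'dbt'

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    env=env_dict,
    dbt_bin=dbt_path,
    profiles_dir=RESOURCES_PATH,
    dir=RESOURCES_PATH
)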
alexanderspen commented 1 year ago

Hi - I'm also running into this issue

https://github.com/gocardless/airflow-dbt/pull/60#issuecomment-1276730670

Any idea when this is going to be released and the airflow-dbt package updated on pip? It seems like this would solve a bunch of issues for a lot of people.

Cheers