gocardless / airflow-dbt

Apache Airflow integration for dbt
https://pypi.org/project/airflow-dbt/
MIT License

Decouple hook executor from the operator, add Cloud Build execution and adjust testing #48

Open dinigo opened 3 years ago

dinigo commented 3 years ago

Follow-up to #45, which was closed and locked when I accidentally renamed the branch it was pointing to. Closes #46, #44, #43, #36, #30.
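
To illustrate the decoupling idea in this PR's title: the operator builds the dbt command while an interchangeable hook decides *where* it runs (local subprocess, Cloud Build, ...). The sketch below is illustrative only; class names and signatures are hypothetical, not the actual airflow-dbt API.

```python
class DbtLocalHook:
    """Hypothetical hook: runs the dbt command on the worker itself."""
    def run(self, command):
        return f"local: {' '.join(command)}"


class DbtCloudBuildHook:
    """Hypothetical hook: submits the dbt command as a Cloud Build job."""
    def run(self, command):
        return f"cloudbuild: {' '.join(command)}"


class DbtBaseOperator:
    """Builds the dbt CLI command; delegates execution to the injected hook."""
    def __init__(self, command, dbt_bin="dbt", hook=None):
        self.command = command
        self.dbt_bin = dbt_bin
        # default to local execution when no hook is injected
        self.hook = hook or DbtLocalHook()

    def execute(self):
        # the operator never cares where the command runs
        return self.hook.run([self.dbt_bin, self.command])
```

With this shape, `DbtBaseOperator("run", hook=DbtCloudBuildHook())` runs the same command remotely without any change to the operator's public API.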

andrewrjones commented 3 years ago

Hey @dinigo,

Thanks for the contribution! This looks really good and it's great you've been able to make these changes without changing the API.

I've had a quick look through the code and it looks good. Given there are quite a few changes I want to have another look tomorrow and test it a bit more, but not expecting any problems. I'll get back to you by the end of day tomorrow.

Thanks again!

dinigo commented 3 years ago

@andrewrjones, about the README: definitely, it's in the oven now. Working on the pydoc too.

If you think something should be done differently, please say so :) don't hold back. Criticism is very welcome.

I have this example DAG

"""Example dag to illustrate usage of the library"""
from datetime import datetime
from pathlib import Path
from uuid import uuid4

from airflow import DAG
from airflow.hooks.subprocess import SubprocessHook
from airflow.providers.google.cloud.transfers.local_to_gcs import \
    LocalFilesystemToGCSOperator

from airflow_dbt.operators.dbt_operator import (
    DbtTestOperator,
)
from airflow_dbt.operators.google import DbtCloudBuildOperator

CURR_FOLDER = path = str(Path(__file__).parent.absolute())
DBT_PROJECT_FOLDER = 'jaffle-shop'
DS_NODASH = '{{ ds_nodash }}'
DBT_PROJECT_PATH = f'{CURR_FOLDER}/{DBT_PROJECT_FOLDER}'
BLOB_NAME = f'{DBT_PROJECT_FOLDER}-{DS_NODASH}.tar.gz'
BUCKET_NAME = '{{ var.value.dbt_staging_bucket }}'
BLOB_URL = f'gs://{BUCKET_NAME}/{BLOB_NAME}'
GOOGLE_APPLICATION_CREDENTIALS = '{{ var.value.google_app_cred_path }}'

with DAG('dbt_dag_example', start_date=datetime(2021, 10, 22)) as dag:
    # run dbt test in local
    dbt_test = DbtTestOperator(
        task_id='dbt_test',
        project_dir=DBT_PROJECT_PATH,
        profiles_dir=DBT_PROJECT_PATH,
        dbt_bin='{{ var.value.dbt_bin }}',  # where pip installs dbt
        use_colors=False,
        env={
            'GOOGLE_APPLICATION_CREDENTIALS': GOOGLE_APPLICATION_CREDENTIALS
        }
    )

    COMPRESSED_FILE_PATH = f'jaffle-shop-{uuid4().hex}.tar.gz'

    # upload the current dbt project folder gzipped to a bucket
    upload_dbt_project = LocalFilesystemToGCSOperator(
        task_id="upload_dbt_project",
        src=f'{CURR_FOLDER}/{COMPRESSED_FILE_PATH}',
        dst=BLOB_NAME,
        bucket=BUCKET_NAME,
        pre_execute=lambda *_: SubprocessHook().run_command(
            command=['tar', '-czvf', COMPRESSED_FILE_PATH, DBT_PROJECT_FOLDER],
            cwd=CURR_FOLDER
        ),
        post_execute=lambda *_: SubprocessHook().run_command(
            command=['rm', COMPRESSED_FILE_PATH],
            cwd=CURR_FOLDER,
        )
    )

    # execute dbt run in cloud build
    dbt_run_on_cloud_build = DbtCloudBuildOperator(
        task_id='dbt_run_on_cloud_build',
        gcs_staging_location=BLOB_URL,
        command='run',
        project_id='{{ var.value.project_id }}',  # optional
        service_account='{{ var.value.cloud_build_sa }}',  # optional
        use_colors=False,  # dbt config
        profiles_dir=DBT_PROJECT_FOLDER,  # dbt config
        project_dir=DBT_PROJECT_FOLDER,  # dbt config
        vars={'some ': 1},
        data=True,
        selector='this select',
        select=True,
    )

    dbt_test >> upload_dbt_project >> dbt_run_on_cloud_build

You can see that I:

  1. Run `dbt test` locally
  2. Upload the local dbt directory, gzipped, to GCS
  3. Run `dbt run` in Cloud Build

dinigo commented 3 years ago

The google provider version is now 6.0.0, but GCP Composer (Google-managed Airflow) uses google provider 5.1.0. There are some cool features in the new one and I don't want to be writing "if 5 do that, elif 6 do this other thing". Provider versions typically go hand in hand (you install all the providers with the same version), so I was thinking of turning this package into the dbt provider.

This way we don't have to restrict the google provider version (and potentially aws and others). We can break the API between versions.

What are your thoughts on turning this into an `apache-airflow-providers-dbt`?
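
As a rough sketch of what that would involve: provider packages expose a `get_provider_info()` function that Airflow discovers through an `apache_airflow_provider` entry point. The metadata below is a minimal, hypothetical example of the registration function, not a finished provider definition.

```python
def get_provider_info():
    """Metadata Airflow reads when discovering a provider package.

    Hypothetical sketch: package name, versions and description here
    are placeholders, not a published distribution.
    """
    return {
        "package-name": "apache-airflow-providers-dbt",
        "name": "dbt",
        "description": "Operators and hooks for running dbt",
        "versions": ["1.0.0"],
    }
```

The entry point would then be declared in the package's `setup.cfg`/`pyproject.toml`, which is what lets the package version independently of the google (or aws) providers.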

fenimore commented 2 years ago

Is this still a WIP or ready for merge?

dinigo commented 2 years ago

@fenimore, sorry, I've been out of it for some time. I know this is quite a big change, so it would be a confidence booster if you could do a critical code review. Try it out with your DAGs and help me validate that the API hasn't changed...

fenimore commented 2 years ago

Any updates on if this can be merged? LGTM

RowdyHowell commented 2 years ago

Following this as I have a MWAA deployment that could absolutely use this functionality to manage passwords. Appreciate the effort on all parts!

drackham commented 2 years ago

Also following for the same use case as @RowdyHowell

dinigo commented 2 years ago

@fenimore, ~sorry for the delay. I see the feature is ready to ship.~

We're going to use a private fork until the new version reaches pypi.

Thanks @RowdyHowell and @drackham for your patience too.

🚀

dinigo commented 2 years ago

There's still some work going on, as you can see.

ghost commented 2 years ago

@dinigo is this ready for final review?

RowdyHowell commented 2 years ago

Bump! Would there be interest in pulling out the environment variable functionality into a new PR so we can close #36?

RafalSiwek commented 2 years ago

> Bump! Would there be interest in pulling out the environment variable functionality into a new PR so we can close #36?

Hi @RowdyHowell, I was able to create a PR #60 with the env functionality, please take a look and feel free to leave a 👍
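
For context, the env functionality discussed in #36/#60 boils down to merging a user-supplied `env` dict over the worker's environment before spawning the dbt subprocess. The sketch below illustrates that pattern in plain Python; it is not the exact airflow-dbt implementation, and `run_dbt` is a hypothetical helper name.

```python
import os
import subprocess
import sys


def run_dbt(command, env=None):
    """Run a command with extra environment variables layered on top.

    Illustrative sketch: user-supplied `env` entries override the
    inherited worker environment, so secrets (profiles credentials,
    service-account paths, ...) can be injected per task.
    """
    merged_env = {**os.environ, **(env or {})}
    return subprocess.run(
        command,
        env=merged_env,
        capture_output=True,
        text=True,
    )
```

For example, `run_dbt([sys.executable, '-c', "import os; print(os.environ['DBT_TARGET'])"], env={'DBT_TARGET': 'prod'})` prints the injected value, which is the behavior MWAA users need for password management.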

dinigo commented 2 years ago

It's been some time since I last put work into this issue. I think the PR is too big, and I'm glad some other people took the time to implement the issues separately, like the env variables.

This PR addresses too many things in one go:

Each of those deserves (in my opinion) to be implemented, but they may need to be split into smaller PRs for us to review easily. This has been stalled for some time now, and I think it's due to the complexity of the review process: it's difficult to approve so many changes at once.

What do you (core devs, people from GoCardless, and the community) think? Could you lend a hand in the thinking process?

I've been using these features and they work fine, but that's not enough: they also have to look good. Tell me what you think and whether I could source some of your time.

andrewrjones commented 2 years ago

Hey @dinigo,

Sorry also from our side for not taking a look at these contributions sooner, which is partly why they have grown so big.

I agree this is too hard to review now, whereas smaller PRs like #60 are much easier.

I think all the changes you are proposing are the right ones to do (from your bullet points above), so if we had PRs for each we should be able to review and merge them.

Thanks!

dinigo commented 2 years ago

@andrewrjones, I have split some of the functionality into three other PRs, namely #67, #68 and #69. Before doing anything else I need some assessment on them.