I think I found the answer to my first question, but it prompts another. If anyone else finds this: if you want to use the deps operator, you need to read this.
So my follow-up questions are:
Here's my current understanding of how it works:
hey @samLozier! Seems like you found the answer to your first question, but just to add more details:
As Tasks in Airflow are supposed to run independently, we cannot ensure that the artifacts produced by one task will be available to a downstream task[1]. At least, not without us saving the artifacts somewhere: which is what we do when running `DbtDepsOperator` with an S3 project URL. This way we can have a task run `dbt deps`, and push back the artifacts to the original S3 project URL for downstream tasks to pull.
The solution is not perfect, as we don't detect when a dependency is already available (to save some work). As an alternative: you could also run `dbt deps` as part of your CI/CD pipeline and just push the `dbt_packages` directory together with the rest of your project to S3.
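For reference, a DAG using the operator this way might look roughly like the sketch below. The import path, the `DbtRunOperator` name, and the `project_dir`/`profiles_dir` arguments are assumptions and may differ depending on your version:

```python
# Hypothetical DAG sketch: exact import paths and operator arguments may differ
# between airflow-dbt-python versions, so treat the names below as assumptions.
import datetime as dt

from airflow import DAG
from airflow_dbt_python.operators.dbt import DbtDepsOperator, DbtRunOperator

with DAG(
    dag_id="example_dbt_deps_then_run",
    start_date=dt.datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Pulls the project from S3, runs `dbt deps`, and pushes the project
    # (now including dbt_packages) back to the same S3 project URL.
    dbt_deps = DbtDepsOperator(
        task_id="dbt_deps",
        project_dir="s3://my-bucket/dbt/project/",    # placeholder S3 project URL
        profiles_dir="s3://my-bucket/dbt/profiles/",  # placeholder profiles location
    )

    # The downstream task pulls the same S3 project URL, which now contains
    # the packages installed by the previous task.
    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        project_dir="s3://my-bucket/dbt/project/",
        profiles_dir="s3://my-bucket/dbt/profiles/",
    )

    dbt_deps >> dbt_run
```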
With that clarified, let's go to your questions:
> Is this flow circular?
I'm not seeing any circular flows: essentially, `DbtDepsOperator` does:

- Create temporary directory.
- Fetch project from S3 project URL into temp dir.
- Run `dbt deps` in temporary directory.
- (Optional) zip (if the project was zipped).
- Push project back to S3 project URL.

May I ask for clarification?
Subsequent runs will repeat this flow as there is currently no way for `DbtDepsOperator` to check if the `dbt_packages` are already installed. We could add this if you would find it useful. Alternatively, you could write a branch task that checks the S3 project URL first to see if the deps are there, and skip `DbtDepsOperator` if they are.
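Such a branch task could look roughly like this (a sketch only: the bucket name, prefix, connection id, and task ids are placeholders, and the check is deliberately naive):

```python
# Hypothetical branch task: skip dbt_deps if dbt_packages/ already exists in S3.
from airflow.operators.python import BranchPythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def _choose_next_task() -> str:
    hook = S3Hook(aws_conn_id="aws_default")
    keys = hook.list_keys(
        bucket_name="my-bucket",              # placeholder bucket
        prefix="dbt/project/dbt_packages/",   # placeholder project prefix
    )
    # If anything is already under dbt_packages/, skip straight to dbt_run.
    return "dbt_run" if keys else "dbt_deps"


check_deps = BranchPythonOperator(
    task_id="check_deps",
    python_callable=_choose_next_task,
)
# check_deps >> [dbt_deps, dbt_run]; dbt_deps >> dbt_run
# dbt_run would need a trigger_rule such as "none_failed_min_one_success"
# so it still runs when dbt_deps is skipped.
```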
> Can a zipped dbt project work with this flow, or does it need to be uncompressed?
Both zipped and uncompressed should be supported, we use zipped!
> push back to 1? Or can/should we aim for another bucket so that flows are acyclic?
Pushing to a different bucket/S3 path is not supported. There is no technical limitation to adding support for something like this. It would require you to have two copies of your dbt project: one basic (w/o deps installed) project to use with `DbtDepsOperator` and one full project to use with downstream tasks. `DbtDepsOperator` would then pull from the basic project and push to the full project.

However, what advantage would this workflow bring over just using one project? I'm personally not seeing the value, but I'm more than willing to support other workflows if the need is there!
> next run will have deps installed from prior run?
As long as your S3 project URL stays the same (and the files were not manually deleted and/or modified), you should expect deps to be installed from a previous run.
Hoping some of these answers help you! Thank you for your reports.
[1]: If you are running with a local executor or running only 1 worker with a remote executor, you could probably ensure this, but MWAA, and most production deployments, will not be using local executors.
There were a lot of points, so I replied inline below. Happy to clarify further if it would be helpful.
> hey @samLozier! Seems like you found the answer to your first question, but just to add more details:
>
> As Tasks in Airflow are supposed to run independently, we cannot ensure that the artifacts produced by one task will be available to a downstream task[1]. At least, not without us saving the artifacts somewhere: which is what we do when running `DbtDepsOperator` with an S3 project URL. This way we can have a task run `dbt deps`, and push back the artifacts to the original S3 project URL for downstream tasks to pull.
>
> The solution is not perfect, as we don't detect when a dependency is already available (to save some work). As an alternative: you could also run `dbt deps` as part of your CI/CD pipeline and just push the `dbt_packages` directory together with the rest of your project to S3.
this is what I've been doing, but I was curious to test the operator and see if there was a benefit. It sounds like maybe not.
> With that clarified, let's go to your questions:
>
> > Is this flow circular?
>
> I'm not seeing any circular flows: essentially, `DbtDepsOperator` does:
>
> - Create temporary directory.
> - Fetch project from S3 project URL into temp dir.
> - Run `dbt deps` in temporary directory.
> - (Optional) zip (if the project was zipped).
> - Push project back to S3 project URL.
>
> May I ask for clarification?
Yes, but by "circular" all I meant was that it appears to read from and then write back to the same directory. While unlikely, this could in theory create a situation where an error that corrupts the project in Airflow gets pushed back to S3 and then corrupts all subsequent runs of the same DAG. I view the CI flow as unidirectional: push code > test code > push code to S3 > Airflow reads from S3 > Airflow runs dbt. In a way, the current structure could allow the code to modify itself which, IME, could create a tricky-to-find bug. ^^ This is what I was thinking when I asked the initial question.
Now that I know that it doesn't check for dependencies prior to subsequent runs, I'm less worried about it, but still view it as a possible edge case.
> Subsequent runs will repeat this flow as there is currently no way for `DbtDepsOperator` to check if the `dbt_packages` are already installed. We could add this if you would find it useful. Alternatively, you could write a branch task that checks the S3 project URL first to see if the deps are there, and skip `DbtDepsOperator` if they are.
The latter may be interesting, but the former is actually what I was thinking should be avoided.
> > Can a zipped dbt project work with this flow, or does it need to be uncompressed?
>
> Both zipped and uncompressed should be supported, we use zipped!
So it pulls from S3, unzips, runs, re-zips, and pushes back to S3? Based on your comment this is now my understanding; I just wanted to be 100% clear.
> > push back to 1? Or can/should we aim for another bucket so that flows are acyclic?
>
> Pushing to a different bucket/S3 path is not supported. There is no technical limitation to adding support for something like this. It would require you to have two copies of your dbt project: one basic (w/o deps installed) project to use with `DbtDepsOperator` and one full project to use with downstream tasks. `DbtDepsOperator` would then pull from the basic project and push to the full project.
>
> However, what advantage would this workflow bring over just using one project? I'm personally not seeing the value, but I'm more than willing to support other workflows if the need is there!
I was just thinking about the issue I raised above. IMO this is not worth spending time on, since it can be avoided by just running `dbt deps` in CI.
> > next run will have deps installed from prior run?
>
> As long as your S3 project URL stays the same (and the files were not manually deleted and/or modified), you should expect deps to be installed from a previous run.
> Hoping some of these answers help you! Thank you for your reports.
>
> [1]: If you are running with a local executor or running only 1 worker with a remote executor, you could probably ensure this, but MWAA, and most production deployments, will not be using local executors.
> Yes, but by "circular" all I meant was that it appears to read from and then write back to the same directory. While unlikely, this could in theory create a situation where an error that corrupts the project in Airflow gets pushed back to S3 and then corrupts all subsequent runs of the same DAG.
Ah, yeah, I see what you mean. Of course, errors while executing dbt are safe, as the operator will just fail without pushing anything back to S3, but a bug in the operator itself could certainly cause issues. Even though I don't think such a bug exists, we could extend our unit test suite to avoid introducing bugs: for example, we currently only check for the existence of files after the operator runs, without reviewing the rest of the project.

Moreover, before pushing anything back, we could also ensure that `DbtDepsOperator` only modified files in your dbt packages directory. This way, in the worst case, we corrupt dbt packages that can be easily cleaned and re-downloaded.
I can use this issue to track development towards more testing for the operator and making this pre-push check.
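To illustrate the kind of pre-push check I have in mind (just a sketch of the idea, not the operator's actual code; the helper names are made up):

```python
# Sketch: snapshot file modification times before running dbt deps, then
# refuse to push if anything outside dbt_packages/ was added or changed.
from __future__ import annotations

from pathlib import Path


def snapshot_mtimes(project_dir: Path) -> dict[Path, float]:
    """Map each file in the project to its last modification time."""
    return {
        path.relative_to(project_dir): path.stat().st_mtime
        for path in project_dir.rglob("*")
        if path.is_file()
    }


def modified_outside_packages(
    before: dict[Path, float],
    after: dict[Path, float],
    packages_dir: str = "dbt_packages",
) -> list[Path]:
    """Return files added or changed outside the packages directory."""
    changed = [path for path, mtime in after.items() if before.get(path) != mtime]
    return [path for path in changed if packages_dir not in path.parts]
```

The operator would take one snapshot before running `dbt deps`, another one after, and abort the push if `modified_outside_packages` returns anything.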
> So it pulls from S3, unzips, runs, re-zips, and pushes back to S3? Based on your comment this is now my understanding; I just wanted to be 100% clear.
Exactly.
I'm doing some development of project safety checks over at: https://github.com/tomasfarias/airflow-dbt-python/tree/safety-checks-for-deps-and-docs.
The branch also extends unit testing to check for modified files when running `DbtDepsOperator`.
Just brainstorming here, but is it possible to pass the deps directory via XCom? IMO that might offer a few benefits. It would remove any "write back to source" issues, and it would make the state of the deps directory more intentionally ephemeral, so that artifacts of a prior `dbt deps` run couldn't cause any issues with a subsequent node.

One thing I'm unclear about (and I haven't had time to check the code) is how exactly the files are copied to S3. E.g., is it a file-by-file copy only, or does it empty the directory first and then copy over the newly generated files?
> but is it possible to pass the deps directory via XCom?
I'd say it depends on your XCom backend: if you are using the default Airflow metadata database, that may not be the best place for a directory of unknown size, leaving aside the issues of serializing a directory as JSON.
However, if you are using a custom XCom backend, then it may be very doable. In fact, S3 may be used as a custom backend and then it would be pretty much like what we are doing right now (but with the XCom API in front): see this example from Astronomer that even uses the same S3Hook we do here.
Personally, I think there is some value in supporting this for custom XCom backends, but I would leave it up to the user to implement an XCom backend that works for them and can handle directories.
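To make that concrete, a custom backend for directories could look something like the sketch below (assuming Airflow 2.x and the Amazon provider; the bucket name, key prefix, and zip-the-directory approach are all placeholders, and `BaseXCom`'s serialize/deserialize signatures vary a bit between Airflow versions):

```python
# Sketch of a custom XCom backend that stores directories in S3 by zipping
# them first, in the spirit of the Astronomer example linked above.
import shutil
import tempfile
import uuid
from pathlib import Path

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3DirectoryXComBackend(BaseXCom):
    PREFIX = "xcom_s3_dir://"
    BUCKET = "my-xcom-bucket"  # placeholder bucket

    @staticmethod
    def serialize_value(value, **kwargs):
        # Only intercept values that point at an existing local directory.
        if isinstance(value, (str, Path)) and Path(value).is_dir():
            key = f"xcom/{uuid.uuid4()}.zip"
            with tempfile.TemporaryDirectory() as tmp:
                archive = shutil.make_archive(f"{tmp}/dir", "zip", root_dir=value)
                S3Hook().load_file(archive, key=key, bucket_name=S3DirectoryXComBackend.BUCKET)
            value = S3DirectoryXComBackend.PREFIX + key
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith(S3DirectoryXComBackend.PREFIX):
            key = value[len(S3DirectoryXComBackend.PREFIX):]
            archive = S3Hook().download_file(key=key, bucket_name=S3DirectoryXComBackend.BUCKET)
            target = tempfile.mkdtemp()
            shutil.unpack_archive(archive, target, format="zip")
            value = target  # downstream tasks receive a local directory path
        return value
```

You would then point `AIRFLOW__CORE__XCOM_BACKEND` at a class like this, as in the Astronomer example.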
> One thing I'm unclear about (and I haven't had time to check the code) is how exactly the files are copied to S3. E.g., is it a file-by-file copy only, or does it empty the directory first and then copy over the newly generated files?
We do a file-by-file copy, replacing files in case of conflict. That's relatively safe if you have S3 versioning enabled, but not so much if you don't, so I can consider making the replace optional and ignoring files that already exist in S3.
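As a rough sketch of what the optional behaviour could look like (a placeholder helper, not the operator's actual push code):

```python
# Sketch of a push loop with an optional replace flag. When replace is False,
# files already present in S3 are left untouched.
from pathlib import Path

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def push_project(project_dir: str, bucket: str, prefix: str, replace: bool = True) -> None:
    hook = S3Hook()
    for path in Path(project_dir).rglob("*"):
        if not path.is_file():
            continue
        key = f"{prefix}/{path.relative_to(project_dir)}"
        if not replace and hook.check_for_key(key, bucket_name=bucket):
            continue  # keep the existing object
        hook.load_file(str(path), key=key, bucket_name=bucket, replace=True)
```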
This issue has been open for quite a while and there have been no new questions.
I've taken some feedback to produce more unit testing to ensure the project's state is maintained when running `DbtDepsOperator`, and have also gone with the idea of supporting optional replacing (by adding a new parameter). PR #31 will be closing this issue as it implements said testing and feature.
As also discussed in this issue, we may later add support for custom XCom backends, but first I'll need to look into it a bit more (PRs and new issues are very welcome!).
To avoid de-railing the issue too much, feel free to open a new issue if you have any more questions @samLozier. Thanks for your input!
@tomasfarias I'm playing around with trying to use the dbt_deps operator to install packages during an MWAA run. So far I've been able to get the deps operator to run correctly, but the downstream task, `dbt_run`, will fail because it cannot find the packages that were just installed. Do I need to hand off the `dbt_packages` folder via XCom somehow?
I'm happy to add some documentation for the next person, I'm just hoping to get pointed in the right direction.
Thanks again for making this library!