dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Associate each model in dbt to its own `op`, rather than running the whole project in a single op #12070

Open nsfinkelstein opened 1 year ago

nsfinkelstein commented 1 year ago

What's the use case?

The current implementation of the dagster / dbt integration associates the full dbt project with a single op. This creates a bottleneck in the graph: assets whose dependencies include dbt models can't be run as soon as those dependencies are met. Instead, they can only be run once the whole dbt run is finished.

For example, consider the asset graph:

fast_dbt_model -> slow_dagster_asset

slow_dbt_model -> fast_dagster_asset

Ideally we'd want the slow dagster asset to run as soon as the fast dbt model is done, but under the current implementation it has to wait until the slow dbt model is done as well.
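
For concreteness, here is a minimal sketch of how a graph like this might be wired up with dagster-dbt; the project path and model names are hypothetical, and the dbt models are assumed to be loaded as assets from an existing project:

```python
# Hypothetical wiring of the example graph above. The dbt project path and
# model names are assumptions, not part of the original report.
from dagster import AssetIn, asset
from dagster_dbt import load_assets_from_dbt_project

# Loads fast_dbt_model and slow_dbt_model (among others) as a single
# multi-asset, which is currently executed by one op.
dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt_project")

@asset(ins={"fast_dbt_model": AssetIn(key="fast_dbt_model")})
def slow_dagster_asset(fast_dbt_model):
    # Long-running Python work that only needs fast_dbt_model, but today it
    # cannot start until the single dbt op (the whole dbt run) has finished.
    return fast_dbt_model

@asset(ins={"slow_dbt_model": AssetIn(key="slow_dbt_model")})
def fast_dagster_asset(slow_dbt_model):
    # Quick Python work downstream of slow_dbt_model.
    return slow_dbt_model
```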

In addition, as I understand it, the current implementation can't handle situations where a dagster asset is both downstream and upstream of dbt models from the same project. Because the dbt project is run by a single op, this would create a cycle in the op graph. This is a use case that has come up in our pipeline.

Ideas of implementation

Instead of associating the full dbt project with a single op that invokes `dbt run`, each model can be associated with its own op that invokes `dbt run --select <model_name>`, potentially using dbt graph operators in certain cases.
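
A rough, hypothetical sketch of that idea (not the current integration), with each model getting its own op that shells out to dbt:

```python
# Hypothetical sketch of the proposed approach, not the existing dagster-dbt
# integration. Model names and the project path are assumptions; dependency
# wiring between the generated ops is omitted for brevity.
import subprocess

from dagster import job, op

MODEL_NAMES = ["fast_dbt_model", "slow_dbt_model"]  # e.g. parsed from dbt's manifest.json

def make_dbt_model_op(model_name: str):
    @op(name=f"run_{model_name}")
    def _run_model() -> None:
        # Each op runs exactly one model.
        subprocess.run(
            ["dbt", "run", "--select", model_name, "--project-dir", "path/to/dbt_project"],
            check=True,
        )
    return _run_model

@job
def per_model_dbt_job():
    for model_name in MODEL_NAMES:
        make_dbt_model_op(model_name)()
```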

Additional information

I'd be open to submitting a pull request, but would like to learn more about how the current design was decided, and would appreciate pointers about how to implement this feature efficiently.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

jamiedemaria commented 1 year ago

Thanks for the request! Tagging @OwenKephart and @rexledesma for visibility

OwenKephart commented 1 year ago

Hi @nsfinkelstein ! At a high level, some quick reasoning for keeping things together in a single op:

  1. Per-model overhead -- there is inherent overhead involved in creating a new op. At the minimum, this involves forking a new process (with the multiprocess_executor), but other executors can incur even larger overhead per-step.
  2. dbt domain expertise -- dbt is able to make more informed decisions on how to efficiently execute nodes in the dbt graph (based on max parallelism of the underlying database) than Dagster is.
  3. Conceptual simplicity -- (this one is more debatable) but I think it's generally a bit easier to understand what's going on in your dbt execution if you have a single step you can point to that contains all of the relevant dbt logs. This way, dagster isn't doing anything particularly different from what you would do to run your dbt models locally, you can just copy/paste the exact command that Dagster is running and things will work as expected.

Regarding a model that is both upstream and downstream of a model in the dbt graph, this issue is actually handled automatically for you. When Dagster notices a cycle in the underlying operational graph, but no cycle exists in the asset graph, we attempt to resolve this cycle by breaking up subsettable multi-assets (such as the dbt multi asset) into separate operations to resolve the op cycle (generally trying to keep larger chunks together where possible).

The initial issue, of downstream operations not starting until ALL outputs have been emitted from a step, is a bit trickier, and I have two thoughts on this (echoing a similar comment from slack):

  1. We may want to change this behavior in the core code (I don't have a sense of how difficult this would be, but it does seem logical that downstream steps where all the upstream outputs have been emitted should be able to be kicked off)
  2. It may be possible to sidestep this problem with the help of declarative scheduling, which doesn't adhere to strict pipeline structures, and instead waits for upstream assets to be materialized before kicking off runs of downstream assets. This would result in the dbt assets, fast asset, and slow asset being materialized by separate runs (at least in some cases), meaning that this in-run behavior would not be relevant.

I do think there are potential situations where splitting each model into its own op would make sense, but I think there are significant drawbacks, so I'd want to explore other options before recommending that in most cases.
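
Regarding the second idea, a hedged sketch of what the declarative-scheduling approach could look like on the downstream asset, using the AutoMaterializePolicy API from later Dagster releases (at the time of this comment the equivalent mechanism was the asset reconciliation sensor); asset and model names are the hypothetical ones from the example above:

```python
# Sketch only: with an eager auto-materialize policy, slow_dagster_asset is
# kicked off in its own run once fast_dbt_model has been materialized, rather
# than waiting on the rest of the dbt run inside a single job.
from dagster import AssetIn, AutoMaterializePolicy, asset

@asset(
    ins={"fast_dbt_model": AssetIn(key="fast_dbt_model")},
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def slow_dagster_asset(fast_dbt_model):
    return fast_dbt_model
```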

nsfinkelstein commented 1 year ago

Thanks for the detailed reply, and for the explanation of the downsides to treating each dbt model as its own op. I completely understand why you would want to avoid this approach.

Does dagster-dbt have a way of registering when each dbt model is completed (e.g. by looking at the dbt logs)? If so, it seems that it should be possible to trigger downstream runs after only the appropriate dbt models have finished building. I understand this would require a change to core. Is this what you had in mind? I'd be very happy to take a look at putting in a pull request along these lines, if you have any pointers of where to start looking. Thanks.

OwenKephart commented 1 year ago

Hi @nsfinkelstein ! We actually just merged in a change along those lines, allowing us to emit AssetMaterialization events as dbt creates the tables (rather than waiting until the end of execution). This behavior will only exist for dbt versions 1.4+ (as they added richer logging recently!)
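
Not the actual dagster-dbt implementation, but an illustrative sketch of the underlying idea: with dbt 1.4+ structured (JSON) logging, per-model completion events can be parsed from the dbt process output as it streams, instead of waiting for the whole invocation to finish. The event name and payload layout below are assumptions and can differ across dbt versions:

```python
# Illustrative only. Streams `dbt run` JSON logs and reports each model as it
# finishes; "NodeFinished" and the node_info payload are dbt-version-dependent
# assumptions, and this is roughly the point where a materialization event
# would be emitted.
import json
import subprocess

proc = subprocess.Popen(
    ["dbt", "--log-format", "json", "run", "--project-dir", "path/to/dbt_project"],
    stdout=subprocess.PIPE,
    text=True,
)
for line in proc.stdout:
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        continue  # ignore any non-JSON lines
    if event.get("info", {}).get("name") == "NodeFinished":
        node_info = event.get("data", {}).get("node_info", {})
        print("model finished:", node_info.get("unique_id"))
proc.wait()
```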

nsfinkelstein commented 1 year ago

@OwenKephart Sounds great, thanks! That resolves the main feature I was asking about. Out of curiosity, can you share a link to the pull request you mentioned?

OwenKephart commented 1 year ago

Ah forgot to link the issue: https://github.com/dagster-io/dagster/issues/11752

owensilk commented 1 month ago

We would like to compute an asset within our dbt project using a Python application. Is there a way to split all the models into separate assets so we can create a dagster asset in between?

OwenKephart commented 3 weeks ago

hi @owensilk -- if you define a python asset that has both upstream and downstream dependencies that are part of the same dbt multi-asset, then dagster will automatically break the dbt asset up into two separate operations to make the underlying compute graph work.
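
For example, a minimal sketch with hypothetical model names, assuming the downstream dbt model declares the Python asset's table as a dbt source whose asset key matches:

```python
# Hypothetical sketch: python_enrichment reads from the dbt model
# staging_model and is itself read by the dbt model reporting_model (declared
# as a source in the dbt project). Dagster splits the dbt multi-asset into two
# steps so the op graph has no cycle.
from dagster import AssetIn, asset
from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt_project")

@asset(ins={"staging_model": AssetIn(key="staging_model")})
def python_enrichment(staging_model):
    # Arbitrary Python computation sitting between the two dbt models.
    return staging_model
```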

however, if you still want to split up your dbt models into individual assets for other reasons, there are no built-in utilities to do this.

dluo-sig commented 3 weeks ago

I stumbled upon this while researching a separate issue. The problem I'm running into is that if the prerequisites of any of the dbt assets fail, then none of the dbt assets will run. If these were separate ops, then I believe dagster would figure that out and only run the ops that don't have failed prerequisites. Can there be a setting to configure this behavior, or is there some other way to work around this? Maybe I'm missing something here about how we would schedule downstream dependencies.

OwenKephart commented 1 week ago

Hi @dluo-sig -- currently a failure in any op will cause the entire run to fail (although in-progress ops may be allowed to complete in some cases), so I would consider the issue you're describing more general than what's being described here. Unfortunately, there's no configuration currently available to change this behavior, although it's something we've gotten frequent requests for and is on our radar to enable.

dluo-sig commented 1 week ago

From what we've observed, other ops in the job will still run as long as their dependencies haven't failed. So if there were separate ops instead of one op, I believe that would solve the problem. I don't think it makes sense to force-run downstream dependents if something upstream fails, so that doesn't sound right either. For example, if a data check fails, we wouldn't want to run downstream models that depend on the failed validations, but if other sources are good, the models that depend on them should still run.