dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

in-process execution API that caches materialized assets #13333

Open issue, opened by sryza 1 year ago

sryza commented 1 year ago

Relevant requests

zach-jablons-hinge commented 1 year ago

Hate to bother, but has there been any update on this? I'm approaching a point in my work where I need to decide which DAG executor we're using.

sryza commented 1 year ago

@zach-jablons-hinge if the following worked, would that satisfy what you're looking for?

You've got a .py file with these contents:

from dagster import asset, materialize, DagsterInstance, find_stale_and_missing_assets

@asset(code_version="a")
def asset1():
    ...

@asset(code_version="1")
def asset2():
    ...

if __name__ == "__main__":
    with DagsterInstance.get() as instance:
        materialize(
            [asset1, asset2],
            selection=find_stale_and_missing_assets(instance, [asset1, asset2]),
            instance=instance,
        )

Behavior:

sryza commented 1 year ago

@smackesey - how difficult do you think it would be to implement what's above?

zach-jablons-hinge commented 1 year ago

This largely would work, but I want to clarify a few notes:

If the above two conditions are met, I think this would go a long way towards meeting my needs!

sryza commented 1 year ago

I'd of course want to bust the cache not only when the code version changes, but also when the inputs change (either upstream assets or primitive types passed in). Would this support that? I imagine it would, based on how Dagster processes this generally, but I'd want to make sure.

For upstream assets, that would be possible with observable source assets. For primitive types, you would need to pass those in with config. Our versioning system doesn't currently incorporate config, but we plan to add it.
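To illustrate what "incorporating config into versioning" could mean, here is a hypothetical plain-Python sketch, not Dagster's behavior (which, per the comment above, did not include config at the time). It hashes the code version, a canonical serialization of the config, and the upstream data versions together, so a change to any of them yields a new key. `version_key` and the `n` config field are illustrative names only.

```python
import hashlib
import json


def version_key(code_version, config, upstream_versions):
    """Hash everything that should invalidate a cached asset when it changes.

    Hypothetical helper: it only illustrates the idea of folding config
    into a version; it is not part of Dagster.
    """
    payload = json.dumps(
        {
            "code_version": code_version,
            "config": config,
            "upstream": upstream_versions,
        },
        sort_keys=True,  # canonical key order, so equal dicts hash equally
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


base = version_key("a", {"n": 4}, {})
print(base == version_key("a", {"n": 4}, {}))  # True: nothing changed
print(base == version_key("a", {"n": 7}, {}))  # False: config changed
```

Sorting keys matters because two config dicts that differ only in insertion order should not bust the cache.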

The instance object gives me a bit of pause, but just to be clear, there's no notion of a permanent instance object that this relies on - i.e., once the instance is destroyed, the caching behavior still works (ideally, based on a cache path specified?)

The instance object is a pointer to persistent storage that holds metadata about which materializations occurred. By default, it's a SQLite database in the directory that the DAGSTER_HOME environment variable points to, but it can also be backed by Postgres or MySQL. This metadata is needed to track which code versions and upstream data versions were used when materializing each asset, so that Dagster can determine whether the asset is stale.
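A minimal plain-Python sketch of that staleness check, assuming the stored metadata records, for each asset, the code version it was materialized with and the data versions of its upstream assets at that time. This is not Dagster's implementation; `Node`, `Record`, and `find_stale_and_missing` are hypothetical names, and the usage example makes asset2 depend on asset1 for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for an asset definition."""
    name: str
    code_version: str
    deps: Tuple[str, ...] = ()


@dataclass
class Record:
    """Hypothetical metadata stored for an asset's last materialization."""
    code_version: str
    data_version: str
    upstream_data_versions: Dict[str, str] = field(default_factory=dict)


def find_stale_and_missing(nodes: List[Node], last_runs: Dict[str, Record]) -> Set[str]:
    """Return the names of assets that should be (re)materialized.

    Assumes `nodes` is in topological order (upstream assets first).
    """
    to_run: Set[str] = set()

    def current_data_version(name: str) -> Optional[str]:
        record = last_runs.get(name)
        return record.data_version if record is not None else None

    for node in nodes:
        record = last_runs.get(node.name)
        if record is None:
            to_run.add(node.name)  # missing: never materialized
        elif record.code_version != node.code_version:
            to_run.add(node.name)  # code changed since the last materialization
        elif any(dep in to_run for dep in node.deps):
            to_run.add(node.name)  # an upstream asset is about to be recomputed
        elif any(
            current_data_version(dep) != record.upstream_data_versions.get(dep)
            for dep in node.deps
        ):
            to_run.add(node.name)  # upstream data changed since the last materialization
    return to_run


asset1 = Node("asset1", code_version="a")
asset2 = Node("asset2", code_version="1", deps=("asset1",))
last_runs = {
    "asset1": Record("a", data_version="v1"),
    "asset2": Record("1", data_version="w1", upstream_data_versions={"asset1": "v1"}),
}
print(find_stale_and_missing([asset1, asset2], last_runs))  # set()
# Bumping asset1's code version makes asset1 stale, and asset2 along with it.
print(sorted(find_stale_and_missing([Node("asset1", "b"), asset2], last_runs)))  # ['asset1', 'asset2']
```

Every check reads stored records, which is why the sketch, like the real feature, needs persistent metadata to outlive any single process.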

smackesey commented 1 year ago

how difficult do you think it would be to implement what's above?

Not hard, but I'd prefer adding methods to AssetSelection over a new top-level function, something like:

with DagsterInstance.get() as instance:
    materialize(
        [asset1, asset2],
        selection=AssetSelection.new_upstream_data() | AssetSelection.new_upstream_code() | AssetSelection.missing(),
        instance=instance,
    )

It would be nice to have one word or phrase that covers the different kinds of upstream changes, but with "stale" ruled out, what would that be? upstream_changed?
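Selections like these depend on recorded state, not only on the asset definitions. A hypothetical plain-Python sketch: each selection is a predicate over an asset name and a snapshot of per-asset state (as might be derived from the event log), and `|` and `&` combine predicates. `Selection`, `flag`, and the state keys are illustrative only; the three selection names mirror the ones proposed above.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Set

# Hypothetical per-asset state snapshot, e.g. derived from an event log.
State = Dict[str, Dict[str, bool]]


@dataclass(frozen=True)
class Selection:
    predicate: Callable[[str, State], bool]

    def __or__(self, other: "Selection") -> "Selection":
        return Selection(lambda name, st: self.predicate(name, st) or other.predicate(name, st))

    def __and__(self, other: "Selection") -> "Selection":
        return Selection(lambda name, st: self.predicate(name, st) and other.predicate(name, st))

    def resolve(self, names: Iterable[str], state: State) -> Set[str]:
        # Needs both the asset names (code) and the recorded state (deployment).
        return {n for n in names if self.predicate(n, state)}


def flag(key: str) -> Selection:
    """Select assets whose state has `key` set to True."""
    return Selection(lambda name, st: st.get(name, {}).get(key, False))


new_upstream_data = flag("upstream_data_changed")
new_upstream_code = flag("upstream_code_changed")
missing = Selection(lambda name, st: not st.get(name, {}).get("materialized", False))

selection = new_upstream_data | new_upstream_code | missing
state = {
    "asset1": {"materialized": True},
    "asset2": {"materialized": True, "upstream_code_changed": True},
    "asset3": {},
}
print(sorted(selection.resolve(["asset1", "asset2", "asset3"], state)))  # ['asset2', 'asset3']
```

Because `missing` and the `new_upstream_*` predicates read recorded state, `resolve` has to be handed that state; the definitions alone are not enough.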

zach-jablons-hinge commented 1 year ago

For upstream assets, that would be possible with observable source assets. For primitive types, you would need to pass those in with config. Our versioning system doesn't currently incorporate config, but we plan to add it.

I'm not super clear about the "observable source assets" part of this - if in your example I changed asset1 to be def asset1(non_asset=4), would it re-run asset1? (I would expect so)

What if I then wanted to provide non_asset = 7 when I materialized it? (I would expect asset1 and asset2 to re-run, although it's unclear to me whether this would have to go through config.)

I can write out a few sort of 'test cases' of caching behavior I'm seeing in another option that would be great to see in Dagster, if that would be helpful.

sryza commented 1 year ago

I can write out a few sort of 'test cases' of caching behavior I'm seeing in another option that would be great to see in Dagster, if that would be helpful.

That would definitely be helpful. I'm also curious about what the other option is that you're considering.

sryza commented 1 year ago

Not hard, but I'd prefer adding methods to AssetSelection over a new top-level function, something like:

This is probably the right way to do things in the long run, but this is a change that I wouldn't want to rush, because it's much more invasive than a new top-level function: it means that AssetSelection.resolve needs to take a DagsterInstance, and, more abstractly, the asset selection becomes relative to a particular deployment at a particular time.

smackesey commented 1 year ago

the asset selection becomes relative to a particular deployment at a particular time.

Isn't that already the case? e.g. if I change the dependencies on an asset then AssetSelection.upstream() will resolve differently.

zach-jablons-hinge commented 1 year ago

I'm also curious about what the other option is that you're considering.

There were a few (e.g. Prefect, Metaflow, Argo, SageMaker Pipelines), but the behavior I'm really looking for is from something I wrote a while back that I'm not planning/hoping to re-use.

That would definitely be helpful.

I took some of the test cases I had for the other option and adapted them to maybe make more sense in a Dagster context: https://gist.github.com/zach-jablons-hinge/08e3c954fcc2cf482e7a1e47ea782564

sryza commented 1 year ago

Isn't that already the case? e.g. if I change the dependencies on an asset then AssetSelection.upstream() will resolve differently.

I'm distinguishing between the "deployment" and the "code location". The code location is just the Python package that's importable and contains all the definitions. The deployment includes the event log. A couple of reasons this distinction matters:

sryza commented 1 year ago

@zach-jablons-hinge - here's some code that accomplishes much of what's discussed above:

from typing import Sequence, Union

from dagster import (
    asset,
    DagsterInstance,
    AssetsDefinition,
    SourceAsset,
    materialize,
    load_assets_from_current_module,
    AssetSelection,
)
# Underscore-prefixed dagster._core modules are internal and may change between releases.
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.data_version import CachingStaleStatusResolver, StaleStatus

@asset(code_version="a")
def code_versioned_asset_1():
    ...

@asset(code_version="1")
def code_versioned_asset_2(code_versioned_asset_1):
    ...

def materialize_stale_and_missing(
    assets: Sequence[Union[AssetsDefinition, SourceAsset]], instance: DagsterInstance
):
    asset_graph = AssetGraph.from_assets(assets)
    staleness_resolver = CachingStaleStatusResolver(instance, asset_graph)
    stale_and_missing_selection = AssetSelection.assets(
        *(
            asset
            for asset in assets
            if isinstance(asset, AssetsDefinition)
            and any(staleness_resolver.get_status(key) != StaleStatus.FRESH for key in asset.keys)
        )
    )

    return materialize(
        assets,
        selection=stale_and_missing_selection,
        instance=instance,
    )

if __name__ == "__main__":
    assets = load_assets_from_current_module()

    with DagsterInstance.get() as instance:
        materialize_stale_and_missing(assets, instance)

We ultimately want this to be part of Dagster's public API so you don't need to include this boilerplate, but finalizing user-facing APIs can take some time because we want to make sure the names are consistent with other parts of our naming scheme, etc. So I wanted to share this with you to unblock you.

Happy to find a time to chat through this on Zoom if it would be helpful.

zach-jablons-hinge commented 1 year ago

Hey thanks - I'll try this out (and can always wrap it up into something nice myself in the meantime) and let you know if it passes the tests I have in mind.

zach-jablons-hinge commented 1 year ago

Hey @sryza, so I've been trying this out and am hitting a few snags that might be easy to resolve, but I'm not familiar enough w/Dagster to do so:

Configuration

At a high level, how would one go about passing in something like a configuration s.t. the caching behavior is preserved? Looking through the documentation, it's not clear to me how one would pass it in through e.g. run_config. If I try something simple like

@asset
def code_versioned_asset_1(a=4):
  return 1 + a

I get dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["a"]' for asset '["code_versioned_asset_1"]' is not produced by any of the provided asset ops and is not one of the provided sources. What's the best way to pass something like that in?

The way I've done it in the other framework is to pass in a nested dictionary that specifies, for each named node, the arguments (as seen in the test cases I provided) - is it possible to do something like that with Dagster?

For the items below, I've set things up with this script.

Getting results from cached nodes

I've found that if I run

dagster_test.py code_versioned_asset_1 twice, the first run gives the output I expect, but on the second run, when the asset is cached, output_for_node no longer returns it, and I get dagster._check.CheckError: Invariant failed. Description: __ephemeral_asset_job__ has no op named code_versioned_asset_1.

Is there a way to get results from both cached and not cached nodes? One could imagine iterating on a function that requires the outputs from 2 nodes (e.g., working on something that runs inference for a model on a given input) and one of the nodes is cached.

Is there a way to maybe get the value of a stored asset? Maybe this is obvious from the documentation, but I haven't found it.
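The error fits the selection logic: a fresh asset is left out of the selection, so the ephemeral job has no op for it. One general pattern is to read every requested value through the same store. Recompute only the assets that need it, persist their outputs, then load all requested outputs from storage whether or not they were recomputed. The sketch below is plain Python, not Dagster's API. `PickleStore` and `run_and_collect` are hypothetical names, and the assets are treated as independent for brevity. In Dagster, the IO manager plays the storage role.

```python
import os
import pickle
import tempfile


class PickleStore:
    """Hypothetical value store, similar in role to an IO manager."""

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _path(self, name):
        return os.path.join(self.base_dir, f"{name}.pkl")

    def save(self, name, value):
        with open(self._path(name), "wb") as f:
            pickle.dump(value, f)

    def load(self, name):
        with open(self._path(name), "rb") as f:
            return pickle.load(f)


def run_and_collect(compute_fns, to_run, wanted, store):
    """Recompute only `to_run`, but return values for every name in `wanted`."""
    for name in to_run:
        store.save(name, compute_fns[name]())
    # Cached and freshly computed values are read back the same way.
    return {name: store.load(name) for name in wanted}


calls = []


def compute_asset_1():
    calls.append("computed")  # count how often the asset is actually computed
    return 42


fns = {"code_versioned_asset_1": compute_asset_1}
store = PickleStore(tempfile.mkdtemp())
first = run_and_collect(fns, {"code_versioned_asset_1"}, ["code_versioned_asset_1"], store)
second = run_and_collect(fns, set(), ["code_versioned_asset_1"], store)  # cached run
print(first, second, len(calls))  # {'code_versioned_asset_1': 42} {'code_versioned_asset_1': 42} 1
```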

Running only upstream assets

This might be a bit simpler, but I've tried (and you can see the commented out code there) to have stale_and_missing_selection come from only the upstream assets for a node. One could imagine a case where you're iterating on a specific feature set, but don't yet want to run the rest of your DAG - in this situation, ensuring that you're not just re-running everything (including downstream dependencies) can save a lot of time.

To do this, I tried to use the upstream Asset selection, but the simple issue is that integrating that into stale_and_missing doesn't work as I can't iterate over it. Maybe there's a more correct way to solve this, but the simplest option of turning that KeysAssetSelection into something iterable would solve for this.

I'm happy to take some time to go over this stuff on a Zoom call, I'm fairly free tomorrow (4/21) or next week.

sryza commented 1 year ago

At a high level, how would one go about passing in something like a configuration s.t. the caching behavior is preserved?

Right now, config is not incorporated into data versions. Here's an issue to track adding this: https://github.com/dagster-io/dagster/issues/13826. We might be able to add this soon.

In terms of how to specify config in general, here are docs: https://docs.dagster.io/concepts/configuration/config-schema#using-software-defined-assets.

Is there a way to maybe get the value of a stored asset? Maybe this is obvious from the documentation, but I haven't found it.

Would this work for you? https://docs.dagster.io/concepts/assets/software-defined-assets#loading-asset-values-outside-of-dagster-runs

This might be a bit simpler, but I've tried (and you can see the commented out code there) to have stale_and_missing_selection come from only the upstream assets for a node.

You can do something like:

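# upstream_assets: an upstream selection, e.g. AssetSelection.keys("code_versioned_asset_2").upstream()
final_selection = stale_and_missing_selection & upstream_assets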
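Conceptually, `&` intersects two resolved sets of asset keys. Here is a plain-Python sketch of that, assuming the graph is available as a hypothetical mapping from each asset to its direct upstream dependencies. The graph and asset names are made up for illustration.

```python
from collections import deque


def upstream_closure(deps, target):
    """Return `target` plus every asset it transitively depends on."""
    seen = {target}
    queue = deque([target])
    while queue:
        for parent in deps.get(queue.popleft(), ()):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return seen


# Hypothetical graph: raw -> features -> model -> report
deps = {"features": ["raw"], "model": ["features"], "report": ["model"]}
stale_and_missing = {"features", "model", "report"}

final_selection = stale_and_missing & upstream_closure(deps, "model")
print(sorted(final_selection))  # ['features', 'model']
```

The downstream `report` asset is excluded even though it is stale, which matches the goal of iterating on one feature set without running the rest of the DAG. Because the result is a plain set, it can also be iterated over.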

I'm happy to take some time to go over this stuff on a Zoom call, I'm fairly free tomorrow (4/21) or next week.

I'll ping you on Slack about finding a time.

zach-jablons-hinge commented 1 year ago

Alright, I've addressed the last two items in here, and except for the configuration aspect, this is working pretty well!