dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Ability to create assets that can only be materialized once #13336

Open TimoVink opened 1 year ago

TimoVink commented 1 year ago

What's the use case?

TLDR: I propose adding a way to mark an asset (or partition of an asset) as non-re-materializable. i.e., it can be materialized once, but never updated afterwards.


Assets are a wonderful way to model your data in these cases:

  1. Your materialization function is pure (in the no-side-effects sense of the word) so running it multiple times with the same inputs and code produces the same output. Example: topstories_word_cloud from the Dagster tutorial.
  2. Your materialization function is not pure, but you only care about the present (happy to overwrite outputs from past materializations). Example: topstory_ids from the Dagster tutorial.

Now consider the following requirement, which is a slight twist on that tutorial:

I want to collect the top Hacker News stories daily, and then produce a word cloud using the content of all top stories ever collected.

Keep in mind here that the Hacker News API endpoints used in the tutorial only provide the current top stories, not historical top stories. In this case the function (grabbing top stories) is not pure, since the API will return different data as time goes on, but you also want to keep all the top stories you've collected in the past.

How do we model this in dagster? Two ways come to mind:

Model non-pure function as @asset naively

There's nothing stopping you from using non-pure code for an @asset and using partitions to represent the "batches" of data you collect daily (especially now that dynamic partitions are available 🎉), but the risk here is that if you were to ever accidentally re-materialize one of those partitions, you would overwrite that data (and it may not be obvious that you've done so).
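For concreteness, here is a minimal sketch of what that naive modelling might look like, assuming dagster's dynamic partitions API; fetch_current_top_stories is a hypothetical stand-in for the Hacker News API call:

from dagster import DynamicPartitionsDefinition, asset

daily_batches = DynamicPartitionsDefinition(name="daily_batches")

@asset(partitions_def=daily_batches)
def top_stories(context):
    # Non-pure: this returns whatever is on the front page *right now*, so
    # accidentally re-materializing an old partition silently overwrites that
    # day's snapshot with today's data.
    return fetch_current_top_stories()  # hypothetical helper wrapping the HN API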

Model Non-pure Function as @job

Collect the raw data using a @job, write the output to somewhere like S3 (one file or directory per run of the job), and then use a sensor + a dynamically partitioned @asset which simply loads the data from S3. This way running the @job only ever produces new data, and all your @assets are pure functions (in the sense that the same S3 files as input produces the same outputs).
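Roughly, and assuming a recent dagster version with SensorResult and dynamic partition requests, this variant could look something like the sketch below (the @job that writes to S3 is omitted; load_batch_from_s3 and list_new_s3_batches are hypothetical helpers):

from dagster import (
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    define_asset_job,
    sensor,
)

raw_batches = DynamicPartitionsDefinition(name="raw_batches")

@asset(partitions_def=raw_batches)
def top_stories_snapshot(context):
    # Pure: the same S3 object always produces the same value.
    return load_batch_from_s3(context.partition_key)  # hypothetical helper

snapshot_job = define_asset_job("snapshot_job", selection=[top_stories_snapshot])

@sensor(job=snapshot_job)
def new_batch_sensor(context):
    # Hypothetical helper returning S3 keys that haven't been registered yet.
    new_keys = list_new_s3_batches()
    return SensorResult(
        dynamic_partitions_requests=[raw_batches.build_add_request(new_keys)],
        run_requests=[RunRequest(partition_key=key) for key in new_keys],
    )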

This works, but is somewhat undesirable because now there is no single place in dagit where you can see the end-to-end health of your "pipeline": the job/runs views and the asset graph each tell only part of the story, and the link between them is implicit via S3, as opposed to explicit like the relationship between assets.

Ideas for implementation

Ideally it'd be possible to mark an asset, or partitions of an asset, as "non-re-materializable". Such assets can be materialized once, but never materialized again.

This way I could create a safe version of the "naively use assets" approach above. I could mark a partitioned asset as non-re-materializable and then use a schedule or other mechanism to create a new partition (+ kick off its materialization) any time I want to add a new batch of data.
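As a minimal sketch of that scheduling mechanism (leaving aside the proposed non-re-materializable marker, which doesn't exist today), and assuming a recent dagster version plus a dynamically partitioned asset like top_stories from the earlier sketch:

from dagster import DynamicPartitionsDefinition, RunRequest, define_asset_job, schedule

daily_batches = DynamicPartitionsDefinition(name="daily_batches")
collect_job = define_asset_job("collect_job", selection=[top_stories])

@schedule(cron_schedule="0 6 * * *", job=collect_job)
def daily_batch_schedule(context):
    key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    # Register today's batch as a brand-new partition, then materialize only it.
    context.instance.add_dynamic_partitions("daily_batches", [key])
    return RunRequest(partition_key=key)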

Nice features of this approach are that:

Additional information

Examples of real-world scenarios where this would be useful

In any of these cases, if your goal is to build up a larger dataset over time, this feature would be useful.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

TimoVink commented 1 year ago

I hope I conveyed the type of pipeline I'm looking to model in dagster, and some of the issues you might face today. Of course there are many potential ways to address this, and which one is "correct" mostly depends on how much you believe this should be supported entirely out of the box by dagster, vs. something users need to build themselves with dagster primitives.

Alternative solutions that would also address my use case:

New Type of Asset

This could be modelled as an entirely new type of asset, which has something like the following semantics:

The above feels much more complex. Also, I'm brand new to dagster so I could be way off, but my gut feeling is that this doesn't fit with how assets and their partitions currently work (e.g. does it even make sense to talk about "materializing" a partitioned asset as a whole? Or does it only make sense to talk about "materializing a partition of an asset" in this case?).

UI Changes Only

We could also accomplish a lot of this through UI changes only: e.g., if I had a way to tell dagster that my @job and @asset were linked (maybe through an annotation on the asset?), such that I can see the status of (the most recent run of) my job in the asset graph, with a link to the run's logs, that would work too.

sryza commented 1 year ago

Thanks for the suggestion @TimoVink - I agree that we should have better support for these kinds of cases. We'll consider this in our next round of prioritization.

Here's an issue I filed a little while ago that I think might have some overlap: https://github.com/dagster-io/dagster/issues/12253.

TimoVink commented 1 year ago

Yes, I think we're describing the same thing. "collection of snapshots" is a great way to describe it!

I hacked together something that basically does what I need. Emphasis on hacked - I'm sure there are much better ways to do this. Sharing in case someone else finds this useful.

The idea is to create a decorator which behaves like @asset, but additionally does a check before running the asset function to see if the asset has been materialized before. If it has, it loads the previously materialized value instead.

from dagster import Definitions, asset
import functools

def _is_initial_materialization(asset, context):
    # Ask the instance how many times each partition of this asset has been
    # materialized; the current partition is "initial" if its count is zero.
    materialization_counts = context.instance.get_materialization_count_by_partition([asset.asset_key])
    materialization_count = materialization_counts.get(asset.asset_key, {}).get(context.partition_key, 0)
    return materialization_count <= 0

def _try_get_materialized_value(asset, context):
    if _is_initial_materialization(asset, context):
        return (False, None)
    else:
        # Load the previously materialized value back through the run's IO manager.
        defs = Definitions(assets=[asset], resources={ 'io_manager': context.resources.io_manager })
        asset_val = defs.load_asset_value(asset_key=asset.asset_key, partition_key=context.partition_key)
        return (True, asset_val)

def materialize_once_asset(*decorator_args, **decorator_kwargs):
    def _outer(asset_func):
        @asset(*decorator_args, **decorator_kwargs)
        @functools.wraps(asset_func)
        def _inner(context):
            # If this partition was materialized before, return the stored value
            # instead of re-running the (non-pure) asset function.
            is_cached, cached_val = _try_get_materialized_value(_inner, context)
            if is_cached:
                return cached_val

            return asset_func(context)

        return _inner
    return _outer

Usage:

import uuid

from dagster import StaticPartitionsDefinition

my_partitions = StaticPartitionsDefinition(['foo', 'bar', 'baz'])

@materialize_once_asset(partitions_def=my_partitions)
def my_raw_data(context):
    return uuid.uuid4().hex

@asset
def my_processed_data(my_raw_data):
    return list(sorted(my_raw_data.values()))

If these were normal assets, you would get different UUIDs every time you rematerialized a partition of my_raw_data. But with this decorator you will always get the same 3 UUIDs (unless you wipe materializations for my_raw_data).

Limitations of this quick-and-dirty implementation: