dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Runtime / dynamic asset partitions #7943

Closed sryza closed 1 year ago

sryza commented 2 years ago

I've seen a couple cases recently where it would be helpful to have "runtime" asset partitions. I.e. a partition is added to an asset when a job runs, rather than at definition time.

What we've heard

We have discussed having a static config and generating the assets, but our customer base is not static (and in fact we actually partition again inside a particular customer).

We would effectively have to redeploy the entire repository anytime we created a new customer, which in the long run is unsustainable.

Are there plans to support dynamically partitioned assets in the future, or a way to do the following with current features? In essence, I need to process experiment results and would like to use partitioned assets for that. Every few days (at irregular intervals) an experiment result will be made available for Dagster to process, and I would like to handle each result as a separate partition. The list of partitions would have to grow dynamically and be re-fetched before kicking off runs for missing partitions.

I am facing a similar use case. We have an external service that generates datasets, and the Dagster pipeline picks up each dataset and runs it through a set of ops.

Hi, I am trying to pass input files (file keys, really) through a processing pipeline, generating the original and each intermediate step as an asset (with keys like file1_step1, file1_step2). I have an ingest job that takes the file key and applies the graph, and the steps are called correctly; I'm just not sure how to use assets in this case.

As far as I can tell, the docs treat software-defined assets as a fixed thing, with a key given by the function name -- not a template for generating dynamic assets based on input.

That's the part that kills it for me -- if I come back later and materialize one of those virtual assets, it's not possible (I think) to automatically rematerialize downstream assets that depend on it, because they're not linked -- so what's the value of having assets in the first place then?

There are a few assets that it's helpful for me to write custom logic for, but mostly it's just the same logic over and over again on a different S3 prefix. Not sure if I'm misunderstanding something fundamental, but none of the docs seem to really cover it.

The data I’m working with is sets of microscope images that are pretty diverse in their context (in terms of acquisition, subject, etc.) but for which there are also some standard algorithms that can be run over them. I’m not sure what the implications of trying to group them all together into a single “image” asset and then attempting to partition them appropriately would be for us.

Briefly, I have a pipeline for processing the files in a manifest, and the number of manifests will grow over time. Each manifest should be processed only once unless it’s updated. I’m wondering how to best handle the dynamic nature of my inputs (i.e. the file manifests).

Hello, I would appreciate it if someone could point me in the right direction here, as I am new to Dagster. I want to create a new asset (text file 2) from a source asset (text file 1). Text file 1 is stored in AWS S3, and I also want to store text file 2 in AWS S3. The files are named by a unique ID (e.g. id123456.txt), and new files show up in AWS S3 daily. The ID also shows up in a database before the file shows up in AWS S3. I would like to be able to create the new asset (text file 2) from all existing source assets (text file 1) and from any new source assets (text file 1) that show up each day. Could anyone describe to me how I should be thinking about this? In my head, I am thinking that I should start by using the IDs in the database table to define a partitioned asset?

Design / implementation

There are two flavors of this we could imagine:

The value of modeling these as partitions, vs., say, just config:

You could imagine an interaction between this and dynamic mapping - where each mapped step corresponds to one of the dynamic asset partitions.
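
For context, a minimal sketch of today's dynamic mapping (the op and dataset names are made up); the idea would be for each mapped step to line up with one runtime asset partition:

from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def discover_datasets():
    # Hypothetical dataset keys; each DynamicOutput fans out into its own mapped step.
    for key in ["dataset_a", "dataset_b"]:
        yield DynamicOutput(key, mapping_key=key)

@op
def process_dataset(dataset_key: str):
    ...

@job
def per_dataset_job():
    discover_datasets().map(process_dataset)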

We might want a special PartitionsDefinition for this. E.g.

@asset(partitions_def=RuntimePartitionsDefinition())
def my_asset():
    ...

kevinjdolan commented 2 years ago

I would like to second this. There is the abstract base class PartitionsDefinition, which can easily be extended by a user, but doing so will throw the error: dagster._core.errors.DagsterInvalidDefinitionError: Only static partition and time window partitions are currently supported.

I have a partition use-case that is essentially the cross-product of quarters and targets, and I had expected this to work, but the above error clearly does not allow for such arbitrary use-cases:

import datetime

import dagster
import pendulum

# CustomPartition, Category, DEFAULT_START_DATE, CATEGORIES, QUARTER_DELTA, and
# iterutil.list_func are defined elsewhere in our codebase.
class CustomPartitionsDefinition(dagster.PartitionsDefinition[CustomPartition]):

    def __init__(
        self,
        start_date: datetime.date = DEFAULT_START_DATE,
        categories: list[Category] = CATEGORIES,
    ):
        self.start_date = pendulum.instance(start_date, tz='utc')
        self.categories = categories

    @iterutil.list_func
    def get_partitions(
        self,
        current_time=None,
    ):
        # One partition per (category, quarter) pair
        for quarter in self.get_quarters(current_time):
            for category in self.categories:
                yield CustomPartition(
                    category=category.category,
                    year=quarter.year,
                    quarter=quarter.quarter,
                )

    @iterutil.list_func
    def get_quarters(
        self,
        current_time=None,
    ):
        # Walk quarter by quarter from the start date up to the current time
        date = self.start_date
        if not current_time:
            current_time = datetime.datetime.now()
        end = pendulum.instance(current_time, tz='utc')
        while date <= end:
            yield date
            date = date + QUARTER_DELTA

I would be happy to contribute to the project to make a fix, if I could better understand why this limitation exists in the first place. I don't see any explanation in the code.

kevinjdolan commented 2 years ago

An alternative for my particular use-case would be the ability to chain multiple partition definitions together such that all combinations are produced.

sryza commented 2 years ago

Hey @kevinjdolan - this issue might also be related to what you're looking for: https://github.com/dagster-io/dagster/issues/4591

If I could better understand why this limitation exists in the first place.

It's essentially for performance / reliability reasons in Dagit. Right now, loading pages in Dagit (almost) never requires invoking user code. Supporting arbitrary PartitionsDefinitions would mean that every time someone goes to an Asset Details page, we'd need to call out to the user code container.

As a workaround for the short term, would it make sense to define a StaticPartitionsDefinition that includes the cross-product of all the partitions?
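
A minimal sketch of that workaround, with placeholder quarter and category values standing in for the real ones:

from itertools import product

from dagster import StaticPartitionsDefinition, asset

QUARTERS = ["2021-Q1", "2021-Q2", "2021-Q3", "2021-Q4"]
CATEGORIES = ["category_a", "category_b"]

# Enumerate the cross-product up front as static partition keys.
quarter_category_partitions = StaticPartitionsDefinition(
    [f"{quarter}|{category}" for quarter, category in product(QUARTERS, CATEGORIES)]
)

@asset(partitions_def=quarter_category_partitions)
def quarterly_category_asset(context):
    quarter, category = context.partition_key.split("|")
    ...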

nickvazz commented 1 year ago

To add another use case, from a question I asked on Slack:

Is it possible to add a partition when a job is created? If I had a job with a dynamic partitioning scheme based on the sub-directories that exist in a bucket, and the job can/will create one of those sub-directories on its first run, can the job add an additional key to that dynamic partition when it runs?

askvinni commented 1 year ago

Summarizing the offline discussion I had with @sryza here to ensure it doesn't get lost after the Slack retention window:

For the specific use cases at @mushlabs, the bulk of our data comes from lab experiments; a mix of Excel and XML files from the lab systems and data manually entered by the research scientists. This means any data source will lead to a number of assets.

The partition identifier for each of these assets should always match. Technically speaking, they're currently materialized with @multi_asset, meaning all assets inside the multi_asset will have the same partition definition. Some of these assets have downstream assets that would have the same partition, but not all; that depends on how much processing is necessary before loading to the DB. From the moment they get loaded into Snowflake, they become unpartitioned dbt models.

I could see a case coming in the future for different partition definitions between related assets. Thinking of the incoming use case, I could see there being a different mapping between runtime partitions, which would likely only be decided once the downstream asset gets materialized. In practical terms, the results of an experiment/production run would pass on to the product development team, which would reuse them and possibly generate N assets with independent data.

There are two ways I could see the partition being defined:

  1. Through a sensor, schedule, or GraphQL — a file gets uploaded to shared storage, a sensor picks it up (or a request is sent to the GraphQL webhook) and extracts the partition definition from the filename to kick off a run
  2. During the asset runtime — extracted from some information inside the asset

In case the partition definition is set during asset runtime, it could be a little tricky to do backfills as there would be no way of targeting a specific partition, but I could see it being necessary to support both to some extent.

haydenflinner commented 1 year ago

I'm new to Dagster, so I apologize if this isn't relevant, but what is the difference between this ticket and dynamic_partitioned_config? From the docs:

The provided partition_fn returns a list of strings identifying the set of partitions, given an optional datetime argument (representing the current time). The list of partitions returned may change over time.
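
For reference, a minimal sketch of how dynamic_partitioned_config is typically wired up (the customer names and the op here are made up):

from datetime import datetime
from typing import Optional

from dagster import dynamic_partitioned_config, job, op

def get_customer_partitions(current_time: Optional[datetime] = None) -> list[str]:
    # In a real setup this might query a database or list bucket prefixes.
    return ["customer_a", "customer_b"]

@dynamic_partitioned_config(get_customer_partitions)
def customer_run_config(partition_key: str):
    return {"ops": {"process_customer": {"config": {"customer": partition_key}}}}

@op(config_schema={"customer": str})
def process_customer(context):
    context.log.info(f"processing {context.op_config['customer']}")

@job(config=customer_run_config)
def customer_job():
    process_customer()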

My impression from reading this was that my function is called pretty often and thus should be fast / cached by me. But that, in exchange, at real-time date T+6 I could decide that for date T there is a new partition that wasn't there the last time I processed date T, and it would show up as an unmaterialized asset, at least if I triggered a backfill of my asset for (date T, foo_partition_tag_2 *) (which, if it's not a feature, should be 😄).

Is my callback perhaps not called on some schedule / when the asset is poked, and thus changing the output for a particular input date requires a reload? The quote below makes me think so, but that was in the context of static config, I thought.

We would effectively have to redeploy the entire repository anytime we created a new customer, which in the long run is unsustainable.

Anyway, the above use cases sound very similar/identical to mine, but I see dynamic partition config has been around for a while so I must be missing something. This feature is pretty important for me, to the point that the most critical piece of my data framework will have to run outside of dagster until I find some way to make this work*, so please consider me interested/following 👍

* My partitions vary wildly in size: for my asset, customer A on date T might take 2 seconds to process, while customer Z takes 12 hours. I don't want to lump these all together into a daily partition because logically they are not one partition. Customer Z's data might not show up for a day after customer A's, but I don't want to waste work re-processing customers A-Z when Z's data dump shows up a day later than everyone else's. I might be able to model this whole thing as a daily/hourly op that determines at runtime what needs to be updated, but then I'm missing out on all of the asset + partition modelling within Dagster and using it as more of a task-runner.

Edit: This seems to have a lot of overlap with graphs / runtime graphs, even more advanced 🤯 https://github.com/dagster-io/dagster/issues/462#issuecomment-634851792

alvinyeap commented 1 year ago

I also have a use case for this. I have a pipeline that goes something like this:

Currently:

So, instead, what I’ve done is:

However, this is not ideal. It'd be great to have runtime asset partitions implemented so I can get all the benefits of software-defined assets for my pipeline.

thalesfm commented 1 year ago

Same issue here. We'd love to model our data pipelines using software-defined assets, but at the moment that's simply not possible without dynamic partitions... In our case the partitions need to be configurable by the end-user, and we can't have end-users editing our code now, can we :sweat_smile:

kevinschaich commented 1 year ago

Adding my 2¢ here – I think there is a lot we can learn from the world of distributed computing systems.

Each of these is, at its core, a set of arbitrary input files which can be collected together to form the entire contents of a dataset, where each partition/file is mapped to executors to enable operations through tasks (filtering, joining, adding columns, etc.). In the case of a single-executor cluster (your laptop), the map step still happens, but all on one machine (i.e. if you have 100 input partitions/files, it will run 100 times). Each step can repartition() its inputs, and the number of output partitions can differ from the number of input partitions. This is actually critical: if you have a step where the amount of data increases by an order of magnitude, you need to increase the number of partitions in order to make computation of a single partition feasible on a single executor.
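
A rough PySpark sketch of that repartitioning behavior (the paths and join column are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Each input file becomes one or more partitions distributed across executors.
events = spark.read.parquet("s3://bucket/events/")
users = spark.read.parquet("s3://bucket/users/")

# A join can inflate the data volume, so raise the partition count afterwards
# to keep any single partition small enough for one executor.
enriched = events.join(users, "user_id").repartition(1000)
enriched.write.parquet("s3://bucket/enriched_events/")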

I'm new to Dagster, so forgive me if I've gotten this wrong – but I think the architecture decision that's making this FR challenging to implement is that Dagster's Software-Defined Assets refer to a single file/resource:

An asset is an object in persistent storage, ... such as a file

I'm not intimately familiar with how partitions are registered and accessed within Dagster, but to build a flexible partitioning system on top of that constraint + having to know the set of partitions prior to materialization is challenging. What if we instead extend the definition of an "Asset" to:

A collection of objects in persistent storage, ... such as a set of files

The API doesn't need to change drastically here. Each asset has an AssetKey and a PartitionKey – in the case of single-file assets, PartitionKey can be set to something static such as SinglePartition or None. To illustrate how this might look for the Hello Dagster example:

import requests

from dagster import Output, asset

@asset
def hackernews_top_story_ids():
    """Get top stories from the HackerNews top stories endpoint."""
    top_story_ids = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json").json()
    for story_id in top_story_ids:
        # Note: asset_key/partition_key on Output is part of this proposal, not the current API.
        yield Output(value=story_id, asset_key='hn_stories', partition_key=story_id)

Now, instead of a single file with all IDs, the hackernews_top_story_ids asset is composed of N files, each containing the ID for one story. If we register this metadata in the Assets Catalog, any downstream dependency that later looks up hn_stories could understand _"this is a multi-partition asset, and I should also fetch any related partition_keys to get the entire contents of this asset"_. This is a pretty flexible API that can support a wide range of users' needs – your partition key could be anything you can represent in Python primitives like int or string, or a combination like users-pageNumber-date – and your number of input/output partitions for a single op can differ.

One more helpful abstraction here might be to introduce an operation= keyword argument to @asset and @op to tell that function whether it should operate on all partitions or on a single partition independently (I think the default behavior currently in Dagster for time-based partitions is analogous to map, since you can backfill only a selected set of partitions):

# Proposed API sketch; operation= does not exist today
@asset(operation='reduce')
def my_function():
    ...

# or

@asset(operation='map')
def my_function():
    ...

Now you'd have total control at each step of execution – an op can either match, increase, or decrease the number of output partitions dynamically.

Let me know what you think.

sryza commented 1 year ago

Hey @kevinschaich

I'm not intimately familiar with how partitions are registered and accessed within Dagster, but to build a flexible partitioning system on top of that constraint + having to know the set of partitions prior to materialization is challenging. What if we instead extend the definition of an "Asset" to: A collection of objects in persistent storage, ... such as a set of files

We need to communicate this better in our docs, but this idea of an asset being able to represent a collection of objects is actually where we're at currently. I recently wrote up a GitHub discussion on how to think about this: https://github.com/dagster-io/dagster/discussions/12061.

As many have pointed out in this thread, we still have the limitation that the set of partitions must be static. But we're aiming to remove this in the next few weeks by introducing MutablePartitionsDefinition: https://github.com/dagster-io/dagster/pull/12000.

Regarding your "operation" suggestion, the way we would expect this to be implemented right now in Dagster is with a PartitionMapping. E.g. right now, if a non-partitioned asset depends on a partitioned asset, the default partition mapping will have the non-partitioned asset depend on all partitions of the upstream partitioned asset.
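
A minimal sketch of what that looks like when made explicit (the asset and partition names here are made up):

from dagster import AllPartitionMapping, AssetIn, StaticPartitionsDefinition, asset

customers_partitions = StaticPartitionsDefinition(["acme", "globex"])

@asset(partitions_def=customers_partitions)
def per_customer_report(context):
    ...

# Unpartitioned downstream asset; AllPartitionMapping (also the default in this
# situation) makes it depend on every partition of the upstream asset.
@asset(ins={"per_customer_report": AssetIn(partition_mapping=AllPartitionMapping())})
def combined_report(per_customer_report):
    ...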

That all said, I don't think we ever expect to implement a MapReduce/Spark-style "shuffle" in Dagster. Dagster is an orchestrator and not a data-processing engine itself.

sryza commented 1 year ago

Closing this issue because this is now in Dagster! (With an experimental tag).

Check out the docs here: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#dynamically-partitioned-assets.

And an example here: https://github.com/dagster-io/dagster/tree/master/examples/assets_dynamic_partitions
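
For reference, a minimal sketch of the new API, adapted from the linked example (the directory path and names here are placeholders):

import os

from dagster import (
    AssetSelection,
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    define_asset_job,
    sensor,
)

# Partition keys are added at runtime instead of being fixed at definition time.
images_partitions_def = DynamicPartitionsDefinition(name="images")

@asset(partitions_def=images_partitions_def)
def images(context):
    # context.partition_key identifies which image this run materializes
    ...

images_job = define_asset_job(
    "images_job", AssetSelection.keys("images"), partitions_def=images_partitions_def
)

@sensor(job=images_job)
def image_sensor(context):
    # Placeholder discovery step: treat each file in a directory as a partition key.
    new_images = [
        img
        for img in os.listdir("/path/to/images")
        if not images_partitions_def.has_partition_key(
            img, dynamic_partitions_store=context.instance
        )
    ]
    return SensorResult(
        run_requests=[RunRequest(partition_key=img) for img in new_images],
        dynamic_partitions_requests=[
            images_partitions_def.build_add_request(new_images)
        ],
    )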