dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.53k stars 1.45k forks

FreshnessPolicy in MultiPartitioned assets #13219

Open rubenbriones opened 1 year ago

rubenbriones commented 1 year ago

What's the use case?

It would be great to attach a FreshnessPolicy to assets with a MultiPartitionsDefinition, in which one of the partitions is a TimeWindowPartition.
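To make the request concrete, here is a hypothetical sketch of what the desired API might look like. `MultiPartitionsDefinition`, `DailyPartitionsDefinition`, `StaticPartitionsDefinition`, and `FreshnessPolicy` are all real Dagster classes, but combining them this way is exactly what this issue asks for, so this does not work today; the asset and dimension names are made up:

```python
# Hypothetical, non-working sketch of the requested feature.
from dagster import (
    DailyPartitionsDefinition,
    FreshnessPolicy,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

partitions = MultiPartitionsDefinition({
    # one time-window dimension...
    "date": DailyPartitionsDefinition(start_date="2023-01-01"),
    # ...and one static dimension
    "region": StaticPartitionsDefinition(["eu", "us"]),
})

@asset(
    partitions_def=partitions,
    # Desired behavior: each daily window should be fully materialized
    # (all regions) within 60 minutes of the window closing.
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
)
def sales(): ...
```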

Ideas of implementation

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

smackesey commented 1 year ago

cc @OwenKephart

OwenKephart commented 1 year ago

To gather some more information here -- what would be your desired behavior? The most natural thing (to me, at least) would be something along the lines of only treating a given time-window partition as "filled" if all of the secondary partitions were filled.

With this definition of filled, the freshness policy would dictate that all time window partitions that are at least N minutes old (or whatever cron schedule) must be filled.

Is this essentially what you had in mind?
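The semantics above can be sketched in plain Python, independent of Dagster's internals. This is an illustrative helper, not Dagster API; the function names and the `dict`-of-sets representation are assumptions made for the example:

```python
# Sketch of the proposed semantics: a time-window partition counts as
# "filled" only when every secondary (e.g. static) partition key has been
# materialized for it.

def filled_windows(materialized, secondary_keys):
    """materialized: dict mapping time-window key -> set of materialized
    secondary keys for that window."""
    return {
        window
        for window, done in materialized.items()
        if set(secondary_keys) <= done
    }

def stale_windows(materialized, secondary_keys, windows_due):
    """Windows that are due per the freshness policy (e.g. at least N
    minutes old) but not fully filled across all secondary partitions."""
    filled = filled_windows(materialized, secondary_keys)
    return [w for w in windows_due if w not in filled]
```

So a window with any missing secondary partition would count against the policy, which matches the "all secondary partitions filled" definition above.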

rubenbriones commented 1 year ago

Hi @OwenKephart, totally agree.

mgierada commented 1 year ago

Hi @rubenbriones I have been thinking about a similar feature request, but my use case is broader. Anyway, great idea for a FR 💪

@OwenKephart Let me add a few more use cases to give you some food for thought:

Let's say I have a StaticPartitionsDefinition partitioned by a custom identifier. I would love to attach a FreshnessPolicy to an asset that uses that partitions definition, so I can define how late partitions can be based on some upstream materialisations.

Here's some pseudo code:

def custom_static_partition():
    return StaticPartitionsDefinition(array_of_ids)

@asset(partitions_def=custom_static_partition(), ins={"dep_asset": AssetIn("dep_asset")})
def some_asset(dep_asset):
    ...
    return pd.DataFrame(some_data)

reconciliation_sensor = build_asset_reconciliation_sensor(
    asset_selection=AssetSelection.all(),
    name="some_sensor",
    # whatever, let's say half a year; I want to run it infrequently
    minimum_interval_seconds=60 * 60 * 24 * 182,
    default_status=DefaultSensorStatus.RUNNING,
)

my_definitions = Definitions(
    assets=[some_asset],
    sensors=[reconciliation_sensor],
)

My ultimate goal is to get rid of schedules completely and use build_asset_reconciliation_sensor to manage my partition state. In this specific case, I am more interested in a FreshnessPolicy for all partitions of some_asset rather than for a given partition.

Let's say there are some new materialisations in dep_asset, and I want them to trigger materialisation of the some_asset partitions only if minimum_interval_seconds has elapsed. In other words, some_asset should run less frequently, and I want control over that. As a brief use case: the some_asset partitions make API calls to an external service to fetch historical data, so we don't want to run them frequently, or on every materialisation of dep_asset.

Currently, I am not able to attach a FreshnessPolicy to a StaticPartitionsDefinition, and if there is a new materialisation of dep_asset, all partitions of some_asset are rerun. If there is a better way to achieve this, please let me know.
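The throttling behavior described here (only rerun when upstream has changed AND the minimum interval has elapsed since the last materialization) can be sketched without any Dagster machinery. The function name, arguments, and the half-year constant are illustrative assumptions, not Dagster API:

```python
import time

HALF_YEAR_SECONDS = 60 * 60 * 24 * 182  # the "half a year" interval above

def should_rematerialize(last_materialized_at, upstream_changed, now=None,
                         minimum_interval_seconds=HALF_YEAR_SECONDS):
    """Illustrative throttle: trigger a rematerialization only when upstream
    has new materialisations AND the minimum interval has elapsed since the
    last run. Timestamps are epoch seconds; last_materialized_at is None if
    the asset has never been materialized."""
    now = time.time() if now is None else now
    if not upstream_changed:
        return False
    if last_materialized_at is None:
        return True  # never materialized: run it
    return (now - last_materialized_at) >= minimum_interval_seconds
```

A sensor built around this predicate would skip upstream changes that arrive inside the interval, which is the control over rerun frequency the comment asks for.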

Anyway, @OwenKephart the solution you propose also aligns with some use cases I can think of in my pipeline. I just wanted to add broader context.

aksestok commented 7 months ago

This missing feature is currently blocking us from fully embracing auto-materialization and freshness policies. It's annoying to have to implement custom asset sensors for multi-partitioned assets when there is already a great API in place!