dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Possibility to define dependency accepted lag #21151

Open · astronautas opened this issue 6 months ago

astronautas commented 6 months ago

What's the use case?

In machine learning, it's sometimes better to run a model even when upstream features are a bit late than to halt the pipeline, especially for features that don't change often, since the objective is usually to get fresh predictions to clients ASAP. How much staleness is tolerable (let's call it the accepted lag) can be estimated quantitatively with backtesting. It depends on the feature itself, though, so it is not a universal value: some features are more sensitive to lag than others.

Ideas of implementation

This is only relevant for partitioned assets.

I believe it might be possible to assemble such a partitioning relationship from low-level Dagster primitives (is there a way?), but it would be very convenient (at least from my POV) to have something first-class for this, e.g. an argument on AssetDep, or anywhere else within an asset's deps (or ins), that would allow specifying the accepted lag/tolerance of a concrete dependency relative to the current asset.

Concretely, an illustration:

    from dagster import AssetIn, DailyPartitionsDefinition, asset

    @asset(
        partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"),
        # 5 days of lag is fine: up to 5 full upstream partitions may still be
        # missing before my_model halts. The lag is measured in the current
        # asset's partition granularity, not the upstream's.
        ins={"feature_asset_1": AssetIn(..., accepted_lag=5)},  # accepted_lag is the proposed new argument
    )
    def my_model():
        ...

Additional information

Let me know if you need any additional context or clarification! Maybe it's perfectly achievable with the current Dagster APIs; if so, let me know.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

ion-elgreco commented 6 months ago

You can set the eager auto-materialize policy, which will run an asset every time a single partition or upstream asset is refreshed.
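
A minimal sketch of that suggestion (the partition definition and asset name are placeholders; auto-materialization also requires the daemon to be running):

    from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset

    @asset(
        partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"),
        # Eagerly re-materialize whenever a parent partition updates.
        auto_materialize_policy=AutoMaterializePolicy.eager(),
    )
    def my_model():
        ...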

astronautas commented 6 months ago

> You can set the eager auto-materialize policy, which will run an asset every time a single partition or upstream asset is refreshed.

That's not the issue. I want to be able to run the asset as long as upstream partitions are not too old, even if they haven't materialized yet, and to stop if they are too old.

ion-elgreco commented 6 months ago

@astronautas just set a freshness policy?
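
A minimal sketch of that suggestion (the 120-minute threshold is a placeholder); a FreshnessPolicy declares how stale an asset may be relative to its upstream data:

    from dagster import FreshnessPolicy, asset

    @asset(
        # Declare that my_model may lag its upstream data by at most 2 hours.
        freshness_policy=FreshnessPolicy(maximum_lag_minutes=120),
    )
    def my_model():
        ...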

astronautas commented 6 months ago

@ion-elgreco

> @astronautas just set a freshness policy?

Would that allow me to run my asset as long as the upstream is not too old? I don't think freshness policies even make sense for partitioned assets, do they?

How would you model that with freshness policies? Assume a DAG as follows:

[features_1] (hourly) -> [model] (hourly).

I want to halt [model] materialization if [features_1] is missing >= 2 (hourly) partitions. If it's missing 1, it's fine to run, no big deal. How would you write the asset code for that...?
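
One possible workaround with existing APIs, not from the thread: gate inside the asset body by checking the upstream's latest materialization timestamp. The asset names and the ACCEPTED_LAG constant are placeholders, and wall-clock lag only approximates "missing >= 2 hourly partitions":

    from datetime import datetime, timedelta, timezone

    from dagster import AssetExecutionContext, AssetKey, Failure, HourlyPartitionsDefinition, asset

    ACCEPTED_LAG = timedelta(hours=2)  # tolerate one missing hourly partition, halt at two

    @asset(partitions_def=HourlyPartitionsDefinition(start_date="2024-01-01-00:00"))
    def model(context: AssetExecutionContext):
        # Most recent recorded materialization of the upstream asset, if any.
        event = context.instance.get_latest_materialization_event(AssetKey("features_1"))
        if event is None:
            raise Failure("features_1 has never materialized")
        lag = datetime.now(timezone.utc) - datetime.fromtimestamp(event.timestamp, tz=timezone.utc)
        if lag >= ACCEPTED_LAG:
            raise Failure(f"features_1 is {lag} behind; accepted lag is {ACCEPTED_LAG}")
        # ...otherwise proceed with slightly stale features...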

astronautas commented 6 months ago

Maybe someone from the Dagster maintainers can share docs on how to achieve such functionality with the existing APIs?

astronautas commented 5 months ago

Up! Any updates on this?

OwenKephart commented 4 months ago

Hi @astronautas -- we're in the midst of a larger overhaul of the auto-materialization system. This is something the new system would have the flexibility to handle, but it is not currently possible in the framework.

APIs have not been finalized by any means, but in the new system, you'd be able to specify that the latest partition of your asset should materialize if:

  • the latest partition is missing, and
  • at least one of the parent partitions from the last N days has been filled in

with something similar to the following:

from datetime import timedelta

# SchedulingCondition is the proposed (not yet released) API; n is the accepted lag in days.
n = 5

condition = (
    # latest partition is missing
    SchedulingCondition.in_latest_time_window()
    & SchedulingCondition.missing()
    # any parents within the last n days have been filled in
    & SchedulingCondition.any_deps_match(
        SchedulingCondition.in_latest_time_window(lookback_delta=timedelta(days=n))
        & ~SchedulingCondition.missing()
    )
)

I think this would essentially capture what you're looking for. Does that sound accurate to you?

astronautas commented 4 months ago

> at least one of the parent partitions from the last N days has been filled in

Brilliant, that would work ❤️ .