dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

FreshnessPolicy cron schedule not respected for auto-materialise evaluations (dagster-dbt) #20002

Open jPinhao-Rover opened 8 months ago

jPinhao-Rover commented 8 months ago

Dagster version

1.5.13, 1.6.5

What's the issue?

When using a FreshnessPolicy with a cron_schedule defined, together with AutoMaterializeRule.materialize_on_required_for_freshness(), the cron schedule isn't respected; instead, every evaluation tick tries to re-materialise the asset.

eg. using either of these freshness policies:

from dagster import FreshnessPolicy

# asset shouldn't be more than 10 minutes out of date, with a cron schedule to check every 5 minutes
FreshnessPolicy(maximum_lag_minutes=10, cron_schedule="*/5 * * * *")

# same requirement, explicitly listing the evaluation times as 0, 5, 10, etc. minutes past the hour
FreshnessPolicy(
    maximum_lag_minutes=10,
    cron_schedule="0,5,10,15,20,25,30,35,40,45,50,55 * * * *",
)

Results in runs being triggered at :27, :30, :32, :35, :36, etc. [screenshot of triggered runs omitted]

We wanted to adopt freshness policies for our dbt assets so we could set a retry/check cadence more frequent than the freshness requirement itself (eg. to handle intermittent failures). This behaviour makes that pretty much impossible, as a failing asset will constantly hammer our data warehouse until it is manually fixed.

What did you expect to happen?

With the FreshnessPolicies specified above, I expect the asset to not be selected for materialisation more than once every 5 minutes.

How to reproduce?

Create a dagster-dbt asset that fails to materialise, and set the following AutoMaterializePolicy and FreshnessPolicy:

from dagster import AutoMaterializePolicy, AutoMaterializeRule, FreshnessPolicy

auto_materialize_policy = (
    AutoMaterializePolicy.lazy(max_materializations_per_minute=10)
    .with_rules(
        # AutoMaterializeRule.materialize_on_missing(),
        AutoMaterializeRule.skip_on_parent_missing(),
        AutoMaterializeRule.skip_on_parent_outdated(),
        AutoMaterializeRule.skip_on_required_but_nonexistent_parents(),
        AutoMaterializeRule.skip_on_backfill_in_progress(all_partitions=True),
        AutoMaterializeRule.materialize_on_required_for_freshness(),
        AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron(),
    )
)

freshness_policy = FreshnessPolicy(
    maximum_lag_minutes=10,
    cron_schedule="0,5,10,15,20,25,30,35,40,45,50,55 * * * *",
)

Notice that the asset gets repeatedly selected for re-materialisation, rather than at most once every 5 minutes.

Deployment type

Dagster Helm chart

Deployment details

Tested local, observed same behavior in Helm deployment

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

OwenKephart commented 8 months ago

Hi @jPinhao-Rover, this seems to be an issue with how the freshness-based scheduling system attempts to have materializations complete by each cron tick, rather than start on each cron tick. Because it is unknown how long each materialization will take, it builds in some buffer time, with the net result being more frequent materializations than necessary.

In your case, I think a better combination of rules would be:

from dagster import AutoMaterializePolicy, AutoMaterializeRule

every_five_minutes_cron = "*/5 * * * *"
policy = AutoMaterializePolicy(
    rules={
        AutoMaterializeRule.materialize_on_cron(every_five_minutes_cron),
        AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron(every_five_minutes_cron),
    }
)

The net result will be that your assets will be scheduled to materialize once per 5 minute interval, and within that interval they will wait for their parents to complete.

However, if you're actually trying to materialize every 5 minutes (and that's not just a shorter interval you've been using for testing), it's possible that your parent assets might not complete materializing within the 5 minute time frame (e.g. you can imagine a situation where a parent takes 6 minutes to materialize). If that's the case (or, more generally, if it may take longer for data to propagate through the graph than your desired frequency), then you can replace AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron(every_five_minutes_cron) with AutoMaterializeRule.skip_on_not_all_parents_updated().
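For concreteness, a minimal sketch of that adjusted rule set (not verbatim from the thread, just the suggestion above written out):

from dagster import AutoMaterializePolicy, AutoMaterializeRule

every_five_minutes_cron = "*/5 * * * *"

policy = AutoMaterializePolicy(
    rules={
        # request a materialization once per cron tick
        AutoMaterializeRule.materialize_on_cron(every_five_minutes_cron),
        # wait until all parents have updated since this asset's last
        # materialization, regardless of which cron window they landed in
        AutoMaterializeRule.skip_on_not_all_parents_updated(),
    }
)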

jPinhao-Rover commented 8 months ago

Thanks for the input @OwenKephart, unfortunately this isn't quite what we're after. Fundamentally, what we want is to schedule retries more frequently than the expected freshness cadence, eg. check every 20 minutes whether the asset meets a 1-hour freshness requirement:
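(A sketch of the kind of policy being described, for illustration only; the exact snippet isn't preserved here, and how the cron_schedule is actually interpreted is what this issue is about.)

from dagster import FreshnessPolicy

# asset should never be more than 60 minutes out of date,
# with freshness (re)evaluated every 20 minutes
FreshnessPolicy(maximum_lag_minutes=60, cron_schedule="*/20 * * * *")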

So effectively, check if a retry is needed at 2-3x the expected freshness cadence. This is particularly relevant for partitioned assets, where we want the last closed partition window to be fresh ahead of the next window.

Your example with just cron rules will instead trigger the materialisations at 2-3x the expected cadence (00:10, 00:30, 00:50) even if the previous materialisation succeeded, which is what we'd like to avoid.

Do let me know if I misunderstood the suggestion or if there's a different way to encode these retry rules to ensure data is fresh ahead of next window close.

OwenKephart commented 8 months ago

Hi @jPinhao-Rover -- even the existing freshness-based scheduling implementation explicitly does not retry within a given window (i.e. if a materialization has failed within the freshness window, it will not try to materialize it again until the next window).

The reasoning is that there is already an existing system for handling retry logic: https://docs.dagster.io/concepts/ops-jobs-graphs/op-retries#op-retries, and so having an additional system responsible for retrying after a failure would make things more confusing.

So now that I understand your use case a bit better, the actual recommendation would be to set the cron cadence to "once an hour" then include a retry policy on your assets specifying that they should be retried after a delay of (e.g.) 20 minutes, with a maximum of 2 retries. That way, the system will launch a run as soon as upstream data is available on the hour, but if the op fails, it will be executed again 20 minutes later.
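A minimal sketch of that recommendation, shown on a plain @asset for brevity (the asset name and body are placeholders, not from the thread; the same idea would apply to dbt asset definitions, which is where the limitation discussed below comes in):

from dagster import (
    AutoMaterializePolicy,
    AutoMaterializeRule,
    RetryPolicy,
    asset,
)

hourly_cron = "0 * * * *"

@asset(
    auto_materialize_policy=AutoMaterializePolicy(
        rules={
            # request a materialization once per hourly cron tick
            AutoMaterializeRule.materialize_on_cron(hourly_cron),
            AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron(hourly_cron),
        }
    ),
    # if the op fails, retry it after ~20 minutes, at most twice
    retry_policy=RetryPolicy(max_retries=2, delay=20 * 60),
)
def my_hourly_asset():
    ...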

Just out of curiosity, is there a reason that you'd want to wait so long between retries?

jPinhao-Rover commented 8 months ago

The issue with using retry policies / conditional retry requests comes from how dagster-dbt definitions are implemented: you have a single common definition backing several assets, so any job/op/asset retry will actually re-trigger all of the models (including the ones that succeeded). Otherwise it would be a good solution, particularly for conditional retries based on error codes. This retry shortcoming was actually one of the reasons we started investigating auto-materialise and freshness - we wanted to identify which specific models within a dbt DAG needed running/retrying, independently of how they're grouped asset-definition-wise.
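(For context, a shared dagster-dbt definition along these lines is presumably what's meant; the manifest path and function name below are illustrative. Every model in the manifest becomes a separate asset, but they are all materialized by one op, so an op-level retry re-runs the whole dbt invocation rather than just the failed models.)

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

# "target/manifest.json" is an assumed location for the dbt manifest
@dbt_assets(manifest=Path("target/manifest.json"))
def my_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # a single dbt invocation materializes every selected model/asset
    yield from dbt.cli(["build"], context=context).stream()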

"is there a reason that you'd want to wait so long between retries" - not really, we just want to be able to retry the specific model(s) ahead of the next DAG refresh schedule (without re-running models that have succeeded), and be able to constrain how many times it retries (to prevent hammering the warehouse with requests that will not succeed until someone intervenes manually).

"with the existing freshness-based scheduling implementation, it explicitly does not retry within a given window" - personally, I'd be happy with this behaviour if the freshness crons worked as expected (ie. check every X minutes whether the asset is more than Y minutes old), as that would allow us to encode the "retry up to x times within a time window if not fresh" behaviour described above :)

OwenKephart commented 8 months ago

Ah ok, thanks for that extra info -- so the issue stems from the fact that the framework automatically counts requested materializations as "successful" materializations, in order to prevent automatic retries. The condition that you really want would basically be:

[desired condition expression not preserved in this copy of the comment]

But the framework only lets you express:

[available condition expression not preserved in this copy of the comment]

In your case, would immediately retrying on failure be sufficient? That would essentially look like the above condition with the addition of "or (… and not …)", or would that potentially result in too many retries?

jPinhao-Rover commented 8 months ago

If we could do retries from the point of failure, that would suffice (ie. retry the failed asset and everything downstream of it), but unfortunately with dagster-dbt this doesn't seem to be an option.