dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io

AutomationCondition's any_deps_updated().since_last_handled() doesn't work as expected. #24650

Open ssillaots-boku opened 4 weeks ago

ssillaots-boku commented 4 weeks ago

Dagster version

1.8.8

What's the issue?

I have a feeling that AutomationCondition.any_deps_updated().since_last_handled() doesn't work as expected. There is a chance that I'm just misunderstanding the definition and that this behavior is intended.

This example is based on the default AutomationCondition.eager(). The scenario: there are 2 upstream assets and 1 downstream asset that depends on both of them. For the sake of simplicity, the 2 upstream assets are materialized manually and the downstream asset has AutomationCondition.eager(). All assets are daily partitioned. I'll include the sensor ticks in the steps.

  1. Sensor tick
  2. I materialize both upstream assets
  3. Sensor tick
  4. Sensor triggers downstream asset materialization
  5. Before the next tick I rematerialize a partition of one of the upstream assets.
  6. Sensor tick
  7. Nothing happens

At step 7 I expected the downstream asset to be triggered, but it isn't.
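For reference, a minimal sketch of this setup (the names match the full reproduction below):

from dagster import AutomationCondition, DailyPartitionsDefinition, asset

daily = DailyPartitionsDefinition(start_date="2024-09-14")

@asset(partitions_def=daily)
def upstream_1() -> None:
    return

@asset(partitions_def=daily)
def upstream_2() -> None:
    return

# downstream depends on both upstreams and uses the default eager() condition
@asset(automation_condition=AutomationCondition.eager(),
       partitions_def=daily,
       deps=[upstream_1, upstream_2])
def downstream() -> None:
    return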

What did you expect to happen?

I then created a custom AutomationCondition that is almost identical to eager(), the only difference being that I removed .since_last_handled():

return (
    AutomationCondition.in_latest_time_window()
    & (
        AutomationCondition.newly_missing() | AutomationCondition.any_deps_updated()
    )
    & ~AutomationCondition.any_deps_missing()
    & ~AutomationCondition.any_deps_in_progress()
    & ~AutomationCondition.in_progress()
).with_label("eager")

With this AutomationCondition it worked as expected.

How to reproduce?

I created a test; hopefully that is enough to reproduce the issue.

from datetime import date, datetime, timedelta

from dagster import (
    AutomationCondition,
    DagsterInstance,
    DailyPartitionsDefinition,
    asset,
    evaluate_automation_conditions,
    materialize_to_memory,
)
from dagster._utils.warnings import disable_dagster_warnings

part_def = DailyPartitionsDefinition(datetime(2024, 9, 14))

def test_automation_condition() -> None:
    instance = DagsterInstance.ephemeral()

    # nothing should be requested on the first tick
    result = evaluate_automation_conditions(defs=[upstream_1, upstream_2, downstream], instance=instance)
    assert result.total_requested == 0

    materialize_upstream(instance=instance, assets=[upstream_1, upstream_2])

    # fire downstream materialization request on the next tick
    result = evaluate_automation_conditions(defs=[upstream_1, upstream_2, downstream],
                                            instance=instance,
                                            cursor=result.cursor)
    assert result.total_requested == 1

    # before the next tick, rematerialize one upstream asset's partition
    materialize_upstream_current_day(instance=instance, assets=[upstream_1])

    # downstream should be requested again
    result = evaluate_automation_conditions(defs=[upstream_1, upstream_2, downstream],
                                            instance=instance,
                                            cursor=result.cursor)
    assert result.total_requested == 1

    # everything should be up to date
    result = evaluate_automation_conditions(defs=[upstream_1, upstream_2, downstream],
                                            instance=instance,
                                            cursor=result.cursor)
    assert result.total_requested == 0

@asset(partitions_def=part_def)
def upstream_1() -> None:
    return

@asset(partitions_def=part_def)
def upstream_2() -> None:
    return

def create_automation_condition():
    with disable_dagster_warnings():
        return (
            AutomationCondition.in_latest_time_window()
            & (
                AutomationCondition.newly_missing() | AutomationCondition.any_deps_updated()
            )
            & ~AutomationCondition.any_deps_missing()
            & ~AutomationCondition.any_deps_in_progress()
            & ~AutomationCondition.in_progress()
        ).with_label("eager")

@asset(automation_condition=create_automation_condition(),
       partitions_def=part_def,
       deps=[upstream_1, upstream_2])
def downstream() -> None:
    return

def materialize_upstream(instance, assets):
    the_day_before = (date.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    for asset_def in assets:
        materialize_to_memory(instance=instance,
                              partition_key=the_day_before,
                              assets=[asset_def])

def materialize_upstream_current_day(instance, assets):
    # assumption: this helper was not shown in the snippet above; it is
    # reconstructed here to re-materialize the same latest complete daily
    # partition, matching step 5 of the scenario
    the_day_before = (date.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    for asset_def in assets:
        materialize_to_memory(instance=instance,
                              partition_key=the_day_before,
                              assets=[asset_def])

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a šŸ‘! We factor engagement into prioritization.

smackesey commented 2 weeks ago

cc @OwenKephart

OwenKephart commented 1 week ago

Hi @ssillaots-boku! This is an interesting edge case -- in essence, if the child updates on the same tick that a new parent update comes in, we treat the parent update as "handled by" the new child update (because the child updated "at the same time as" the parent), even if those executions were in different runs.

The eager policy waits for all in progress work to complete before kicking off a run, so by the time its run completes, everything appears to be ok from the perspective of the system.

Another way of looking at it is that the frequency of evaluations determines the "resolution" with which we can accurately determine that "event_a came before event_b". Typically, the expectation is that a single asset will materialize much less often than once every 30 seconds, so the cases in which this occurs should be fairly rare.

Removing the since_last_handled() condition works for this test case, but will yield incorrect results in real-world usage. In particular, imagine you have A -> B -> C, and you manually materialize A. B will be requested, and because C can see that B is being requested this tick, it will also be requested (any_deps_updated checks for newly_updated | will_be_requested).

So you get a single run that will materialize both of them, and that run starts executing. It materializes B, which is then picked up by the system and results in a second run of C -- which is not the desired behavior.
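To make that concrete, the decomposition described above looks roughly like this (a sketch of the shape, not necessarily the exact library internals):

from dagster import AutomationCondition

# per the description above, a dep counts as "updated" if it was newly
# materialized OR will be requested on this same tick -- which is why C
# fires alongside B once since_last_handled() is removed
approx_any_deps_updated = AutomationCondition.any_deps_match(
    AutomationCondition.newly_updated() | AutomationCondition.will_be_requested()
)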

This test should pass if you put an extra tick (that emits 0 runs) in between the upstream materializations.
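Roughly, that change to the test above would look like this (untested sketch), inserted after the assert result.total_requested == 1 line and before the re-materialization:

    # extra tick that emits 0 runs, so the system observes the two upstream
    # materializations on separate ticks
    result = evaluate_automation_conditions(defs=[upstream_1, upstream_2, downstream],
                                            instance=instance,
                                            cursor=result.cursor)
    assert result.total_requested == 0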

If this test case represents a real-world situation you ran into (and isn't just an artifact of the time scales of unit tests being much shorter than real life), then you could use AutomationCondition.any_deps_match(AutomationCondition.newly_updated()).

This will result in your asset being requested as soon as any parent update is detected, ignoring any in-progress runs, etc. However, this also means that runs will not be chained together (i.e. if you have A -> B -> C and update A, the system will create a run for just B in response, and then once that completes, it will create a separate run for C).
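Against the assets in the reproduction above, that would look something like this (sketch):

# downstream is requested as soon as any parent materialization is
# detected, without waiting on in-progress runs
@asset(automation_condition=AutomationCondition.any_deps_match(
           AutomationCondition.newly_updated()),
       partitions_def=part_def,
       deps=[upstream_1, upstream_2])
def downstream() -> None:
    return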