dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Respect partition mappings in asset job backfills #11962

Open sryza opened 1 year ago

sryza commented 1 year ago

Imagine a software-defined asset with a self-dependency where each partition depends on the prior day's partition. If you backfill this asset from the asset graph UI, Dagster will respect that partition dependency, i.e. avoid launching later partitions in the backfill until earlier partitions in the backfill have successfully materialized.

However, now imagine that you take this same asset and define a job that targets it, and then launch a backfill for that job from the job backfill UI. Dagster won't respect this partition self-dependency.
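For concreteness, a minimal sketch of such a self-dependent asset (the asset name, dates, and input name are illustrative, not from the report; the fuller reproduction later in this thread shows how the missing first predecessor can be handled):

from dagster import AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    ins={
        # Self-dependency: the input's key is the asset's own key, offset by -1
        # so each partition reads the prior day's partition.
        "prior_day_events": AssetIn(
            key="events",
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1),
        )
    },
)
def events(prior_day_events):
    # An asset backfill launches the 2023-01-02 run only after the 2023-01-01
    # run materializes successfully; a job backfill over the same asset doesn't wait.
    return (prior_day_events or []) + ["new rows"]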

What we've heard

juggy commented 1 year ago

I experienced this, as discussed with @sryza on Slack (copying here as Slack history is wonky at best):

Julien Guimont 5 hours ago I have a job that includes all assets. Some assets depend on others. When I start the job, it seems that all assets are built at the same time (using previously materialized assets?), so after the job run is done, I still get some "upstream data changed" markers. How do you fix this so that downstream assets wait for the new version of upstream assets before materializing? It seems like this should already be enforced, but that is not what I am seeing.

Julien Guimont 5 hours ago Some assets are partitioned and others are not. It seems that the runs (I got 16 because of the partitions) materialize all non-partitioned assets each time. Not sure how I can avoid that now.

sandy 4 hours ago "it seems that all assets are built at the same time" - This is unexpected. How are you determining this is the case? If you go to look at the run, does the gantt chart in the run view not reflect the dependencies?

Julien Guimont 4 hours ago Each partition run shows materializations for all assets, partitioned or not.

sandy 4 hours ago ah got it - if you launch the backfill from the asset graph instead of the asset job page, dagster will only materialize the non-partitioned assets once. this isn't yet implemented in job backfills. here's the issue where we're tracking this: https://github.com/dagster-io/dagster/issues/11962

We are using partitions as "shards" to increase parallelism and process all the data on every run. We want to run different subsets of the assets on weekly/daily/hourly runs. We cannot use jobs and schedules properly or efficiently because of this bug, and the workaround of triggering materializations from the asset graph is far from ideal.
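For reference, a minimal sketch of the mixed setup described above (asset and job names are illustrative, not from the report). Backfilling all_assets_job from the job backfill page launches one run per partition, and each run re-materializes the unpartitioned asset; an asset backfill from the asset graph materializes it only once:

from dagster import DailyPartitionsDefinition, Definitions, asset, define_asset_job

@asset
def dimensions():
    # Unpartitioned upstream asset; a job backfill re-materializes this in every partition run.
    return {"countries": ["CA", "US"]}

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-10-01"))
def daily_facts(context, dimensions):
    # Partitioned downstream asset; a backfill launches one run per partition.
    return {"partition": context.partition_key, "n_dims": len(dimensions["countries"])}

all_assets_job = define_asset_job(name="all_assets_job", selection="*")

defs = Definitions(assets=[dimensions, daily_facts], jobs=[all_assets_job])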

HynekBlaha commented 11 months ago

Hello, I came across this unexpected behavior as well. I want to define a partitioned asset that depends on the previous partition of itself (computing a cumulative sum).

Here is a snippet of code:

import random
from datetime import datetime

from dagster import (
    AssetExecutionContext,
    AssetIn,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset, define_asset_job, Definitions, DagsterType, TypeCheck,
)

def optional_int_type_check_fn(_context, value):
    """Type check function - the first partition doesn't have a predecessor (previous_cumulative_sum is None)."""
    if value is None:
        return TypeCheck(success=True, description="First materialized partition is None")
    if isinstance(value, int):
        return TypeCheck(success=True, description="All other materialized partitions are int")

    return TypeCheck(success=False, description=f"Unexpected type: {type(value)}")

OptionalInt = DagsterType(
    type_check_fn=optional_int_type_check_fn,
    name="OptionalInt",
    description="DagsterType for assets that depend on their previous partition.",
)

# This example models a partitioned asset that depends on the previous partition of itself (computing a cumulative sum).
# When executed as an asset backfill, runs execute sequentially.
# If the first one fails, the following ones don't get executed.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-10-01"),
    ins={
        "previous_cumulative_sum": AssetIn(
            key="sequential_dependent_on_previous",
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1),
            dagster_type=OptionalInt,
        )
    },
)
def sequential_dependent_on_previous(context: AssetExecutionContext, previous_cumulative_sum: int | None) -> int:
    chance = random.random()
    if chance < 0.2:
        raise RuntimeError(f"Random issue (chance: {chance})")

    previous_cumulative_sum = previous_cumulative_sum or 0  # None on start_date
    increment = datetime.fromisoformat(context.asset_partition_key_for_output()).day

    context.log.debug(f"{previous_cumulative_sum=}, {increment=}")
    return previous_cumulative_sum + increment

# When executed as a job backfill, the sequential order is not respected.
# All partition runs launch at once and fail in the IOManager on non-existent predecessor inputs.
sequential_dependent_on_previous_job = define_asset_job(
    name="sequential_dependent_on_previous_job",
    selection="sequential_dependent_on_previous"
)

defs = Definitions(
    assets=[
        sequential_dependent_on_previous,
    ],
    jobs=[sequential_dependent_on_previous_job]
)
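For what it's worth, the snippet can be exercised in-process with sequential materialize() calls, which stand in for the ordering an asset backfill enforces. The in-memory IO manager below is my own assumption, not part of the report: it returns None when the mapped predecessor partition doesn't exist (the day before start_date), which is what the OptionalInt type above expects:

from dagster import DagsterInstance, IOManager, io_manager, materialize

_STORE: dict[str, int] = {}

class InMemoryPartitionIOManager(IOManager):
    def handle_output(self, context, obj):
        _STORE[context.asset_partition_key] = obj

    def load_input(self, context):
        keys = context.asset_partition_keys
        # Assumed behavior: the -1/-1 mapping yields no upstream partition for the
        # first partition (its predecessor falls before start_date), so return None.
        return _STORE[keys[0]] if keys else None

@io_manager
def in_memory_partition_io_manager():
    return InMemoryPartitionIOManager()

if __name__ == "__main__":
    instance = DagsterInstance.ephemeral()
    for key in ["2023-10-01", "2023-10-02", "2023-10-03"]:
        # Each call may raise because of the asset's simulated random failure;
        # that is the point of the reproduction.
        materialize(
            [sequential_dependent_on_previous],
            partition_key=key,
            instance=instance,
            resources={"io_manager": in_memory_partition_io_manager},
        )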

When an asset backfill is executed, the partition materializations run in series (each starts after the previous one finishes).

[Screenshot: asset backfill - partitions executed in series]

Once the first partition's materialization fails, all the following ones automatically fail. The backfill status shows as Completed; I think it should be Incomplete.

[Screenshot: asset backfill - remaining partitions fail after the first failure]

An asset job backfill doesn't respect the dependency: all partitions are executed in parallel. It shows its status as Incomplete.

[Screenshot: job backfill - doesn't respect the dependency]

Having asset backfills and asset job backfills act differently is counter-intuitive.