dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

BackfillPolicy.single_run() does not work with MultiPartitionsDefinition when NOT selecting all inner partitions #18852

Open mmutso-boku opened 10 months ago

mmutso-boku commented 10 months ago

Dagster version

1.5.7

What's the issue?

I have a multi-partitioned asset (outer hourly, inner static) with a single-run backfill policy. When I start a backfill for X outer partitions and all of the inner partitions, the backfill runs in a single run, as expected. When I start a backfill for X outer partitions and only some of the inner partitions, it runs as a multi-run backfill: each partition is materialized in a separate run.

What did you expect to happen?

Having defined backfill_policy=BackfillPolicy.single_run() on the asset, I expect the backfill to start a single run for the selected partition range. Instead, each partition is executed in a separate run.

How to reproduce?

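from datetime import datetime

from dagster import (
    AssetExecutionContext,
    BackfillPolicy,
    HourlyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

# Outer dimension: hourly; inner dimension: static
partitions_def = MultiPartitionsDefinition(
    {
        'partition': HourlyPartitionsDefinition(datetime(2023, 12, 1)),
        'source': StaticPartitionsDefinition(['A', 'B', 'C'])
    })

@asset(
    partitions_def=partitions_def,
    backfill_policy=BackfillPolicy.single_run()
)
def asset1(context: AssetExecutionContext):
    return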

From the UI select multiple outer partitions, and from the inner partitions, select one or two partitions.
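
To make the two selections concrete, here is a minimal sketch in plain Python (it does not call Dagster) that enumerates the partition keys each selection covers. The `selected_keys` helper, the three chosen hours, and the `|`-joined key format are assumptions for illustration only.

```python
from datetime import datetime, timedelta
from itertools import product

ALL_SOURCES = ["A", "B", "C"]  # inner (static) dimension from the snippet above


def selected_keys(hours, sources):
    """Hypothetical helper: list every (hour, source) pair in a backfill selection."""
    return [f"{h:%Y-%m-%d-%H:%M}|{s}" for h, s in product(hours, sources)]


start = datetime(2023, 12, 1)
hours = [start + timedelta(hours=i) for i in range(3)]  # three outer partitions

full = selected_keys(hours, ALL_SOURCES)    # all inner partitions selected
partial = selected_keys(hours, ["A", "B"])  # only some inner partitions selected

print(len(full), len(partial))
```

With the behaviour described in this report, the `full` selection is launched as one run, while the `partial` selection is launched as one run per key.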

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

No response


patrikdevlin commented 6 months ago

@mmutso-boku I ran into this issue as well. As a workaround, you can rename your partition dimensions: the keys get sorted alphabetically here -> https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py#L211, so changing the names swaps which dimension is considered the inner partition.
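
A rough sketch of why renaming can help, assuming only what the comment above says (dimension names are sorted alphabetically). This is plain Python that mimics the sort, not Dagster's actual code; `dimension_order` and the new name `a_source` are hypothetical.

```python
def dimension_order(dimension_names):
    """Mimic the alphabetical ordering of dimension names (illustration only)."""
    return sorted(dimension_names)


original = dimension_order(["partition", "source"])
renamed = dimension_order(["partition", "a_source"])  # hypothetical new name

print(original)  # ['partition', 'source']
print(renamed)   # ['a_source', 'partition']
```

After the rename, the static dimension sorts first instead of second, which is the swap the workaround relies on.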

sean-stach-ff commented 2 months ago

I am getting a similar issue.

from dagster import (
    BackfillPolicy,
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

partitions_def = MultiPartitionsDefinition(
    {
        "date": DailyPartitionsDefinition(start_date="2024-08-20"),
        "static": StaticPartitionsDefinition(
            ["partition1", "partition2", "partition3"]
        ),
    }
)

@asset(
    partitions_def=partitions_def,
    backfill_policy=BackfillPolicy.single_run(),
    metadata={"partition_expr": "mmm"},
    io_manager_key="duckdb_io_manager",
)
def test_multi_part_asset(context):
    context.log.info("hi")

If I run a backfill over 2 date x 2 static partitions, it creates a new job run for each date partition (2 job runs).

However, it only seems to create two job runs for the 2x2 case. I've tried a bunch of other combinations, and they properly create a single run.