dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.57k stars 1.46k forks

Single run backfill not working consistently correctly when backfilling partitioned assets with different start times #19625

Open mmutso-boku opened 8 months ago

mmutso-boku commented 8 months ago

Dagster version

1.6.2

What's the issue?

When initiating a backfill across multiple assets that all have a single-run backfill policy, the downstream assets' partitions are not materialized in a single run, but rather across multiple runs covering seemingly random chunks of the partition range.

What did you expect to happen?

All assets' partitions are materialized in a single run, as specified by their backfill policies, regardless of differences between their partitions definitions' start times.

How to reproduce?

To reproduce in local dev env:

from datetime import datetime
from typing import Any

from dagster import (
    BackfillPolicy,
    HourlyPartitionsDefinition,
    InputContext,
    IOManager,
    Output,
    OutputContext,
    asset,
)

# Resource key under which NoOpIoManager is registered in the code location.
NO_OP_IO_MANAGER = "no_op_io_manager"


class NoOpIoManager(IOManager):
    def load_input(self, context: InputContext) -> Any:
        pass

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        pass


@asset(partitions_def=HourlyPartitionsDefinition(datetime(2020, 1, 1)),
       backfill_policy=BackfillPolicy.single_run(),
       io_manager_key=NO_OP_IO_MANAGER)
def root_buggy(context):
    yield Output(value=None, output_name=context.output_for_asset_key(context.asset_key))

@asset(partitions_def=HourlyPartitionsDefinition(datetime(2022, 4, 21)),
       backfill_policy=BackfillPolicy.single_run(),
       io_manager_key=NO_OP_IO_MANAGER,
       deps=[root_buggy])
def downstream_buggy(context):
    yield Output(value=None, output_name=context.output_for_asset_key(context.asset_key))

@asset(partitions_def=HourlyPartitionsDefinition(datetime(2020, 1, 1)),
       backfill_policy=BackfillPolicy.single_run(),
       io_manager_key=NO_OP_IO_MANAGER)
def root_working(context):
    yield Output(value=None, output_name=context.output_for_asset_key(context.asset_key))

@asset(partitions_def=HourlyPartitionsDefinition(datetime(2020, 1, 1)),
       backfill_policy=BackfillPolicy.single_run(),
       io_manager_key=NO_OP_IO_MANAGER,
       deps=[root_working])
def downstream_working(context):
    yield Output(value=None, output_name=context.output_for_asset_key(context.asset_key))

There are two chains here: in one chain the upstream and downstream partitions_def start times differ, and in the other chain the start times are identical.
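To make the start-time mismatch concrete, here is a plain-Python sketch (not Dagster internals; the helper `hourly_partition_index` is hypothetical and only illustrates the arithmetic) showing that the same wall-clock hour maps to different partition indices in the two partitions definitions of the buggy chain:

```python
from datetime import datetime

# Start dates taken from the repro above.
ROOT_START = datetime(2020, 1, 1)         # root_buggy
DOWNSTREAM_START = datetime(2022, 4, 21)  # downstream_buggy


def hourly_partition_index(start: datetime, partition: datetime) -> int:
    """Index of an hourly partition relative to a partitions_def start date."""
    return int((partition - start).total_seconds() // 3600)


# The same wall-clock partition has different indices in the two definitions:
p = datetime(2024, 1, 1)
root_idx = hourly_partition_index(ROOT_START, p)
down_idx = hourly_partition_index(DOWNSTREAM_START, p)
print(root_idx, down_idx, root_idx - down_idx)  # offset is 841 days * 24 = 20184 hours
```

The constant index offset between the two definitions is exactly what distinguishes the buggy chain from the working chain, where both definitions share the same start date and the offset is zero.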

1) Select the root_buggy asset, and in the lineage view, click "Downstream" and then "Materialize all...".
2) In my example, for the period to backfill, choose [2024-01-01-00:00...2024-01-31-23]. The "Backfill preview" view should look like this: (screenshot)
3) Once the backfill starts, it will first backfill all the root_buggy partitions in a single run; after that, the downstream_buggy partitions are materialized in multiple runs covering time periods of seemingly random length. I have seen anywhere from 3 hours' to 15 days' worth, up to all partitions in a single run. It looks like this (there are more runs, of course; they did not all fit in the picture): (screenshot)
4) For comparison, start a backfill in the exact same way, with the same partition range, for root_working and downstream_working.
5) Once the backfill starts, it will backfill both assets' partitions in a single run, as expected: (screenshot)

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

mmutso-boku commented 8 months ago

Any updates or acknowledgement of this issue?

mmutso-boku commented 6 months ago

Tried the steps to reproduce again with version 1.7.1; the issue is still present.