dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Materializing assets that switch between being partitioned and non-partitioned does not work #19357

Open ryandstoughton opened 7 months ago

ryandstoughton commented 7 months ago

Dagster version

dagster, version 1.6.1

What's the issue?

When materializing a chain of assets that goes from non-partitioned to partitioned and back again, the second partitioned asset does not seem to be materialized when I select the asset group and click "Materialize selected".

I select my assets

Screenshot 2024-01-22 at 3 47 30 PM

Click "Materialize selected (4)"

Screenshot 2024-01-22 at 3 46 54 PM

Click "Launch backfill"

Screenshot 2024-01-22 at 3 49 24 PM

some_asset4 appears to be dropped from the target_subset of the logged AssetBackfillData:

2024-01-22 21:50:02 +0000 - dagster.daemon.BackfillDaemon - INFO - Evaluating asset backfill wqgslvia
2024-01-22 21:50:02 +0000 - dagster.daemon.BackfillDaemon - INFO - Targets for asset backfill wqgslvia are valid. Continuing execution with current status: BulkActionStatus.REQUESTED.
2024-01-22 21:50:02 +0000 - dagster.daemon.BackfillDaemon - INFO - Asset backfill wqgslvia completed iteration with status BulkActionStatus.REQUESTED.
2024-01-22 21:50:02 +0000 - dagster.daemon.BackfillDaemon - INFO - Updated asset backfill data for wqgslvia: AssetBackfillData(target_subset=AssetGraphSubset(non_partitioned_asset_keys={AssetKey(['some_asset1']), AssetKey(['some_asset3'])}, partitions_subsets_by_asset_key={AssetKey(['some_asset2']): DefaultPartitionsSubset(subset={'partition1', 'partition2'})}), requested_runs_for_target_roots=True, latest_storage_id=247, materialized_subset=AssetGraphSubset(non_partitioned_asset_keys=set(), partitions_subsets_by_asset_key={}), requested_subset=AssetGraphSubset(non_partitioned_asset_keys={AssetKey(['some_asset1'])}, partitions_subsets_by_asset_key={}), failed_and_downstream_subset=AssetGraphSubset(non_partitioned_asset_keys=set(), partitions_subsets_by_asset_key={}), backfill_start_time=DateTime(2024, 1, 22, 21, 49, 57, 158773, tzinfo=Timezone('UTC')))
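
To make the gap in the log line above easier to see, here is a minimal sketch in plain Python (no Dagster imports) that compares the four selected assets against the keys in the logged target_subset. The sets are transcribed by hand from the log, and the helper name `missing_from_target` is hypothetical, not part of Dagster.

```python
# Asset keys selected in the UI before clicking "Materialize selected (4)".
selected = {"some_asset1", "some_asset2", "some_asset3", "some_asset4"}

# Transcribed by hand from target_subset in the logged AssetBackfillData.
logged_non_partitioned = {"some_asset1", "some_asset3"}
logged_partitioned = {"some_asset2": {"partition1", "partition2"}}


def missing_from_target(selected, non_partitioned, partitioned):
    """Return the selected asset keys that appear nowhere in the backfill target.

    Hypothetical helper for illustration only; it is not a Dagster API.
    """
    targeted = set(non_partitioned) | set(partitioned)
    return sorted(selected - targeted)


print(missing_from_target(selected, logged_non_partitioned, logged_partitioned))
# prints ['some_asset4']
```

The only selected key absent from both halves of target_subset is some_asset4, which matches what the UI shows.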

some_asset4 then never runs after some_asset3 succeeds

Screenshot 2024-01-22 at 3 51 45 PM

And it does not show in the "Runs" tab.

Screenshot 2024-01-22 at 3 52 38 PM

What did you expect to happen?

I expect some_asset4 to materialize the same as some_asset2. They both rely on a non-partitioned asset.
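
As a rough illustration of that expectation, the sketch below lists the materializations a full backfill of the four assets would be expected to cover, using the partition keys from the reproduction. It is plain Python rather than Dagster code; `ASSETS` and `expected_materializations` are hypothetical names used only for this example.

```python
from typing import Dict, List, Optional, Tuple

PARTITIONS = ["partition1", "partition2"]

# Asset name -> partition keys, or None for a non-partitioned asset.
# Mirrors the asset definitions in the reproduction.
ASSETS: Dict[str, Optional[List[str]]] = {
    "some_asset1": None,
    "some_asset2": PARTITIONS,
    "some_asset3": None,
    "some_asset4": PARTITIONS,
}


def expected_materializations(
    assets: Dict[str, Optional[List[str]]],
) -> List[Tuple[str, Optional[str]]]:
    """Expand each asset into (asset, partition_key) pairs; key is None if unpartitioned."""
    units: List[Tuple[str, Optional[str]]] = []
    for name, partitions in assets.items():
        if partitions is None:
            units.append((name, None))
        else:
            units.extend((name, key) for key in partitions)
    return units


for unit in expected_materializations(ASSETS):
    print(unit)
```

Under this reading, some_asset4 contributes two entries (one per partition), exactly as some_asset2 does.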

How to reproduce?

DAGSTER_VERSION = "1.6.1"
DAGSTER_LIB_VERSION = "0.22.1"

My asset definitions are

from dagster import OpExecutionContext, StaticPartitionsDefinition, asset

@asset
def some_asset1() -> str:
    return "some_asset1"

@asset(partitions_def=StaticPartitionsDefinition(["partition1", "partition2"]))
def some_asset2(context: OpExecutionContext, some_asset1: str) -> str:
    # Log the partition name
    context.log.info(f"some_asset2 partition: {context.partition_key}")
    return "some_asset2"

@asset
def some_asset3(some_asset2: dict) -> str:
    return "some_asset3"

@asset(partitions_def=StaticPartitionsDefinition(["partition1", "partition2"]))
def some_asset4(context: OpExecutionContext, some_asset3: str) -> str:
    # Log the partition name
    context.log.info(f"some_asset4 partition: {context.partition_key}")
    return "some_asset4"

My definitions are loaded in __init__.py as

from dagster import Definitions, load_assets_from_package_module
from . import assets

defs = Definitions(
    assets=load_assets_from_package_module(
        assets,
        group_name="dag_testing_assets",
    ),
)

My workspace is defined in workspace.yaml as

load_from:
    - python_package:
        package_name: dag_testing 

Deployment type

Docker Compose

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

tacastillo commented 7 months ago

What happens if you define the partitions definition once (StaticPartitionsDefinition(["partition1", "partition2"])) and share it across both assets?

ryandstoughton commented 7 months ago

Unfortunately it appears to behave the same either way.

partitions = StaticPartitionsDefinition(["partition1", "partition2"])
...

@asset(partitions_def=partitions)
def some_asset2(context: OpExecutionContext, some_asset1: str) -> str:
    ...

@asset(partitions_def=partitions)
def some_asset4(context: OpExecutionContext, some_asset3: str) -> str:
    ...

yields

Screenshot 2024-01-22 at 5 11 26 PM

ryandstoughton commented 7 months ago

To add further insight: adding a some_asset5 after the partitioned asset that never materializes causes some_asset5 and some_asset1 to materialize at the same time. This appears to ignore some_asset5's dependency on the asset before it.
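
For comparison, here is a minimal sketch of the ordering one would expect from the dependency chain alone, using Python's standard-library `graphlib`. It assumes some_asset5 depends directly on some_asset4 (its definition is not shown in this thread), and it ignores partitions entirely. `execution_waves` is a hypothetical helper, not a Dagster function.

```python
from graphlib import TopologicalSorter

# asset -> set of upstream assets.
# The some_asset5 -> some_asset4 edge is an assumption; its definition is not shown.
deps = {
    "some_asset1": set(),
    "some_asset2": {"some_asset1"},
    "some_asset3": {"some_asset2"},
    "some_asset4": {"some_asset3"},
    "some_asset5": {"some_asset4"},
}


def execution_waves(graph):
    """Group assets into waves; each wave only needs assets from earlier waves."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for wave in execution_waves(deps):
    print(wave)
```

With a strict chain like this, every wave holds a single asset, so some_asset1 and some_asset5 would never be expected to start together.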

Screenshot 2024-01-23 at 9 12 06 AM Screenshot 2024-01-23 at 9 15 09 AM Screenshot 2024-01-23 at 9 15 31 AM

ryandstoughton commented 7 months ago

This is still an issue with the latest Dagster version at the time of writing (1.6.4).

ryandstoughton commented 5 months ago

This is still an issue for my team and is blocking us from using partitions at all.

Based on this minimal reproducible example, are we misunderstanding partitioning in Dagster, or is this truly a bug? We would like to partition our assets by client, but not all assets can be partitioned this way. Our alternatives are to run every client sequentially (which is becoming prohibitively slow) or to implement concurrency on our own (we would like to lean on Dagster wherever possible).

Any info would be greatly appreciated here.