dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Custom partition mapping is not used when launching backfill and raises cryptic error #17225

Open vladhc opened 8 months ago

vladhc commented 8 months ago

Dagster version

dagster, version 1.5.3

What's the issue?

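Given an upstream asset and a child asset, each partitioned by its own DynamicPartitionsDefinition, where the child's input uses a custom PartitionMapping subclass (full reproduction below):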

Launching a backfill on the child asset will throw an error: dagster._core.errors.DagsterInvariantViolationError: Asset partition AssetKeyPartitionKey(asset_key=AssetKey(['country_value']), partition_key='Germany') depends on invalid partition keys {AssetKeyPartitionKey(asset_key=AssetKey(['user_value']), partition_key='Germany')}.

Launching a single run for each partition (without backfill) will successfully materialize any partition of the child asset.

What did you expect to happen?

The backfill should successfully materialize the child asset.

How to reproduce?

from typing import Dict, Optional, List
import datetime

from dagster import Definitions, asset, AssetIn
from dagster import DynamicPartitionsDefinition, sensor, DefaultSensorStatus, SensorResult, SkipReason
from dagster import PartitionMapping

from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
from dagster._core.definitions.partition_mapping import UpstreamPartitionsResult
from dagster._core.instance import DynamicPartitionsStore

countries_partitions_def = DynamicPartitionsDefinition(name='countries')

users_partitions_def = DynamicPartitionsDefinition(name='users')

COUNTRY_TO_USERS = {
    'Germany': {'Hans', 'Otto'},
    'USA': {'Tom', 'Jerry'},
}

@sensor(default_status=DefaultSensorStatus.RUNNING)
def update_partitions(context):
    if context.instance.get_dynamic_partitions(countries_partitions_def.name):
        return SkipReason('Partitions already initialized')
    return SensorResult(
        dynamic_partitions_requests=[
            countries_partitions_def.build_add_request(['USA', 'Germany']),
            users_partitions_def.build_add_request(['Tom', 'Jerry', 'Hans', 'Otto']),
        ])

class UserToCountryPartitionMapping(PartitionMapping):

    def get_upstream_mapped_partitions_result_for_partitions(
        self,
        downstream_partitions_subset: Optional[PartitionsSubset],
        upstream_partitions_def: PartitionsDefinition,
        current_time: Optional[datetime.datetime] = None,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> UpstreamPartitionsResult:

        assert downstream_partitions_subset is not None
        downstream_partitions_def = downstream_partitions_subset.partitions_def
        assert isinstance(downstream_partitions_def, DynamicPartitionsDefinition)
        assert downstream_partitions_def.name == 'countries'

        assert upstream_partitions_def is not None
        assert isinstance(upstream_partitions_def, DynamicPartitionsDefinition)
        assert upstream_partitions_def.name == 'users'

        users: List[str] = []
        for country in downstream_partitions_subset.get_partition_keys():
            users.extend(COUNTRY_TO_USERS[country])

        return UpstreamPartitionsResult(
            upstream_partitions_def.subset_with_partition_keys(users),
            list())

    def get_downstream_partitions_for_partitions(
        self,
        upstream_partitions_subset: PartitionsSubset,
        downstream_partitions_def: PartitionsDefinition,
        current_time: Optional[datetime.datetime] = None,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> PartitionsSubset:
        # NotImplemented is a sentinel value, not an exception; raise the exception class instead
        raise NotImplementedError()

@asset(partitions_def=users_partitions_def)
def user_value() -> int:
    return 1

@asset(
    partitions_def=countries_partitions_def,
    ins={
        'user_value': AssetIn(key=user_value.key, partition_mapping=UserToCountryPartitionMapping())
    }
)
def country_value(context, user_value: Dict[str, int]) -> int:
    context.log.info(f'Users: {user_value.keys()}')
    return sum(user_value.values())

defs = Definitions(
    assets=[
        user_value,
        country_value,
    ],
    sensors=[update_partitions]
)

Materialize the user_value asset, then launch a backfill of the country_value asset.

The status of the backfill will become "Failed".

The error will be:

dagster._core.errors.DagsterInvariantViolationError: Asset partition AssetKeyPartitionKey(asset_key=AssetKey(['country_value']), partition_key='Germany') depends on invalid partition keys {AssetKeyPartitionKey(asset_key=AssetKey(['user_value']), partition_key='Germany')}

  File "/home/vladimir/miniconda3/envs/dagster-repro-multi-partition/lib/python3.11/site-packages/dagster/_daemon/backfill.py", line 34, in execute_backfill_iteration
    yield from execute_asset_backfill_iteration(
  File "/home/vladimir/miniconda3/envs/dagster-repro-multi-partition/lib/python3.11/site-packages/dagster/_core/execution/asset_backfill.py", line 698, in execute_asset_backfill_iteration
    for result in execute_asset_backfill_iteration_inner(
  File "/home/vladimir/miniconda3/envs/dagster-repro-multi-partition/lib/python3.11/site-packages/dagster/_core/execution/asset_backfill.py", line 1059, in execute_asset_backfill_iteration_inner
    asset_partitions_to_request = asset_graph.bfs_filter_asset_partitions(
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vladimir/miniconda3/envs/dagster-repro-multi-partition/lib/python3.11/site-packages/dagster/_core/definitions/asset_graph.py", line 657, in bfs_filter_asset_partitions
    if condition_fn(candidates_unit, result):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vladimir/miniconda3/envs/dagster-repro-multi-partition/lib/python3.11/site-packages/dagster/_core/execution/asset_backfill.py", line 1061, in <lambda>
    lambda unit, visited: should_backfill_atomic_asset_partitions_unit(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vladimir/miniconda3/envs/dagster-repro-multi-partition/lib/python3.11/site-packages/dagster/_core/execution/asset_backfill.py", line 1156, in should_backfill_atomic_asset_partitions_unit
    raise DagsterInvariantViolationError(
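The failing key is telling: for the downstream partition 'Germany', the daemon looked for a user_value partition that is also named 'Germany'. That is consistent with the daemon ignoring UserToCountryPartitionMapping and falling back to an identity-style mapping; this is an inference from the error message, not something confirmed in this thread. The plain-Python sketch below contrasts the two behaviours without importing Dagster. The helpers custom_upstream_keys, identity_upstream_keys and invalid_keys are hypothetical and only mirror the repro's logic.

```python
from typing import Dict, List, Set

COUNTRY_TO_USERS: Dict[str, Set[str]] = {
    "Germany": {"Hans", "Otto"},
    "USA": {"Tom", "Jerry"},
}
# The user partitions registered by the update_partitions sensor.
USER_KEYS: Set[str] = {"Tom", "Jerry", "Hans", "Otto"}


def custom_upstream_keys(country_keys: List[str]) -> Set[str]:
    """Mirror of UserToCountryPartitionMapping: each country maps to its users."""
    users: Set[str] = set()
    for country in country_keys:
        users |= COUNTRY_TO_USERS[country]
    return users


def identity_upstream_keys(country_keys: List[str]) -> Set[str]:
    """What an identity mapping would request: the same key upstream."""
    return set(country_keys)


def invalid_keys(requested: Set[str]) -> Set[str]:
    """Requested upstream keys that are not valid user partitions."""
    return requested - USER_KEYS


print(sorted(custom_upstream_keys(["Germany"])))  # ['Hans', 'Otto']
print(invalid_keys(custom_upstream_keys(["Germany"])))  # set()
print(invalid_keys(identity_upstream_keys(["Germany"])))  # {'Germany'}
```

The last line reproduces the shape of the reported error: under an identity mapping, 'Germany' is requested from user_value, where it is not a valid partition key.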

Deployment type

Local

Deployment details

No response

sryza commented 8 months ago

Hi @vladhc - Dagster does not currently support custom user-defined partition mappings (because we can't load arbitrary user code in host processes like the backfill daemon).

We should raise an error earlier instead of failing in this way. @clairelin135 - thoughts on the best way to do this?

@vladhc - would StaticPartitionMapping work for you if it didn't raise an error when one of the PartitionsDefinitions is dynamic?

vladhc commented 8 months ago

Hi @sryza, thank you for the fast reply.

> would StaticPartitionMapping work for you if it didn't raise an error when one of the PartitionsDefinitions is dynamic?

This won't be sufficient for us. Two or three partitions can be added or removed during a single day, and so far it's not possible to know the partition IDs in advance.

As a workaround, I can restart Dagster each day and use only StaticPartitionsDefinition and StaticPartitionMapping. That would be sufficient for now.
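For the static workaround, note that StaticPartitionMapping is keyed by upstream partition key (its argument is downstream_partition_keys_by_upstream_partition_key, which, as far as I know, accepts a single key or a collection of keys per upstream key), so COUNTRY_TO_USERS has to be inverted. A minimal plain-Python sketch of that inversion follows; the helper name invert_to_upstream_keyed is hypothetical, and the Dagster wiring is shown only in comments because it was not executed here.

```python
from typing import Dict, List, Set

COUNTRY_TO_USERS: Dict[str, Set[str]] = {
    "Germany": {"Hans", "Otto"},
    "USA": {"Tom", "Jerry"},
}


def invert_to_upstream_keyed(country_to_users: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Turn {country: users} into {user: countries}, i.e. keyed by upstream key."""
    user_to_countries: Dict[str, Set[str]] = {}
    for country, users in country_to_users.items():
        for user in users:
            # A user listed under several countries maps to all of them.
            user_to_countries.setdefault(user, set()).add(country)
    return user_to_countries


user_to_countries = invert_to_upstream_keyed(COUNTRY_TO_USERS)
user_keys: List[str] = sorted(user_to_countries)
country_keys: List[str] = sorted(COUNTRY_TO_USERS)

# Assumed Dagster wiring (not executed here); rebuild whenever the keys change:
#   users_partitions_def = StaticPartitionsDefinition(user_keys)
#   countries_partitions_def = StaticPartitionsDefinition(country_keys)
#   user_to_country_mapping = StaticPartitionMapping(user_to_countries)
print(user_to_countries["Hans"])  # {'Germany'}
```

Because the partition keys are baked in when the definitions load, this only reflects new users or countries after the code location is reloaded, which is why the workaround relies on a daily restart.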

> Dagster does not currently support custom user-defined partition mappings (because we can't load arbitrary user code in host processes like the backfill daemon).

I see, that makes sense. For my use case, though, where all partition mappings are simple one-to-one or one-to-many, one could imagine passing only the partition data structures to the daemon. In my fantasies I would even treat partitions as assets, something like:

@partition_asset(io_manager='my_db_io_manager')  # hypothetical decorator, not part of Dagster
def images_partition(context, my_resource: MyAnotherResource) -> dagster.PartitionMapping:
    # ... figure out partition mapping
    return image_names  # a Mapping[str, Set[str]], serialized and sent to Dagster's daemon

@asset(partitions_def=images_partition)
def model_predictions(...):
    ...

This would give partitions all the benefits of assets: I/O managers, auto-materialization when upstream assets change, and visibility of their dependencies in the asset graph.
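The core of this idea is that the daemon would receive the mapping as plain data rather than as user code. A minimal sketch of such a data-only mapping, assuming JSON as the transport, is below; the class DataOnlyPartitionMapping and its methods are hypothetical and are not a Dagster API.

```python
import json
from dataclasses import dataclass
from typing import Dict, Iterable, List, Set


@dataclass(frozen=True)
class DataOnlyPartitionMapping:
    """Hypothetical: a partition mapping carried as plain data, not user code."""

    upstream_keys_by_downstream_key: Dict[str, List[str]]

    def to_json(self) -> str:
        # Only strings and lists, so any process can load it without user code.
        return json.dumps(self.upstream_keys_by_downstream_key, sort_keys=True)

    @classmethod
    def from_json(cls, payload: str) -> "DataOnlyPartitionMapping":
        return cls(json.loads(payload))

    def upstream_keys(self, downstream_keys: Iterable[str]) -> Set[str]:
        # Unknown downstream keys map to no upstream keys in this sketch.
        result: Set[str] = set()
        for key in downstream_keys:
            result.update(self.upstream_keys_by_downstream_key.get(key, []))
        return result


mapping = DataOnlyPartitionMapping({"Germany": ["Hans", "Otto"], "USA": ["Jerry", "Tom"]})
payload = mapping.to_json()  # what a user-code process would hand to the daemon
restored = DataOnlyPartitionMapping.from_json(payload)
print(sorted(restored.upstream_keys(["Germany"])))  # ['Hans', 'Otto']
```

Evaluating the restored mapping is just dictionary lookups, which is what would let a host process such as the backfill daemon use it without importing the user's code.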

sryza commented 8 months ago

> This won't be sufficient for us. We can have 2-3 partitions added/removed during a day. Also so far it's not possible to say in advance what the IDs of partitions would be.

In your code snippet, the COUNTRY_TO_USERS mapping is hardcoded. Is the idea that it would actually be populated by reading from a database or something similar? By the way, here's an issue that tracks relevant functionality: https://github.com/dagster-io/dagster/issues/13139.
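If COUNTRY_TO_USERS were read from a database instead of being hardcoded, a sketch with the standard-library sqlite3 module could look like the following. The table user_countries and its columns are hypothetical. This only changes where the mapping comes from: per the earlier maintainer comment, a custom PartitionMapping still cannot be evaluated by the backfill daemon.

```python
import sqlite3
from collections import defaultdict
from typing import Dict, Set


def load_country_to_users(conn: sqlite3.Connection) -> Dict[str, Set[str]]:
    """Build {country: users} from a hypothetical user_countries table."""
    mapping: Dict[str, Set[str]] = defaultdict(set)
    for user_name, country in conn.execute("SELECT user_name, country FROM user_countries"):
        mapping[country].add(user_name)
    return dict(mapping)


# In-memory database with the same data as the repro, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_countries (user_name TEXT, country TEXT)")
conn.executemany(
    "INSERT INTO user_countries VALUES (?, ?)",
    [("Hans", "Germany"), ("Otto", "Germany"), ("Tom", "USA"), ("Jerry", "USA")],
)
COUNTRY_TO_USERS = load_country_to_users(conn)
```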

> In my fantasies I would even treat partitions as assets

Interesting. This seems somewhat relevant to this issue: https://github.com/dagster-io/dagster/issues/9559.

CaselIT commented 3 weeks ago

This behaviour is very frustrating, since running one partition at a time works correctly. It only fails when multiple or all partitions are requested.

CaselIT commented 2 weeks ago

Would an additional built-in partition mapping be accepted if a PR were provided? I think something like a regex-based partition mapping could go a long way toward reducing this problem without having to support running arbitrary code in the Dagster process.
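A regex-based mapping can be described entirely by a pattern string, which is what would make it usable without running user code. A minimal sketch of the matching logic follows. RegexPartitionMapping is a hypothetical name, not an existing Dagster class, and the sketch assumes the upstream keys encode the downstream key as a prefix (e.g. Germany/Hans), which is a different key scheme from the original repro. A real contribution would also need to subclass PartitionMapping and be serializable.

```python
import re
from typing import Iterable, List, Set


class RegexPartitionMapping:
    """Hypothetical regex-based mapping from downstream keys to upstream keys.

    For each downstream key, the template's {key} placeholder is filled with the
    escaped key, and every upstream key that fully matches the result is selected.
    Literal braces in the regex must be doubled ({{ }}) because of str.format.
    """

    def __init__(self, pattern_template: str) -> None:
        self.pattern_template = pattern_template

    def upstream_keys(self, downstream_keys: Iterable[str], all_upstream_keys: Iterable[str]) -> Set[str]:
        upstream = list(all_upstream_keys)
        selected: Set[str] = set()
        for key in downstream_keys:
            pattern = re.compile(self.pattern_template.format(key=re.escape(key)))
            selected.update(u for u in upstream if pattern.fullmatch(u))
        return selected


UPSTREAM_KEYS: List[str] = ["Germany/Hans", "Germany/Otto", "USA/Tom", "USA/Jerry"]
mapping = RegexPartitionMapping(r"{key}/.+")
print(sorted(mapping.upstream_keys(["Germany"], UPSTREAM_KEYS)))  # ['Germany/Hans', 'Germany/Otto']
```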