dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Inconsistent input type depending on length of static partition #15538

Open harrylojames opened 1 year ago

harrylojames commented 1 year ago

Dagster version

dagster, version 1.3.13

What's the issue?

The type of a loaded static partitioned upstream asset is different depending on whether the static partition definition has one partition or more than one partition.

For my downstream asset to behave consistently whether the partition definition has one partition or several, I need to convert the input to a dict myself, which isn't a pretty pattern.

I realise fixing this might be a breaking change, and there is likely a motivation for the current behaviour that I'm missing!

What did you expect to happen?

I expected the loaded asset to always be a dictionary of the following format, even if the dictionary has only one element:

{partition_key: partitioned_asset_value}
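To illustrate why the uniform shape matters, here is a minimal sketch in plain Python (no Dagster involved). The function name `total_across_partitions` and the sample values are hypothetical; the point is only that downstream logic written against `{partition_key: value}` needs no special case for a single partition.

```python
from typing import Mapping


def total_across_partitions(loaded: Mapping[str, int]) -> int:
    """Sum the per-partition values of an upstream asset.

    Assumes the input always has the shape {partition_key: value},
    which is the behaviour requested in this issue.
    """
    return sum(loaded.values())


# The same code path handles one partition or several.
one_partition = {"0": 1}
two_partitions = {"0": 1, "1": 1}

print(total_across_partitions(one_partition))
print(total_across_partitions(two_partitions))
```

With the current behaviour, the single-partition case would pass a bare `int` instead, and the call to `.values()` would fail.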

How to reproduce?

Example 1: one partition, so asset_1 is loaded into asset_2 as an int:

from dagster import StaticPartitionsDefinition, asset, \
    DagsterInstance, materialize

partitions_numbers = StaticPartitionsDefinition(["0"])

@asset(partitions_def=partitions_numbers)
def asset_1():
    return 1

@asset
def asset_2(asset_1):
    print(type(asset_1))
    return

if __name__ == "__main__":
    instance = DagsterInstance.get()

    materialize(
        assets=[asset_1, asset_2],
        selection=[asset_1],
        partition_key="0",
        instance=instance,
    )

    materialize(
        assets=[asset_1, asset_2],
        selection=[asset_2],
        instance=instance
    )

Example 2: two partitions, so asset_1 is loaded into asset_2 as a dict:

from dagster import StaticPartitionsDefinition, asset, \
    DagsterInstance, materialize

partitions_numbers = StaticPartitionsDefinition(["0", "1"])

@asset(partitions_def=partitions_numbers)
def asset_1():
    return 1

@asset
def asset_2(asset_1):
    print(type(asset_1))
    return

if __name__ == "__main__":
    instance = DagsterInstance.get()

    for partition_key in partitions_numbers.get_partition_keys():
        materialize(
            assets=[asset_1, asset_2],
            selection=[asset_1],
            partition_key=partition_key,
            instance=instance,
        )

    materialize(
        assets=[asset_1, asset_2],
        selection=[asset_2],
        instance=instance
    )

The pattern I find myself writing to ensure consistent behaviour:

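from typing import Any

def convert_input_to_dict(x: Any, key: str) -> dict:
    # Wrap a bare value (single-partition case) so callers always get a dict.
    if not isinstance(x, dict):
        x = {key: x}
    return x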
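One caveat with a type check on `dict`: if the asset's own value is a dictionary, a single-partition input is indistinguishable from an already-keyed one. A possible alternative, sketched below, decides based on the number of partition keys instead. It assumes the behaviour reported in this issue (bare value for one partition, dict keyed by partition key otherwise), and `normalize_partitioned_input` is a hypothetical helper name, not part of Dagster.

```python
from typing import Any, Dict, Sequence


def normalize_partitioned_input(
    value: Any, partition_keys: Sequence[str]
) -> Dict[str, Any]:
    """Return {partition_key: value} regardless of how the input was loaded.

    Assumption (taken from the behaviour reported in this issue): with exactly
    one partition the bare value is loaded; with more than one, a dict keyed
    by partition key is loaded.
    """
    if len(partition_keys) == 1:
        # Single partition: wrap the bare value, even if it is itself a dict.
        return {partition_keys[0]: value}
    if not isinstance(value, dict):
        raise TypeError(
            f"expected a dict keyed by partition key, got {type(value).__name__}"
        )
    # Multiple partitions: already in the desired shape; return a shallow copy.
    return dict(value)
```

In the examples above, the keys could come from `partitions_numbers.get_partition_keys()`, so `asset_2` would call `normalize_partitioned_input(asset_1, partitions_numbers.get_partition_keys())`.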

Deployment type

None

Deployment details

No response

Additional information

I am using dagster 1.3.13 because the multi-partition example above won't work otherwise, due to this bug: https://github.com/dagster-io/dagster/pull/15457


sryza commented 1 year ago

Here's a discussion about this issue, with a proposal: https://github.com/dagster-io/dagster/discussions/15557.

harrylojames commented 1 year ago

@sryza Thank you for creating the discussion and proposal - looks great!