dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.71k stars, 1.48k forks

Allow IO manager keys to be StrEnums, not just strs #18623

Status: Open. ShootingStarD opened this issue 11 months ago.

ShootingStarD commented 11 months ago

Can only serialize whitelisted Enums, received JobKeysEnum -> whitelist enums from the StrEnum library

Dagster version

1.5.9

What's the issue?

Hi, I want to be able to provide StrEnums as keys for various dagster elements such as IO managers, jobs, assets, etc. It works well for assets but not for IO managers and jobs. In fact, in the minimal example below I managed to reproduce the error for jobs but not for IO managers, even though I use the same dagster version as in the project where the IO manager error occurs.

Nevertheless, when I try to execute a job whose name was defined via a subclass of StrEnum (from the StrEnum library), I get the following error: dagster._serdes.errors.SerializationError:

Can only serialize whitelisted Enums, received JobKeysEnum.
               Descent path: <root:JobSnapshot>.name

For me it is important to use StrEnum because:

As I said, in another project I get the same error with IO managers when defining the keys for the resources of a Definitions object.

What did you expect to happen?

From what I understand, the error is "just" that StrEnums are not whitelisted. But since they can be treated as strings, I don't see why they could not be whitelisted. Would it be possible to do so?

How to reproduce?

assets.py

from dagster import asset
import pandas as pd

@asset
def upstream_asset():
    return pd.DataFrame([1, 2, 3])

@asset
def downstream_asset(upstream_asset):
    return upstream_asset * 4

dagster_keys.py

from enum import auto
from strenum import SnakeCaseStrEnum

class IOManagerKeysSTR(SnakeCaseStrEnum): # SnakeCaseStrEnum inherits from StrEnum to convert the name into snake case
    IO_MANAGER_STR_ENUM = auto()

class JobKeysEnum(SnakeCaseStrEnum):
    SIMPLE_JOB = auto()

jobs.py

from dagster import AssetSelection, define_asset_job
from my_dagster_project.assets import upstream_asset, downstream_asset
from my_dagster_project.dagster_keys import JobKeysEnum

dummy_job = define_asset_job(
    name=JobKeysEnum.SIMPLE_JOB,
    selection=AssetSelection.assets(upstream_asset, downstream_asset),
)

io_manager.py

from dagster import ConfigurableIOManager, InputContext, OutputContext
from typing import List
import pandas as pd

class MyIOManager(ConfigurableIOManager):
    # specifies an optional string list input, via config system
    path_prefix: List[str] = []

    def _get_path(self, context) -> str:
        return "/".join(self.path_prefix + context.asset_key.path)

    def handle_output(self, context: OutputContext, obj:pd.DataFrame):
        obj.to_csv(self._get_path(context))

    def load_input(self, context: InputContext):
        return pd.read_csv(self._get_path(context))

definitions.py

from dagster import Definitions
from .io_manager import MyIOManager
from my_dagster_project.dagster_keys import IOManagerKeysSTR
from my_dagster_project.assets import upstream_asset, downstream_asset
from my_dagster_project.jobs import dummy_job

defs_enum_str = Definitions(
    assets=[upstream_asset, downstream_asset],
    resources={
        # The resource key is a StrEnum member rather than a plain str
        IOManagerKeysSTR.IO_MANAGER_STR_ENUM: MyIOManager(),
    },
    jobs=[dummy_job],
)

tests.py

from my_dagster_project.definitions import (
    defs_enum_str
)
from my_dagster_project.dagster_keys import JobKeysEnum

def test_str_enum_io_manager():
    job_job = defs_enum_str.get_job_def(
        JobKeysEnum.SIMPLE_JOB
    )
    job_job_results = job_job.execute_in_process()

    assert job_job_results.success

As I said previously, I am not able to reproduce the error for the IO manager keys in this example (and I cannot share the project where it happens, since it belongs to my company).

Deployment type

None

Deployment details

No response

Additional information

I haven't tried the Python 3.11 StrEnum, but even if it works, our production is still on Python 3.9 and a migration is not possible at the moment.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

sryza commented 11 months ago

Hey @ShootingStarD - are you able to share your full stack trace?

ShootingStarD commented 11 months ago

Hi @sryza, here it is:

___________________________________ test_str_enum_io_manager ___________________________________

    def test_str_enum_io_manager(

    ):

        job_job = defs_enum_str.get_job_def(
            JobKeysEnum.SIMPLE_JOB
        )
>       job_job_results = job_job.execute_in_process()

my-dagster-project/my_dagster_project_tests/test_enum.py:34: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.10/site-packages/dagster/_core/definitions/job_definition.py:702: in execute_in_process
    return core_execute_in_process(
.venv/lib/python3.10/site-packages/dagster/_core/execution/execute_in_process.py:51: in core_execute_in_process
    run = execute_instance.create_run_for_job(
.venv/lib/python3.10/site-packages/dagster/_core/instance/__init__.py:1161: in create_run_for_job
    job_def.get_job_snapshot_id(),
.venv/lib/python3.10/site-packages/dagster/_core/definitions/job_definition.py:953: in get_job_snapshot_id
    return self.get_job_index().job_snapshot_id
.venv/lib/python3.10/site-packages/dagster/_core/host_representation/job_index.py:83: in job_snapshot_id
    self._job_snapshot_id = create_job_snapshot_id(self.job_snapshot)
.venv/lib/python3.10/site-packages/dagster/_core/snap/job_snapshot.py:60: in create_job_snapshot_id
    return create_snapshot_id(snapshot)
.venv/lib/python3.10/site-packages/dagster/_serdes/utils.py:9: in create_snapshot_id
    json_rep = serialize_value(snapshot, **kwargs)
.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:616: in serialize_value
    packed_value = pack_value(val, whitelist_map=whitelist_map)
.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:663: in pack_value
    return _pack_value(val, whitelist_map=whitelist_map, descent_path=descent_path)
.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:695: in _pack_value
    return serializer.pack(cast(NamedTuple, val), whitelist_map, descent_path)
.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:531: in pack
    packed[storage_key] = _pack_value(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

val = <JobKeysEnum.SIMPLE_JOB: 'simple_job'>
whitelist_map = WhitelistMap(tuple_serializers={'ConfigurableClassData': <dagster._serdes.config_class.ConfigurableClassDataSerializer...rializer object at 0x7fd7f49b7bb0>, 'JobTickStatus': <dagster._serdes.serdes.EnumSerializer object at 0x7fd7f49b7bb0>})
descent_path = '<root:JobSnapshot>.name'

    def _pack_value(
        val: PackableValue,
        whitelist_map: WhitelistMap,
        descent_path: str,
    ) -> JsonSerializableValue:
        # this is a hot code path so we handle the common base cases without isinstance
        tval = type(val)
        if tval in (int, float, str, bool) or val is None:
            return cast(JsonSerializableValue, val)
        if tval is list:
            return [
                _pack_value(item, whitelist_map, f"{descent_path}[{idx}]")
                for idx, item in enumerate(cast(list, val))
            ]
        if tval is dict:
            return {
                key: _pack_value(value, whitelist_map, f"{descent_path}.{key}")
                for key, value in cast(dict, val).items()
            }

        # inlined is_named_tuple_instance
        if isinstance(val, tuple) and hasattr(val, "_fields"):
            klass_name = val.__class__.__name__
            if not whitelist_map.has_tuple_serializer(klass_name):
                raise SerializationError(
                    "Can only serialize whitelisted namedtuples, received"
                    f" {val}.\nDescent path: {descent_path}",
                )
            serializer = whitelist_map.get_tuple_serializer(klass_name)
            return serializer.pack(cast(NamedTuple, val), whitelist_map, descent_path)
        if isinstance(val, Enum):
            klass_name = val.__class__.__name__
            if not whitelist_map.has_enum_entry(klass_name):
>               raise SerializationError(
                    "Can only serialize whitelisted Enums, received"
                    f" {klass_name}.\nDescent path: {descent_path}",
                )
E               dagster._serdes.errors.SerializationError: Can only serialize whitelisted Enums, received JobKeysEnum.
E               Descent path: <root:JobSnapshot>.name

.venv/lib/python3.10/site-packages/dagster/_serdes/serdes.py:699: SerializationError
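The trace points at the fast path in _pack_value: it compares type(val) against str, so a str-backed enum member is not handled as a string and instead reaches the isinstance(val, Enum) whitelist check. The stdlib-only sketch below reproduces that distinction without dagster, assuming a plain (str, Enum) class as a stand-in for the strenum classes from the issue (which likewise subclass both str and Enum):

```python
from enum import Enum


class JobKeysEnum(str, Enum):
    # Stand-in (assumption) for the strenum-based class from the issue.
    SIMPLE_JOB = "simple_job"


val = JobKeysEnum.SIMPLE_JOB

# Same exact-type test as the fast path in the trace: type(val) is JobKeysEnum, not str.
exact_type_is_str = type(val) in (int, float, str, bool)

# isinstance sees both ancestries, so the value is a str AND an Enum.
is_str_instance = isinstance(val, str)
is_enum_instance = isinstance(val, Enum)

print(exact_type_is_str, is_str_instance, is_enum_instance)  # False True True
```

Because the exact-type test fails and the isinstance(val, Enum) test succeeds, the member is routed to the enum whitelist lookup, which is where the SerializationError is raised.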
sryza commented 11 months ago

🙏

ShootingStarD commented 11 months ago

I'm open to trying to add the StrEnum classes to the whitelist over the weekend, and I will keep you posted. If you have any advice other than reading the contributing guide, let me know!

sryza commented 11 months ago

@alangenfeld do you have thoughts on whitelisting StrEnums for serdes so they can be used in place of strings?

I'm not an expert on StrEnums, but a potential challenge here is that enum classes defined in user code processes will not be present in host processes, which means we wouldn't be able to deserialize them there.
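A toy illustration of this concern (not dagster's serdes code): if an enum is packed by class name and unpacked by looking the class up in a per-process registry, a process that never imported the user's module has nothing to look up. The packed shape and the names pack_enum, unpack_enum, user_registry, and host_registry are hypothetical.

```python
from enum import Enum
from typing import Dict, Optional, Type


class JobKeysEnum(str, Enum):
    # Lives in "user code"; the host process would never import this module.
    SIMPLE_JOB = "simple_job"


def pack_enum(val: Enum) -> Dict[str, str]:
    # Hypothetical packed shape: class name plus member name.
    return {"__enum__": f"{type(val).__name__}.{val.name}"}


def unpack_enum(packed: Dict[str, str], registry: Dict[str, Type[Enum]]) -> Enum:
    # Hypothetical unpacker: rebuild the member from a per-process class registry.
    klass_name, member_name = packed["__enum__"].split(".")
    if klass_name not in registry:
        # The class was never registered in this process, so it cannot be rebuilt.
        raise ValueError(f"Unknown enum class {klass_name!r}")
    return registry[klass_name][member_name]


packed = pack_enum(JobKeysEnum.SIMPLE_JOB)

user_registry: Dict[str, Type[Enum]] = {"JobKeysEnum": JobKeysEnum}  # user-code process
host_registry: Dict[str, Type[Enum]] = {}  # host process: class unknown

restored = unpack_enum(packed, user_registry)

host_error: Optional[str] = None
try:
    unpack_enum(packed, host_registry)
except ValueError as exc:
    host_error = str(exc)

print(restored is JobKeysEnum.SIMPLE_JOB)  # True
print(host_error)  # Unknown enum class 'JobKeysEnum'
```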

alangenfeld commented 11 months ago

I think it may be as easy as reordering the cases in pack_value, moving https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_serdes/serdes.py#L772-L774 above https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_serdes/serdes.py#L747-L755, assuming it's OK to get a regular string back out if the value ever crosses a serialization boundary. Based on my reading of the issue, I think that would be OK. We will want to put various conditions under test in the PR that adds this, to make the behavior clear.

edit: well, that naive idea would risk changing the behavior of existing whitelisted Enums that inherit from str, so we may want the string handling to be a fallback that applies only if there isn't a whitelist entry for the Enum.
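A minimal sketch of the fallback ordering described in the edit, assuming a much simplified packer rather than dagster's actual _pack_value; the WHITELISTED_ENUMS set, the pack_value function, and the {"__enum__": ...} shape are illustrative. Whitelisted enums keep their enum encoding, non-whitelisted enums that subclass str degrade to their plain string value, and any other enum is still rejected:

```python
from enum import Enum
from typing import Any, Optional, Set


class JobKeysEnum(str, Enum):
    # Not whitelisted but str-backed: should fall back to a plain string.
    SIMPLE_JOB = "simple_job"


class RunStatus(str, Enum):
    # Pretend this one is whitelisted: it must keep its enum encoding.
    SUCCESS = "SUCCESS"


class Color(Enum):
    # Neither whitelisted nor str-backed: still an error.
    RED = 1


# Hypothetical whitelist, keyed by class name.
WHITELISTED_ENUMS: Set[str] = {"RunStatus"}


def pack_value(val: Any) -> Any:
    # Simplified, hypothetical packer; only scalars and enums are handled here.
    if type(val) in (int, float, str, bool) or val is None:
        return val
    if isinstance(val, Enum):
        klass_name = type(val).__name__
        if klass_name in WHITELISTED_ENUMS:
            # Existing behavior for whitelisted enums is unchanged.
            return {"__enum__": f"{klass_name}.{val.name}"}
        if isinstance(val, str):
            # Fallback: emit the member's str value as a plain str.
            return val.value
        raise TypeError(f"Can only serialize whitelisted Enums, received {klass_name}.")
    raise TypeError(f"Unsupported value {val!r}")


packed_job = pack_value(JobKeysEnum.SIMPLE_JOB)
packed_status = pack_value(RunStatus.SUCCESS)

color_error: Optional[str] = None
try:
    pack_value(Color.RED)
except TypeError as exc:
    color_error = str(exc)

print(packed_job)     # simple_job
print(packed_status)  # {'__enum__': 'RunStatus.SUCCESS'}
print(color_error)    # Can only serialize whitelisted Enums, received Color.
```

Checking the whitelist before the str fallback is what keeps existing whitelisted enums that inherit from str on their current encoding.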

ShootingStarD commented 11 months ago

@alangenfeld Indeed, deserializing into a regular string is fine for me, since StrEnums are just an easy way to define strings that are used throughout my projects. In fact, I will never read the deserialized values, except maybe in the debugger, and I don't mind there.
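Because these enums subclass str, a plain string that comes back across a serialization boundary still compares equal to the member it came from. A quick check, using the standard json module as a stand-in for a serialization boundary (an assumption; this is not dagster's serializer):

```python
import json
from enum import Enum


class JobKeysEnum(str, Enum):
    # Stand-in (assumption) for the strenum-based class from the issue.
    SIMPLE_JOB = "simple_job"


# json encodes str subclasses as strings, so the member is written as its value.
encoded = json.dumps({"name": JobKeysEnum.SIMPLE_JOB})
decoded = json.loads(encoded)

print(encoded)                # {"name": "simple_job"}
print(type(decoded["name"]))  # <class 'str'>
# The round-tripped value is a plain str, yet it still equals the enum member.
print(decoded["name"] == JobKeysEnum.SIMPLE_JOB)  # True
```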

ShootingStarD commented 11 months ago

@alangenfeld I tested your solution and it passed my tests.

I will try to make a PR this week, thanks for all the help!

ShootingStarD commented 11 months ago

Pull request is in: https://github.com/dagster-io/dagster/pull/18778 @alangenfeld