dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Node definition name does not recognize prefix added by load_assets_from_modules #12406

Open Bonnevie opened 1 year ago

Bonnevie commented 1 year ago

Dagster version

1.1.18

What's the issue?

I use an asset factory pattern to generate near-identical assets with the same name (say, foo) in two different modules (say, a.py and b.py). I then load them into a shared Definitions instance by calling load_assets_from_modules separately on each module, with a different key prefix for each call, e.g.

assets_a = load_assets_from_modules([a], key_prefix=["version1"])

If I add only one of them to the Definitions, it works like a charm and the prefix is added to the keys, but if I add both, it crashes with:

dagster._core.errors.DagsterInvalidDefinitionError: Detected conflicting node definitions with the same name "foo"
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_grpc/server.py", line 266, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_grpc/server.py", line 115, in __init__
    loadable_targets = get_loadable_targets(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_grpc/utils.py", line 57, in get_loadable_targets
    else loadable_targets_from_python_package(package_name, working_directory)
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/workspace/autodiscovery.py", line 49, in loadable_targets_from_python_package
    module = load_python_module(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/code_pointer.py", line 138, in load_python_module
    return importlib.import_module(module_name)
  File "/usr/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 848, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/rasmus/xray/xray/orchestration/__init__.py", line 1, in <module>
    from .definitions import defs
  File "/home/rasmus/xray/xray/orchestration/definitions.py", line 102, in <module>
    defs = Definitions(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/definitions_class.py", line 218, in __init__
    self._created_pending_or_normal_repo = _create_repository_using_definitions_args(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/definitions_class.py", line 104, in _create_repository_using_definitions_args
    def created_repo():
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/decorators/repository_decorator.py", line 129, in __call__
    else CachingRepositoryData.from_list(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/repository_definition.py", line 870, in from_list
    for job_def in get_base_asset_jobs(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/assets_job.py", line 54, in get_base_asset_jobs
    build_assets_job(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_annotations.py", line 107, in inner
    return target(*args, **kwargs)
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/assets_job.py", line 201, in build_assets_job
    return graph.to_job(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/graph_definition.py", line 600, in to_job
    return JobDefinition(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/job_definition.py", line 234, in __init__
    super(JobDefinition, self).__init__(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/pipeline_definition.py", line 283, in __init__
    self._all_node_defs = _build_all_node_defs(self._current_level_node_defs)
  File "/home/rasmus/.cache/pypoetry/virtualenvs/xray-0VIpMuiQ-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/pipeline_definition.py", line 781, in _build_all_node_defs
    raise DagsterInvalidDefinitionError(

With a breakpoint there, I can see that the name of the node definition is just foo, rather than the prefixed a__foo that I get when I set the prefix manually on each asset.

What did you expect to happen?

Since the assets are added to the Definitions object with different prefixes, I'd expect no naming clash, and the behaviour should not depend on how the prefix was added.

How to reproduce?

Make two modules with identically named assets and load them into a Definitions object using load_assets_from_modules, with a unique key_prefix for each module.
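The failure can be reproduced in miniature without Dagster. This is a toy model under stated assumptions, not Dagster's internals: `ToyAsset`, `make_foo_asset`, `load_with_prefix` and `find_op_name_conflicts` are hypothetical names, the second prefix `version2` is invented for illustration, and the model assumes (as the maintainer explains in the thread) that a loader-level key_prefix rewrites only the asset key while the op name chosen at creation stays unchanged.

```python
from collections import defaultdict
from dataclasses import dataclass, replace
from typing import Dict, Iterable, List, Set, Tuple


@dataclass(frozen=True)
class ToyAsset:
    """Hypothetical stand-in for an asset definition (not a Dagster class)."""
    key: Tuple[str, ...]  # asset key path, e.g. ("version1", "foo")
    op_name: str          # name of the underlying op, fixed when the asset is created


def make_foo_asset() -> ToyAsset:
    # Hypothetical factory used by both modules: same name, no prefix yet.
    return ToyAsset(key=("foo",), op_name="foo")


def load_with_prefix(assets: Iterable[ToyAsset], prefix: str) -> List[ToyAsset]:
    # Models a key_prefix applied at load time: only the key is rewritten.
    return [replace(a, key=(prefix, *a.key)) for a in assets]


def find_op_name_conflicts(assets: Iterable[ToyAsset]) -> List[str]:
    # Op names shared by assets with different keys are what the error reports.
    keys_by_op: Dict[str, Set[Tuple[str, ...]]] = defaultdict(set)
    for a in assets:
        keys_by_op[a.op_name].add(a.key)
    return sorted(name for name, keys in keys_by_op.items() if len(keys) > 1)


module_a = [make_foo_asset()]  # stands in for a.py
module_b = [make_foo_asset()]  # stands in for b.py
loaded = load_with_prefix(module_a, "version1") + load_with_prefix(module_b, "version2")

print([a.key for a in loaded])         # [('version1', 'foo'), ('version2', 'foo')]
print(find_op_name_conflicts(loaded))  # ['foo']
```

The two keys are distinct, yet both assets still carry the op name `foo`, which is exactly the clash the error message names.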

Deployment type

Local

Deployment details

No response

Additional information

No response

OwenKephart commented 1 year ago

Unfortunately, there's no easy fix here: the underlying op is assigned its name when it is created, and that name cannot be changed later in the process (at least not without a lot of internal rewiring). Some options:
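The manual workaround mentioned in the report (setting the prefix on each asset) sidesteps this because the prefix is known before the op is named. A minimal sketch of that idea, again as a toy model: `ToyAsset` and `make_foo_asset` are hypothetical, and deriving the op name by joining the key path with `__` is an assumption based on the `a__foo` name observed above. In Dagster itself, the analogous step would be passing key_prefix to @asset inside the factory, as the store_a example further down does.

```python
from dataclasses import dataclass
from typing import Sequence, Tuple


@dataclass(frozen=True)
class ToyAsset:
    """Hypothetical stand-in for an asset definition (not a Dagster class)."""
    key: Tuple[str, ...]
    op_name: str


def make_foo_asset(key_prefix: Sequence[str] = ()) -> ToyAsset:
    # Hypothetical factory that receives the prefix up front, so the op name is
    # derived from the full key at creation time (assumed rule: join with "__").
    key = (*key_prefix, "foo")
    return ToyAsset(key=key, op_name="__".join(key))


asset_a = make_foo_asset(["version1"])
asset_b = make_foo_asset(["version2"])
print(asset_a.op_name, asset_b.op_name)  # version1__foo version2__foo
```

Because nothing renames the op after creation, the only place the prefix can influence the op name in this model is the factory call itself.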

Bonnevie commented 1 year ago

Fair if it's too much work to support, but I'd argue that key_prefix should then be removed as an option on load_assets_from_modules; I think most users would expect it to operate identically to using key_prefix on the assets individually.

thalesfm commented 1 year ago

I agree, this behavior is very confusing. The documentation often compares asset keys to files in a hierarchical file system, so I wouldn't expect assets like "a/foo" and "b/foo" to conflict with each other.

ghost commented 1 year ago

Unfortunately, I believe the bug affects load_assets_from_modules even when you specify the key_prefix within the asset decorators. I have the following folders, and read_offer is a parent of raw_cleaning_offer:

|store_a
 |assets
   - read_offer
   - raw_cleaning_offer
|store_b
 |assets
   - read_offer
   - raw_cleaning_offer

I first imported my assets like this, without specifying a key_prefix in their decorators:

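from dagster import AutoMaterializePolicy, Definitions, load_assets_from_package_module

# store_a_assets and store_b_assets are the store_a/assets and store_b/assets packages
defs = Definitions(
    assets=[
        *load_assets_from_package_module(
            store_a_assets, auto_materialize_policy=AutoMaterializePolicy.eager(), key_prefix="store_a"
        ),
        *load_assets_from_package_module(
            store_b_assets, auto_materialize_policy=AutoMaterializePolicy.eager(), key_prefix="store_b"
        ),
    ]
)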

This triggers the "Detected conflicting node definitions" error.

I then removed the key_prefix from the load_assets_from_package_module calls and added it to every asset instead, e.g.

@asset(compute_kind="pandas", key_prefix="store_a")
def read_offer(context: AssetExecutionContext, gcs: GCSResource) -> pd.DataFrame:
    [...]

If I import only store_a, or only store_b, with the correct prefixes:

defs = Definitions(
    assets=[
        *load_assets_from_package_module(
            store_a_assets, auto_materialize_policy=AutoMaterializePolicy.eager()
        )
    ]
)

it works. However, when I import both, it fails with a different error:

UserWarning: Error loading repository location dagster_etl:dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["read_offer"]' for asset '["store_a", "raw_cleaning_offer"]' is not produced by any of the provided asset ops and is not one of the provided sources

This appears to be caused by read_offer being present in both modules and being incorrectly named.

Now the most striking part of this behavior: if I move read_offer and raw_cleaning_offer from store_b into store_a/assets:

|store_a
 |assets
  - read_offer (key_prefix = "store_a")
  - read_offer (key_prefix = "store_b")
  - raw_cleaning_offer (key_prefix = "store_a")
  - raw_cleaning_offer (key_prefix = "store_b")

and use a single load_assets_from_package_module call:

defs = Definitions(
    assets=[
        *load_assets_from_package_module(
            store_a_assets, auto_materialize_policy=AutoMaterializePolicy.eager()
        )
    ]
)

It now works correctly.

This bug is not much of an issue for the moment, but it will force my team to rename every asset according to its store, which in my view somewhat spoils the very practical folder view of the asset catalog. I could do without load_assets_from_package_module, but it is a very nice feature that makes importing assets in my __init__ file much easier.

I'm afraid that as the number of assets grows, this may trigger unintended name conflicts, with a not-so-obvious error message when it happens.

dridk commented 1 year ago

Same for us. We cannot have "source1/results" and "source2/results" using load_assets_from_modules(key_prefix).

Without this feature, we cannot organize assets using prefixes.