dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Awkward to use graph-backed assets with no-IO-manager deps #18934

Open mikolak-net opened 9 months ago

mikolak-net commented 9 months ago

Dagster version

dagster, version 1.5.13

What's the issue?

It is not possible for a Graph-Backed Asset to depend directly on an External Asset. Attempting to do so results in a "bogus" error when Dagster tries to load the External Asset as an input.

What did you expect to happen?

Graph-Backed Assets should be able to offer a superset of the functionality of @asset-based Assets. However, they lack a deps parameter.

How to reproduce?

Launch the included code as a file with dagster dev and turn Auto-Materialization on.

from dagster import (
    external_asset_from_spec,
    AssetSpec,
    graph_asset,
    AssetIn,
    sensor,
    DefaultSensorStatus,
    SensorEvaluationContext,
    SensorResult,
    AssetMaterialization,
    Definitions,
    op,
    OpExecutionContext,
    AutoMaterializePolicy,
)

EXTERNAL_ASSET_KEY = "external_asset"
METADATA_KEY = "some"
external_asset = external_asset_from_spec(AssetSpec(EXTERNAL_ASSET_KEY))

@op
def retrieve_external_asset_value(context: OpExecutionContext, external_asset_in) -> str:
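    # Look up the latest materialization event by asset key (the instance API expects an AssetKey, not an AssetSpec).
    mat = context.instance.get_latest_materialization_event(
        external_asset.key
    ).asset_materialization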
    return mat.metadata[METADATA_KEY]

@graph_asset(
    ins={"external_asset_in": AssetIn(EXTERNAL_ASSET_KEY)},
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def graph_asset(
    external_asset_in,
) -> str:
    return retrieve_external_asset_value(external_asset_in)

@sensor(default_status=DefaultSensorStatus.RUNNING)
def test_sensor(context: SensorEvaluationContext):
    generated = context.cursor or False

    if not generated:
        yield SensorResult(
            asset_events=[AssetMaterialization(EXTERNAL_ASSET_KEY, metadata={METADATA_KEY: "value"})]
        )

    context.update_cursor("1")

defs = Definitions(assets=[external_asset, graph_asset], sensors=[test_sensor])

Running the code will eventually result in an error of the form:

dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "external_asset_in" of step "graph_asset.retrieve_external_asset_value"::

FileNotFoundError: [Errno 2] No such file or directory: '<PATH_TO_DAGSTER>/storage/external_asset'
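
The path in the error suggests that a filesystem-backed IO manager is looking for a stored value under the asset's key. The following is a simplified, stdlib-only model of that behaviour, not Dagster's actual implementation; `store_output`, `load_input`, and `try_load` are hypothetical helpers introduced only for illustration.

```python
import pickle
import tempfile
from pathlib import Path


def store_output(storage_dir: Path, asset_key: str, value: object) -> None:
    """Persist a value at <storage_dir>/<asset_key>, as a filesystem-backed manager might."""
    with (storage_dir / asset_key).open("wb") as f:
        pickle.dump(value, f)


def load_input(storage_dir: Path, asset_key: str) -> object:
    """Read the value back; raises FileNotFoundError if nothing was stored under that key."""
    with (storage_dir / asset_key).open("rb") as f:
        return pickle.load(f)


def try_load(storage_dir: Path, asset_key: str) -> str:
    """Return 'loaded' on success, or the name of the exception raised by load_input."""
    try:
        load_input(storage_dir, asset_key)
        return "loaded"
    except FileNotFoundError as exc:
        return type(exc).__name__


with tempfile.TemporaryDirectory() as tmp:
    storage = Path(tmp)
    # A regular asset writes its output, so a downstream load succeeds.
    store_output(storage, "regular_asset", "value")
    regular_result = try_load(storage, "regular_asset")
    # The external asset only reported a materialization event with metadata;
    # no file was ever written, so the downstream load fails.
    external_result = try_load(storage, "external_asset")

print(regular_result, external_result)
```

In this model the materialization event and the stored value are separate things: the sensor records the former, while the input load expects the latter.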

Deployment type

None

Deployment details

No response

Additional information

  1. This error can be worked around by adding an @asset-based asset that:
    • sits "in between" the External Asset and the Graph-Backed Asset;
    • includes the External Asset spec in its deps parameter.
  2. The error appears to ultimately stem from the fact that Dagster attempts to load an asset value where there is none (since there's currently no possibility for an External Asset to have a value). A related issue appears to be this one.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

benpankow commented 9 months ago

Hi @mikolak-net, will pass this on to the external assets team.

For now, one workaround would be to supply a custom, no-op input manager which will let you keep the dependency between the external asset and the graph-backed-asset but avoid this error:


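from typing import Any

from dagster import ConfigurableIOManager, In

# The remaining names (op, graph_asset, AssetIn, ...) are imported as in the reproduction above.


class NoOpInputManager(ConfigurableIOManager):
    def load_input(self, context):
        # Nothing to load: the external asset has no stored value.
        return None

    def handle_output(self, context, obj):
        # Never used for outputs; defined only to satisfy the IO manager interface.
        pass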

@op(ins={"external_asset_in": In(input_manager_key="no_op")})
def retrieve_external_asset_value(
    context: OpExecutionContext, external_asset_in
) -> Any:
    ...

@graph_asset(
    ins={"external_asset_in": AssetIn(EXTERNAL_ASSET_KEY)},
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def graph_asset(
    external_asset_in,
) -> str:
    return retrieve_external_asset_value(external_asset_in)

...

defs = Definitions(
    assets=[external_asset, graph_asset],
    sensors=[test_sensor],
    resources={"no_op": NoOpInputManager()},
)

mikolak-net commented 9 months ago

@benpankow : oh yeah, that's a considerably less-intrusive workaround in the context of the DAGs, thanks!