dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Attempting to access run_id, but it was not provided when constructing the OutputContext #24354

Open tuxiano opened 1 month ago

tuxiano commented 1 month ago

Dagster version

1.8.5

What's the issue?

I am trying to pass assets to an op through a graph, but Dagster complains:

Attempting to access run_id, but it was not provided when constructing the OutputContext

What did you expect to happen?

I would expect passing assets as inputs to an op to be a fairly straightforward task, but I'm struggling with it.

How to reproduce?

I created a pytest test that should provide all the needed information:

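import uuid

from dagster import (
    DagsterInstance,
    Definitions,
    In,
    Out,
    asset,
    graph,
    materialize_to_memory,
    op,
)

def test_ins():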
    @asset
    def asset_1() -> int:
        return 1

    @asset
    def asset_2() -> int:
        return 2

    # Define the op that takes multiple inputs
    @op(
        ins={
            "input_1": In(),
            "input_2": In(),
        },
        out=Out(int),
    )
    def combine_inputs(context, input_1: int, input_2: int) -> int:
        context.log.info("Hello, world!")
        return input_1 + input_2

    # Define the graph that calls the op
    @graph
    def my_graph():
        return combine_inputs(asset_1, asset_2)

    # Define the job that executes the graph
    my_job = my_graph.to_job()

    # Define the assets and job in a Definitions object
    defs = Definitions(assets=[asset_1, asset_2], jobs=[my_job])

    # Create an ephemeral instance
    instance = DagsterInstance.ephemeral()

    # Materialize the assets first
    result = materialize_to_memory([asset_1, asset_2], instance=instance)
    assert result.success

    # Get the job definition from Definitions
    my_job_def = defs.get_job_def("my_graph")

    # Generate a valid UUID for the run_id
    custom_run_id = str(uuid.uuid4())

    # Execute the job with the custom run_id
    result = my_job_def.execute_in_process(instance=instance, run_id=custom_run_id)
    assert result.success

Deployment type

Local

Deployment details

I use a local installation with Python 3.11 and the latest Dagster open-source edition.

Additional information

Full error message:

poweranalytics\poweranalytics_tests\test_assets.py:244: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\definitions\job_definition.py:726: in execute_in_process
    return core_execute_in_process(
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\execute_in_process.py:75: in core_execute_in_process
    event_list = list(execute_run_iterable)
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\api.py:888: in __iter__
    yield from self.iterator(
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\api.py:755: in job_execution_iterator
    for event in job_context.executor.execute(job_context, execution_plan):
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\executor\in_process.py:54: in execute
    yield from iter(
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\api.py:888: in __iter__
    yield from self.iterator(
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\executor\in_process.py:25: in inprocess_execution_iterator
    yield from inner_plan_execution_iterator(
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\plan\execute_plan.py:86: in inner_plan_execution_iterator
    for step_event in check.generator(dagster_event_sequence_for_step(step_context)):
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\plan\execute_plan.py:344: in dagster_event_sequence_for_step
    raise error
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\plan\execute_plan.py:245: in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\plan\execute_step.py:465: in core_dagster_event_sequence_for_step
    for event_or_input_value in step_input.source.load_input_object(
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\plan\inputs.py:150: in load_input_object
    yield from _load_input_with_input_manager(loader, load_input_context)
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\plan\inputs.py:620: in _load_input_with_input_manager
    value = input_manager.load_input(context)
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\storage\mem_io_manager.py:21: in load_input
    keys = tuple(context.get_identifier())
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\context\input.py:440: in get_identifier
    return self.upstream_output.get_identifier()
..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\context\output.py:589: in get_identifier
    run_id = self.run_id
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <dagster._core.execution.context.output.OutputContext object at 0x00000214D2901ED0>

    @public
    @property
    def run_id(self) -> str:
        """The id of the run that produced the output."""
        if self._run_id is None:
>           raise DagsterInvariantViolationError(
                "Attempting to access run_id, "
                "but it was not provided when constructing the OutputContext"
            )
E           dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext

..\..\..\..\_SHARED_DATA\venv\dagster_117_p311\Lib\site-packages\dagster\_core\execution\context\output.py:217: DagsterInvariantViolationError

maximearmstrong commented 3 weeks ago

Hey @tuxiano - there are two problems with this code.

First, your graph doesn't declare any inputs, but your op does. That causes problems when you call execute_in_process, because the op's inputs would have to be supplied through the input_values parameter.

Second, if you want to define assets as upstream dependencies for a graph, I would recommend using a graph-backed asset instead, where you can leverage an op while using assets.

This works:

from dagster import (
    Definitions,
    In,
    Out,
    asset,
    define_asset_job,
    graph_asset,
    materialize_to_memory,
    op,
)

def test_graph():
    @asset
    def asset_1() -> int:
        return 1

    @asset
    def asset_2() -> int:
        return 2

    # Define the op that takes multiple inputs
    @op(
        ins={
            "input_1": In(),
            "input_2": In(),
        },
        out=Out(int),
    )
    def combine_inputs(context, input_1: int, input_2: int) -> int:
        context.log.info("Hello, world!")
        return input_1 + input_2

    # Define the graph-backed asset that calls the op
    @graph_asset
    def my_graph(asset_1, asset_2):
        return combine_inputs(asset_1, asset_2)

    # Define the assets in a Definitions object
    defs = Definitions(assets=[asset_1, asset_2, my_graph])

    # Test materializing the assets
    result = materialize_to_memory([asset_1, asset_2, my_graph])
    assert result.success

    # Test executing a job
    my_job = define_asset_job("my_graph_job", selection=["asset_1", "asset_2", "my_graph"]).resolve(
        asset_graph=defs.get_repository_def().asset_graph
    )
    result = my_job.execute_in_process()
    assert result.success