dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Memoizing dynamic ops: `Failure condition: Unexpected execution step type UnresolvedCollectExecutionStep` #7008

Open multimeric opened 2 years ago

multimeric commented 2 years ago

Summary

Memoizing a collected dynamic output with `SourceHashVersionStrategy` raises a `CheckError` while the execution plan is being built.

Reproduction

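from dagster import DynamicOut, DynamicOutput, SourceHashVersionStrategy, job, op


@op(out=DynamicOut(int))
def dynamic():
    # Fan out: emit ten dynamic outputs, one per integer.
    for i in range(10):
        yield DynamicOutput(value=i, mapping_key=str(i))


@op()
def sum_dynamic(nums: list[int]) -> int:
    # Fan in: receive the collected dynamic outputs as a single list.
    return sum(nums)


# Passing version_strategy turns on memoization; without it the job runs fine.
@job(version_strategy=SourceHashVersionStrategy())
def maths():
    sum_dynamic(dynamic().collect())
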
Traceback (most recent call last):
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/bin/dagster", line 8, in <module>
    sys.exit(main())
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/cli/__init__.py", line 50, in main
    cli(auto_envvar_prefix=ENV_PREFIX)  # pylint:disable=E1123
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/cli/job.py", line 129, in job_execute_command
    execute_execute_command(instance, kwargs, True)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/telemetry.py", line 111, in wrap
    result = f(*args, **kwargs)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/cli/pipeline.py", line 375, in execute_execute_command
    result = do_execute_command(pipeline, instance, config, mode, tags, solid_selection, preset)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/cli/pipeline.py", line 561, in do_execute_command
    return execute_pipeline(
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/api.py", line 383, in execute_pipeline
    return _logged_execute_pipeline(
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/telemetry.py", line 111, in wrap
    result = f(*args, **kwargs)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/api.py", line 425, in _logged_execute_pipeline
    pipeline_run = instance.create_run_for_pipeline(
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/instance/__init__.py", line 772, in create_run_for_pipeline
    execution_plan = create_execution_plan(
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/api.py", line 755, in create_execution_plan
    return ExecutionPlan.build(
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 983, in build
    return plan_builder.build()
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 250, in build
    plan = plan.build_memoized_plan(pipeline_def, self.resolved_run_config, instance)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 856, in build_memoized_plan
    step_output_versions = resolve_step_output_versions(pipeline_def, self, resolved_run_config)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/resolve_versions.py", line 167, in resolve_step_output_versions
    step_versions = resolve_step_versions(pipeline_def, execution_plan, resolved_run_config)
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/resolve_versions.py", line 78, in resolve_step_versions
    for step in execution_plan.get_all_steps_in_topo_order():
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 701, in get_all_steps_in_topo_order
    return [step for step_level in self.get_all_steps_by_level() for step in step_level]
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 706, in get_all_steps_by_level
    for step_key_level in toposort(self.get_all_step_deps())
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 717, in get_all_step_deps
    check.failed(f"Unexpected execution step type {step}")
  File "/home/migwell/.cache/pypoetry/virtualenvs/dagstertest-a3v85qrq-py3.9/lib/python3.9/site-packages/dagster/check/__init__.py", line 1034, in failed
    raise CheckError(f"Failure condition: {desc}")
dagster.check.CheckError: Failure condition: Unexpected execution step type UnresolvedCollectExecutionStep(handle=StepHandle(solid_handle=NodeHandle(name='sum_dynamic', parent=None), key='sum_dynamic'), pipeline_name='maths', step_input_dict={'nums': UnresolvedCollectStepInput(name='nums', dagster_type_key='List.Int', source=FromDynamicCollect(source=FromPendingDynamicStepOutput(step_output_handle=StepOutputHandle(step_key='dynamic', output_name='result', mapping_key=None), solid_handle=NodeHandle(name='sum_dynamic', parent=None), input_name='nums'), solid_handle=NodeHandle(name='sum_dynamic', parent=None), input_name='nums'))}, step_output_dict={'result': StepOutput(solid_handle=NodeHandle(name='sum_dynamic', parent=None), name='result', dagster_type_key='Int', properties=StepOutputProperties(is_required=True, is_dynamic=False, is_asset=False, should_materialize=False))}, tags={})

Additional Info

I guess this relates to #4451, but I wanted to check whether it is still a bug in the new major version.

Note that the job runs successfully as soon as the `version_strategy` argument is removed.

yuhan commented 2 years ago

Hi @multimeric, thanks for filing the issue. I believe this is related to https://github.com/dagster-io/dagster/issues/4451. cc @dpeng817 to confirm whether this is still a failure condition in newer versions.

dpeng817 commented 2 years ago

Hey @multimeric, thanks for raising this. I can confirm that this is still a failure condition as of 0.14.6. Until #4451 is solved, we'll still need to error, but I think the error message could be much clearer. The culprit is https://github.com/dagster-io/dagster/blob/9444b2fc8458131f2fdb95971bf6c8c6929915c2/python_modules/dagster/dagster/core/execution/plan/plan.py#L717. cc @alangenfeld, I'm wondering whether this is a call site that we just forgot to update when collect functionality was added, and memoization is hitting it earlier than everything else?
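
To make the hypothesis above concrete, here is a toy model of the failure mode. It is not Dagster's source: the classes only borrow the step type names from the traceback, and `step_deps_stale`, `step_deps_clearer`, and the wording of the clearer message are hypothetical names and text invented for this sketch. It assumes the dependency lookup dispatches on step type and was written before collect steps existed, so a collect step falls through to the generic failure branch.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Set


@dataclass(frozen=True)
class Step:
    """Stand-in for a plan step: a key plus the keys it depends on."""
    key: str
    upstream: FrozenSet[str] = frozenset()


# Stand-ins that reuse the type names seen in the traceback (not the real classes).
class ExecutionStep(Step):
    pass


class UnresolvedMappedExecutionStep(Step):
    pass


class UnresolvedCollectExecutionStep(Step):
    pass


def step_deps_stale(steps: List[Step]) -> Dict[str, Set[str]]:
    """Hypothetical dispatch that predates collect steps."""
    deps: Dict[str, Set[str]] = {}
    for step in steps:
        if isinstance(step, (ExecutionStep, UnresolvedMappedExecutionStep)):
            deps[step.key] = set(step.upstream)
        else:
            # Collect steps land here because no branch was added for them.
            raise RuntimeError(f"Unexpected execution step type {type(step).__name__}")
    return deps


def step_deps_clearer(steps: List[Step]) -> Dict[str, Set[str]]:
    """Same dispatch, but it still errors with a message that names the cause."""
    deps: Dict[str, Set[str]] = {}
    for step in steps:
        if isinstance(step, UnresolvedCollectExecutionStep):
            raise RuntimeError(
                f"Step '{step.key}' collects dynamic outputs, "
                "which memoization does not support yet"
            )
        if isinstance(step, (ExecutionStep, UnresolvedMappedExecutionStep)):
            deps[step.key] = set(step.upstream)
        else:
            raise RuntimeError(f"Unexpected execution step type {type(step).__name__}")
    return deps


# A plan shaped like the reproduction: one ordinary step feeding a collect step.
plan = [
    ExecutionStep("dynamic"),
    UnresolvedCollectExecutionStep("sum_dynamic", frozenset({"dynamic"})),
]
```

Calling `step_deps_stale(plan)` raises the generic "Unexpected execution step type" error, while `step_deps_clearer(plan)` fails at the same point with a message that tells the user which step and which feature are involved.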