dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.17k stars 1.4k forks source link

Support memoization in scheduled runs #7046

Open andipiet opened 2 years ago

andipiet commented 2 years ago

Use Case

We have developed a Dagster job that supportes memoization (we have implemented memoizable IO managers and provided a version strategy).

Everything works fine as long as we start the job from the launchpad or we use backfills. However, when we define a schedule for the job and we enable it, we see the job fails with the following exception

Traceback (most recent call last):
  File "...venv/lib/python3.10/site-packages/dagster/scheduler/scheduler.py", line 130, in launch_scheduled_runs
    yield from launch_scheduled_runs_for_schedule(
  File "...venv/lib/python3.10/site-packages/dagster/scheduler/scheduler.py", line 271, in launch_scheduled_runs_for_schedule
    yield from _schedule_runs_at_time(
  File "...venv/lib/python3.10/site-packages/dagster/scheduler/scheduler.py", line 386, in _schedule_runs_at_time
    run = _create_scheduler_run(
  File "...venv/lib/python3.10/site-packages/dagster/scheduler/scheduler.py", line 448, in _create_scheduler_run
    external_execution_plan = repo_location.get_external_execution_plan(
  File "...venv/lib/python3.10/site-packages/dagster/core/host_representation/repository_location.py", line 665, in get_external_execution_plan
    execution_plan_snapshot_or_error = sync_get_external_execution_plan_grpc(
  File "...venv/lib/python3.10/site-packages/dagster/api/snapshot_execution_plan.py", line 58, in sync_get_external_execution_plan_grpc
    raise DagsterUserCodeProcessError.from_error_info(result.error)
dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvariantViolationError: Attempted to build memoized execution plan without providing a persistent DagsterInstance to create_execution_plan.

Now, I tried to troubleshoot the issue by analyzing the Dagster source code and it looks like memoization cannot be supported for scheduled jobs. In particular, it looks like in the call here no known state and no Dagster instance information is provided.

Is this analysis correct or am I missing something?

Memoization support would be crucial for our job, that needs to periodically pull data from an external system by using API. In particular, the data is available only for a limited amount of time on the external system, so if we have to re-run the pipeline we would need to use the data that we have already stored (if we try to download them again they may no longer be there)

Ideas of Implementation

Additional Info


Message from the maintainers:

Excited about this feature? Give it a :thumbsup:. We factor engagement into prioritization.

sryza commented 2 years ago

I would expect this to work if you set the MEMOIZED_RUN_TAG on the schedule, or the scheduled job. E.g.

from dagster import MEMOIZED_RUN_TAG

my_schedule = ScheduleDefinition(tags={MEMOIZED_RUN_TAG: True}, ...)

Is that what you tried already?

CC @dpeng817.

dpeng817 commented 2 years ago

I think this is most likely a bug. We should be passing state through when creating the execution plan, which is likely what is causing this error.

andipiet commented 2 years ago

Yes the executor does try to use memoization but it crashes because the required state is not provided