dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

define_asset_job does not obey execution config limits #11983

Open j-hulbert opened 1 year ago

j-hulbert commented 1 year ago

Dagster version

1.1.9

What's the issue?

When creating an asset job using define_asset_job, no limit is applied to the number of concurrent processes:

from dagster import AssetSelection, define_asset_job

my_job = define_asset_job(
    name="my_job",
    selection=AssetSelection.groups("my_group"),
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 1,
                },
            }
        }
    },
)

The example in the Dagster docs uses the @job decorator, but I would expect the same config to also work with define_asset_job: https://docs.dagster.io/concepts/ops-jobs-graphs/job-execution#default-job-executor

Potential cause: I believe this is caused by a typo in the following code, which references partitions_def instead of executor_def: https://github.com/dagster-io/dagster/blob/77711cefe1186902ae44b4dbb330b3430398d471/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py#L67

What did you expect to happen?

The number of concurrent processes during job execution would be limited to 1.

How to reproduce?

No response

Deployment type

Local

Deployment details

No response

Additional information

Workaround: pass executor_def directly with a configured executor:

from dagster import AssetSelection, define_asset_job, multiprocess_executor

my_job = define_asset_job(
    name="my_job",
    selection=AssetSelection.groups("my_group"),
    executor_def=multiprocess_executor.configured({"max_concurrent": 1}),
)

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

j-hulbert commented 1 year ago

Note: I patched the line mentioned above in the Dagster code, and the execution concurrency limits are now obeyed. I changed https://github.com/dagster-io/dagster/blob/77711cefe1186902ae44b4dbb330b3430398d471/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py#L67 to:

executor_def=check.opt_inst_param(
    executor_def, "executor_def", ExecutorDefinition
),

sryza commented 1 year ago

Interesting - I'm a little surprised that fixes it but certainly can't hurt. Would you be up for submitting a PR?

j-hulbert commented 1 year ago

Started working on the PR for this, but I'm having trouble recreating the issue now. The limits may only be ignored in a more limited scenario that I haven't isolated yet.

j-hulbert commented 1 year ago

Haven't had time to dig into this much again, but one scenario where the limits aren't obeyed is running a partition range in a single run using this technique: https://github.com/dagster-io/dagster/discussions/11653. Execution limits only get added if you choose a specific partition, and that technique intentionally does not choose one.
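That technique requests one run covering a whole partition range via run tags rather than selecting a single partition. As a minimal sketch, the tag keys below are my assumption of what the linked discussion uses (not verified against the Dagster API), and the dates are placeholders:

```python
# Hypothetical run tags (assumed from discussion #11653) that ask Dagster to
# materialize a whole partition range in one run. Because no single partition
# is selected, the per-partition execution config that would normally cap
# max_concurrent never gets attached to the run.
range_tags = {
    "dagster/asset_partition_range_start": "2023-01-01",  # placeholder start
    "dagster/asset_partition_range_end": "2023-01-31",    # placeholder end
}
```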