dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Global concurrency_key limits do not affect asset materialization #23508

Open VerdantForge opened 3 months ago

VerdantForge commented 3 months ago

Dagster version

1.7.10

What's the issue?

Op-level concurrency limits set globally through the dagster.yaml file don't seem to be working at all. Example from our dagster.yaml:

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 5
    tag_concurrency_limits:
      - key: "dagster/concurrency_key"
        value: "libretranslate"
        limit: 2

However, creating the same configuration at the level of the asset job works just fine:

asset_job = define_asset_job(
    name="asset_job",
    description="Provides scheduling to asset job",
    selection=[
        asset_that_we_have.key
    ],
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "tag_concurrency_limits": [
                        {
                            "key": "dagster/concurrency_key",
                            "value": "libretranslate",
                            "limit": 2,
                        }
                    ],
                },
            }
        }
    },
)

What did you expect to happen?

I expected only 2 ops to be launched across the entire system, for any number of runs that might be launched in parallel. However, this was not the case: even a single run with a dynamic op that was supposed to be throttled launched many ops in parallel.

How to reproduce?

No response

Deployment type

None

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

divyanshu-in commented 3 months ago

@garethbrickman looking into it

VerdantForge commented 2 months ago

Any progress since last week? Is there some info I can give you?

divyanshu-in commented 2 months ago

I got a little busy with some personal stuff, but I have started storing the op/asset tags in storage so that they can later be compared in the concurrency logic. Can I take some more time? I assure you it will be done by the end of this week.

prha commented 2 weeks ago

Apologies for the confusion. There are multiple levels of execution that you can configure concurrency on, and we haven't dialed in the precise API to make this clear.

When you configure tag_concurrency_limits on the run coordinator, you're controlling which runs can be in progress at any given time. The tags specified there should be run tags.
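To illustrate why the dagster.yaml entry above did not throttle ops, here is a simplified model of a run-level tag limit. It is a sketch only, not Dagster's actual implementation; the function name `can_start_run` and its arguments are hypothetical. The point it shows is that only run tags are compared against the limits, so a tag placed on an op or asset inside a run never enters the check.

```python
# Simplified model of a run-level tag limit; NOT Dagster's actual implementation.
# `can_start_run` is a hypothetical helper used only for illustration.
def can_start_run(candidate_tags, in_progress_run_tags, limits):
    """Return True if a run with `candidate_tags` may start.

    limits: list of {"key", "value", "limit"} dicts, mirroring the dagster.yaml entries.
    Only run tags are inspected; op and asset tags are never consulted.
    """
    for rule in limits:
        key, value, limit = rule["key"], rule["value"], rule["limit"]
        if candidate_tags.get(key) != value:
            continue  # this rule does not apply to the candidate run
        active = sum(1 for tags in in_progress_run_tags if tags.get(key) == value)
        if active >= limit:
            return False
    return True


limits = [{"key": "dagster/concurrency_key", "value": "libretranslate", "limit": 2}]
tagged = {"dagster/concurrency_key": "libretranslate"}

# A third run carrying the matching *run tag* is held back.
blocked = can_start_run(tagged, [tagged, tagged], limits)

# A run without the matching run tag is never held back by this rule,
# no matter how many tagged ops it contains.
untagged_ok = can_start_run({}, [tagged, tagged], limits)
```

In this model a run that carries no matching run tag always passes, which matches the behavior reported in this issue: the ops were tagged, but the runs were not.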

When you configure tag_concurrency_limits within the job execution config, you're controlling which ops can be in progress for that given run. This is especially useful when you have some dynamic fanout for that run, and you want to control the number of concurrent processes for a particular set of dynamic ops.

For controlling concurrency across runs, you'll want to set a global concurrency limit on the instance, either from the Deployment > Concurrency limits settings page, or via the CLI command dagster instance concurrency.