dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Enable limiting run concurrency for non-job asset runs #15752

Open sryza opened 1 year ago

sryza commented 1 year ago

You can attach tags to jobs that limit concurrency of runs of those jobs.

There's no clear way to do this for asset runs that aren't part of jobs:

You can specify tags that get attached to all auto-materialize runs: https://docs.dagster.io/concepts/assets/asset-auto-execution#run-tags, but this doesn't help with manually launched runs.

What we've heard

akoumjian commented 1 year ago

For what it's worth, we were hoping this would work by using the op_tags keyword in the @asset decorator. It does appear to properly respect our dagster-k8s/config related tags, as we're using the k8s executor.

sryza commented 1 year ago

@akoumjian the issue is that run-level concurrency limits are based on run tags, not op tags.
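A toy model may make the distinction concrete. The sketch below is not Dagster's implementation: `can_dequeue`, the `per_unique_value` flag, and the data shapes are hypothetical names chosen for illustration. It only assumes what this thread states, namely that the limit check looks at run tags and never at op tags.

```python
from typing import Dict, List


def can_dequeue(
    candidate_run_tags: Dict[str, str],
    in_progress_run_tags: List[Dict[str, str]],
    limits: List[dict],
) -> bool:
    """Return True if a queued run may start under the given tag limits.

    Hypothetical model: only *run* tags are inspected. Tags set on ops
    (for example via op_tags on an asset) never reach this function.
    """
    for rule in limits:
        key = rule["key"]
        if key not in candidate_run_tags:
            continue  # this rule does not apply to the candidate run
        if rule.get("per_unique_value", False):
            # Count only in-progress runs sharing the same value for this key.
            value = candidate_run_tags[key]
            active = sum(1 for tags in in_progress_run_tags if tags.get(key) == value)
        else:
            # Count every in-progress run carrying the key, whatever its value.
            active = sum(1 for tags in in_progress_run_tags if key in tags)
        if active >= rule["limit"]:
            return False
    return True


limits = [{"key": "10-per", "limit": 2, "per_unique_value": True}]
in_progress = [{"10-per": "warehouse"}, {"10-per": "warehouse"}]

blocked = can_dequeue({"10-per": "warehouse"}, in_progress, limits)
allowed_other_value = can_dequeue({"10-per": "lake"}, in_progress, limits)
# A run whose "10-per" tag exists only on an op has no matching run tag,
# so in this model no limit applies to it.
unlimited = can_dequeue({}, in_progress, limits)
```

In this model the third call is the case described above: the tag exists only at the op level, so the run-level check has nothing to match against.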

janosroden commented 1 year ago

@sryza which direction should we go: make QueuedRunCoordinatorDaemon check the tags of each run's steps, or tag runs with the union of their steps' tags?
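As a rough illustration of the second option, the sketch below merges per-step tags into one mapping and reports keys where steps disagree. The helper names are hypothetical and this is not code from Dagster. It assumes a run tag key holds a single value, which is why conflicting step values would need some resolution rule.

```python
from collections import defaultdict
from typing import Dict, List, Set


def union_step_tags(step_tags: List[Dict[str, str]]) -> Dict[str, Set[str]]:
    """Collect every value seen for each tag key across a run's steps."""
    merged: Dict[str, Set[str]] = defaultdict(set)
    for tags in step_tags:
        for key, value in tags.items():
            merged[key].add(value)
    return dict(merged)


def conflicting_keys(merged: Dict[str, Set[str]]) -> List[str]:
    """Keys whose steps disagree, so no single run-level value exists."""
    return sorted(key for key, values in merged.items() if len(values) > 1)


steps = [
    {"10-per": "warehouse"},
    {"10-per": "lake", "team": "data"},
]
merged = union_step_tags(steps)
conflicts = conflicting_keys(merged)
```

Here `conflicts` contains `"10-per"`, which is one of the trade-offs a union-based approach would have to address.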

sryza commented 1 year ago

@janosroden that's a good question. I think we need a more detailed investigation of the pros and cons before we can settle on the right answer.

prha commented 4 months ago

I think we have settled on using concurrency limits on individual assets and then using throttling settings on the run coordinator: https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#throttling-concurrency-limited-runs

akoumjian commented 1 month ago

Hi @prha, I'm attempting to use this throttling, but the setting does not appear to be rendered into the dagster.yaml file from the Helm values. Here is our relevant config (which, in full, validates):

spec:
  values:
    dagsterDaemon:
      runCoordinator:
        config:
          queuedRunCoordinator:
            # For our shared clusters, limit each instance to 20 concurrent runs
            maxConcurrentRuns: ${var.maxConcurrentRuns["${environment.name}"] || 20}
            tagConcurrencyLimits:
              - key: "10-per"
                value:
                  applyLimitPerUniqueValue: true
                limit: 10
              - key: "20-per"
                value:
                  applyLimitPerUniqueValue: true
                limit: 20
              - key: "50-per"
                value:
                  applyLimitPerUniqueValue: true
                limit: 50
            dequeueIntervalSeconds: ~
            dequeueUseThreads: ~
            dequeueNumWorkers: ~
          blockOpConcurrencyLimitedRuns:
            enabled: true
            opConcurrencySlotBuffer: 1

Deploying this passes type validation, but I can see the rendered config is missing the blockOpConcurrencyLimitedRuns section.

...
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:

    max_concurrent_runs: 20
    tag_concurrency_limits:
    - key: 10-per
      limit: 10
      value:
        applyLimitPerUniqueValue: true
    - key: 20-per
      limit: 20
      value:
        applyLimitPerUniqueValue: true
    - key: 50-per
      limit: 50
      value:
        applyLimitPerUniqueValue: true
...

Is it possible that the chart is not properly rendering the values?

alangenfeld commented 1 month ago

What version of the Helm chart are you using?

akoumjian commented 1 month ago

1.8.7

alangenfeld commented 1 month ago

From the sample above, it looks like blockOpConcurrencyLimitedRuns is indented as a sibling of queuedRunCoordinator in the config section. It should be inside the queuedRunCoordinator section, at the same indent level as dequeueNumWorkers.
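A nesting mistake like this can be caught before deploying by loading the values and checking where the key sits. The sketch below is a minimal, hypothetical checker: `find_key_paths` and `is_nested_correctly` are illustrative names, and the values are written as plain Python dicts (starting below dagsterDaemon) rather than parsed from YAML.

```python
def find_key_paths(node, target, path=()):
    """Return every path (as a tuple of keys/indexes) at which `target` appears."""
    found = []
    if isinstance(node, dict):
        for key, child in node.items():
            child_path = path + (key,)
            if key == target:
                found.append(child_path)
            found.extend(find_key_paths(child, target, child_path))
    elif isinstance(node, list):
        for index, child in enumerate(node):
            found.extend(find_key_paths(child, target, path + (index,)))
    return found


# Parent path suggested in the comment above.
EXPECTED_PARENT = ("runCoordinator", "config", "queuedRunCoordinator")


def is_nested_correctly(values, target="blockOpConcurrencyLimitedRuns"):
    """True only if the key exists and every occurrence sits under EXPECTED_PARENT."""
    paths = find_key_paths(values, target)
    return bool(paths) and all(p[:-1] == EXPECTED_PARENT for p in paths)


block = {"enabled": True, "opConcurrencySlotBuffer": 1}

# Shape of the original sample: sibling of queuedRunCoordinator.
misplaced = {
    "runCoordinator": {
        "config": {
            "queuedRunCoordinator": {"maxConcurrentRuns": 20},
            "blockOpConcurrencyLimitedRuns": block,
        }
    }
}

# Suggested shape: inside queuedRunCoordinator.
corrected = {
    "runCoordinator": {
        "config": {
            "queuedRunCoordinator": {
                "maxConcurrentRuns": 20,
                "blockOpConcurrencyLimitedRuns": block,
            }
        }
    }
}

misplaced_ok = is_nested_correctly(misplaced)
corrected_ok = is_nested_correctly(corrected)
```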

thejerrybao commented 2 weeks ago

Hi @alangenfeld, I'm having a similar issue. I've put that setting under queuedRunCoordinator, but it still isn't being rendered in the dagster.yaml config.

Helm

      ...
      runCoordinator:
        config:
          queuedRunCoordinator:
            maxConcurrentRuns: 50
            blockOpConcurrencyLimitedRuns:
              enabled: true
              opConcurrencySlotBuffer: 0

Dagster Config

run_coordinator:
  module: dagster._core.run_coordinator
  class: QueuedRunCoordinator
  config:
    dequeue_num_workers: 4
    dequeue_use_threads: true
    max_concurrent_runs: 50
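One way to spot a dropped setting like this is to compare the top-level keys under queuedRunCoordinator with the rendered config keys. The sketch below assumes the chart maps camelCase value names to snake_case config keys, as maxConcurrentRuns and max_concurrent_runs suggest in the samples here. The helper names are hypothetical, and only top-level keys are compared because nested keys may be passed through unchanged.

```python
import re


def camel_to_snake(name: str) -> str:
    """Convert e.g. 'maxConcurrentRuns' to 'max_concurrent_runs'."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def dropped_settings(helm_coordinator_values: dict, rendered_config: dict) -> list:
    """List Helm keys that were set (not null) but are absent after rendering."""
    return sorted(
        key
        for key, value in helm_coordinator_values.items()
        if value is not None and camel_to_snake(key) not in rendered_config
    )


# Values and rendered config copied from the samples above.
helm_values = {
    "maxConcurrentRuns": 50,
    "blockOpConcurrencyLimitedRuns": {"enabled": True, "opConcurrencySlotBuffer": 0},
}
rendered = {
    "dequeue_num_workers": 4,
    "dequeue_use_threads": True,
    "max_concurrent_runs": 50,
}

dropped = dropped_settings(helm_values, rendered)
```

The comparison only goes from Helm values to rendered config, so defaults the chart adds on its own (such as dequeue_num_workers) are not flagged.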

alangenfeld commented 2 weeks ago

Alright, found the issue and landed a fix; it should go out in this week's release.