dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

`op_concurrency_slot_buffer: 0` not working as expected in some cases #25109

Open mmutso-boku opened 4 weeks ago

mmutso-boku commented 4 weeks ago

What's the issue?

I have been testing out the concurrency limits, specifically the use case of limiting concurrency across assets https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#limiting-opasset-concurrency-across-runs

During my testing I had difficulty understanding the behaviour of an op_concurrency_slot_buffer value of 0 versus 1 (or higher).

As I understand this currently:

The issue seems to be that the configuration for a new concurrency_key found in an asset's op_tags is created only when a run starts, not when it is queued. So for a new concurrency_key with an op_concurrency_slot_buffer value of 0, the runs stay in queued status indefinitely.

There are two workarounds:

A) Set op_concurrency_slot_buffer to a value greater than 0 and let Dagster create the concurrency key configurations. Once they exist, you can set op_concurrency_slot_buffer back to 0 and it will work as expected.

B) Keep op_concurrency_slot_buffer at 0, but manually create the configuration for the concurrency key whose runs are stuck in queued status. After this, the queue for that key will get unstuck.
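To make the suspected mechanism concrete, here is a toy model of it. This is not Dagster's code: the class ToyCoordinator, its methods, and the rule "an unconfigured key looks like it has zero slots" are all assumptions made for illustration, based only on the behaviour I observed.

```python
from dataclasses import dataclass, field


@dataclass
class ToyCoordinator:
    """Hypothetical stand-in for the run coordinator; not Dagster's implementation."""

    default_limit: int  # plays the role of default_op_concurrency_limit
    slot_buffer: int    # plays the role of op_concurrency_slot_buffer
    configured_slots: dict = field(default_factory=dict)  # key -> slot count
    occupied: dict = field(default_factory=dict)          # key -> slots in use

    def can_dequeue(self, key: str) -> bool:
        # Assumption: a key with no configuration is treated as having 0 slots.
        slots = self.configured_slots.get(key, 0)
        in_use = self.occupied.get(key, 0)
        return in_use < slots + self.slot_buffer

    def start_run(self, key: str) -> None:
        # Assumption: the key's configuration is only created at run start.
        self.configured_slots.setdefault(key, self.default_limit)
        self.occupied[key] = self.occupied.get(key, 0) + 1

    def finish_run(self, key: str) -> None:
        self.occupied[key] -= 1


# Buffer 0 and a brand-new key: no run can start, so the key is never created.
stuck = ToyCoordinator(default_limit=1, slot_buffer=0)
stuck_before = stuck.can_dequeue("dummy")

# Workaround B: create the key configuration by hand, the queue unblocks.
stuck.configured_slots["dummy"] = 1
stuck_after_manual = stuck.can_dequeue("dummy")

# Workaround A: start with buffer 1 so the first run creates the key,
# then drop the buffer back to 0 once the key exists.
buffered = ToyCoordinator(default_limit=1, slot_buffer=1)
first_run_allowed = buffered.can_dequeue("dummy")
buffered.start_run("dummy")
buffered.finish_run("dummy")
buffered.slot_buffer = 0
later_run_allowed = buffered.can_dequeue("dummy")

print(stuck_before, stuck_after_manual, first_run_allowed, later_run_allowed)
```

In this model the first value is False and the other three are True, which matches what I see: the queue is only stuck while the key has no configuration at all.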

What did you expect to happen?

I expected that new concurrency key configurations would be created even with op_concurrency_slot_buffer: 0, so that runs would not get stuck in queued status.

How to reproduce?

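from time import sleep

from dagster import asset

@asset(op_tags={'dagster/concurrency_key': 'dummy'},
       group_name='dummy')
def dummy_asset():
    # Hold the concurrency slot long enough for runs to overlap.
    sleep(10)

# dagster.yaml (instance config)
concurrency: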
  default_op_concurrency_limit: 1

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
    block_op_concurrency_limited_runs:
      enabled: true
      op_concurrency_slot_buffer: 0

run_monitoring:
  enabled: true
  free_slots_after_run_end_seconds: 30

Materialize the dummy asset multiple times so that the runs try to execute concurrently. Observe that all of them stay in QUEUED status. You can use the workarounds described above to unblock them.

Dagster version

1.8.8

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

If this is actually working as expected, then the documentation needs to be updated accordingly.

mmutso-boku commented 1 week ago

Tested this again locally with 1.8.13 and I can still reproduce the issue.

prha commented 1 week ago

@mmutso-boku Apologies. It looks like this just missed the cutoff for 1.8.13. I can ensure that this makes it into 1.8.14. I'll keep this open until that goes out.