kestra-io / kestra

:zap: Workflow Automation Platform. Orchestrate & Schedule code in any language, run anywhere, 500+ plugins. Alternative to Zapier, Rundeck, Camunda, Airflow...
https://kestra.io
Apache License 2.0
10.67k stars 918 forks

Concurrency limits: limit concurrent executions #1400

Closed tchiotludo closed 11 months ago

tchiotludo commented 1 year ago

Be able to limit the number of concurrent executions for a flow, e.g. I want to have at most 5 executions at the same time:

(We need a specific property to handle queuing or canceling the execution attempt when the concurrency threshold is reached.)
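To make the queue-or-cancel choice concrete, here is a minimal Python sketch of the admission logic. It is not Kestra code: `ConcurrencyLimiter`, `Behavior`, `submit`, and `finish` are hypothetical names chosen for illustration, and the state strings are assumptions.

```python
from collections import deque
from enum import Enum


class Behavior(Enum):
    """What to do with an execution that arrives when the limit is reached."""
    QUEUE = "queue"
    CANCEL = "cancel"


class ConcurrencyLimiter:
    """Hypothetical per-flow limiter; all names here are illustrative only."""

    def __init__(self, max_executions, behavior):
        self.max_executions = max_executions
        self.behavior = behavior
        self.running = set()
        self.queued = deque()

    def submit(self, execution_id):
        """Admit, queue, or cancel a new execution and return its state."""
        if len(self.running) < self.max_executions:
            self.running.add(execution_id)
            return "RUNNING"
        if self.behavior is Behavior.QUEUE:
            self.queued.append(execution_id)
            return "QUEUED"
        return "CANCELLED"

    def finish(self, execution_id):
        """Free a slot; return the queued execution promoted into it, if any."""
        self.running.discard(execution_id)
        if self.queued and len(self.running) < self.max_executions:
            promoted = self.queued.popleft()
            self.running.add(promoted)
            return promoted
        return None
```

With a limit of 5 and `Behavior.QUEUE`, the sixth and seventh submissions wait until `finish` frees a slot; with `Behavior.CANCEL` they are rejected immediately.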

anna-geller commented 1 year ago

possible attributes:

Until this is implemented, I was wondering what a workaround would be. Would something like this already work, @tchiotludo? I'm not sure how the Set state and the parsing of results from Counts should be handled.

id: concurrencyLimited
namespace: dev

tasks:
  - id: counts
    type: io.kestra.core.tasks.executions.Counts
    expression: "{{ gte count 5 }}"
    states: 
      - RUNNING
    flows:
      - namespace: dev
        flowId: concurrencyTest
    startDate: "{{ now() | dateAdd(-1, 'DAYS') }}"
  - id: logResult
    type: io.kestra.core.tasks.log.Log
    message: "{{outputs.counts.results}}"
  - id: cancel
    type: io.kestra.core.tasks.flows.EachParallel
    value: "{{outputs.counts.results}}"
    tasks: 
      - id: setState
        type: io.kestra.core.tasks.states.Set
        name: KILLED

tchiotludo commented 1 year ago

In fact, Counts was designed to handle notifications when a flow has had no execution for some amount of time. I wouldn't use it to handle orchestration logic. I don't see an easy, proper workaround right now. Maybe state could work, but the state storage is not transactional, so I think there are edge cases.
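The edge case with non-transactional storage is the usual check-then-act race: two triggers can both read the same count before either one records its start. The Python sketch below illustrates that general problem and an atomic alternative. It does not model Kestra's state storage; `LIMIT`, `AtomicSlots`, and the helper functions are hypothetical names.

```python
import threading

LIMIT = 5  # assumed limit, matching the "at most 5" example in this issue


def interleaved_check_then_start(running):
    """Replay one unlucky interleaving: both triggers check before either writes."""
    a_sees_room = running < LIMIT  # trigger A reads the count
    b_sees_room = running < LIMIT  # trigger B reads the same stale count
    if a_sees_room:
        running += 1
    if b_sees_room:
        running += 1
    return running


class AtomicSlots:
    """Check and increment happen under one lock, so they cannot interleave."""

    def __init__(self, limit):
        self._limit = limit
        self._running = 0
        self._lock = threading.Lock()

    def try_acquire(self):
        with self._lock:
            if self._running < self._limit:
                self._running += 1
                return True
            return False


def admitted_with_lock(attempts):
    """Start `attempts` threads competing for slots; return how many got one."""
    slots = AtomicSlots(LIMIT)
    results = []

    def worker():
        results.append(slots.try_acquire())

    threads = [threading.Thread(target=worker) for _ in range(attempts)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sum(results)
```

Starting from 4 running executions, the interleaved version ends with 6, one above the limit, while the locked version never admits more than `LIMIT` regardless of how many threads compete.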

anna-geller commented 1 year ago

Thx for clarifying. Agree, even if this would work, it's an ugly hack :) Better to wait

peauc commented 1 year ago

Hello 👋, this would be very useful for my use case. I'm using a webhook and I want to make sure that it can't start concurrent executions, as the flow is compute-heavy.

vdovychenko commented 1 year ago

This feature would fit our use case. We have regular extractions from MongoDB. However, sometimes MongoDB is overloaded, the extraction takes much longer, and the next flow starts, which causes a mess in the data and puts even more load on MongoDB.

It would be nice if a flow had the following parameters (or a special task that would check it):

The task would probably be more valuable, since we could then see how long a flow was in the wait state.
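The wait time mentioned above can be estimated with a small discrete simulation. This is a sketch under assumptions: runs are queued first-in, first-out, times are in arbitrary units such as minutes, and `simulate_waits` is a hypothetical helper, not part of Kestra.

```python
import heapq


def simulate_waits(limit, runs):
    """Return how long each run waits before starting.

    `runs` is a list of (trigger_time, duration) pairs sorted by trigger_time.
    At most `limit` runs are active at once; the others wait in FIFO order.
    """
    finish_times = []  # min-heap with the finish time of each occupied slot
    waits = []
    for trigger_time, duration in runs:
        # Release slots whose run has already finished by this trigger.
        while finish_times and finish_times[0] <= trigger_time:
            heapq.heappop(finish_times)
        if len(finish_times) < limit:
            start = trigger_time  # a slot is free, start immediately
        else:
            start = heapq.heappop(finish_times)  # wait for the earliest finish
        heapq.heappush(finish_times, start + duration)
        waits.append(start - trigger_time)
    return waits


# A schedule every 10 minutes where the second extraction is slow (25 minutes).
schedule = [(0, 4), (10, 25), (20, 4), (30, 4)]
waits_when_serialized = simulate_waits(1, schedule)
```

With a limit of 1, the third and fourth runs wait for the slow extraction to finish instead of overlapping with it, and the wait per run is exactly the number one would want to see reported for the wait state.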

yuri1969 commented 1 year ago

Our current workloads would use the concurrency limits feature in two different ways:

Serialization

Executions of a flow must finish in the order they were created, so we need to limit the flow's execution concurrency to one.

Throughput Limit

Certain tasks use an expensive resource which can't be scaled out. The concurrency of such tasks should be limited to prevent overloading the resource.
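Both patterns can be sketched with a single counting semaphore: a limit of one gives serialization, and a larger limit gives a throughput cap. The Python example below is only an illustration of those semantics using `asyncio`. `run_executions`, `serialized`, and `throttled` are hypothetical names, and the sleep stands in for real work.

```python
import asyncio


async def run_executions(limit, durations):
    """Run one coroutine per duration with at most `limit` active at once.

    Returns (peak_concurrency, finish_order).
    """
    slots = asyncio.Semaphore(limit)
    running = 0
    peak = 0
    finish_order = []

    async def execution(index, duration):
        nonlocal running, peak
        async with slots:  # blocks while `limit` executions are active
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(duration)  # placeholder for the real work
            running -= 1
            finish_order.append(index)

    await asyncio.gather(*(execution(i, d) for i, d in enumerate(durations)))
    return peak, finish_order


def serialized(durations):
    """Serialization: a limit of one, so executions run one after another."""
    return asyncio.run(run_executions(1, durations))


def throttled(limit, durations):
    """Throughput limit: several executions may overlap, but never more than `limit`."""
    return asyncio.run(run_executions(limit, durations))
```

In the serialized case the finish order matches the creation order even when later executions are shorter, because waiters on the semaphore are woken in arrival order. That ordering is a property of `asyncio.Semaphore`; a real orchestrator would need its own ordered queue to give the same guarantee.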

anna-geller commented 1 year ago

High priority, as this is currently the most frequently upvoted issue and a critical feature for BPM and microservice use cases.

This issue allows us to limit the number of concurrent executions for a flow in general, e.g. "I want to have at most 5 executions of flow X at the same time". Other executions need to be queued or canceled.

Note that this issue is different from allowing concurrent scheduled executions (e.g. triggered from parallel backfills) - https://github.com/kestra-io/kestra/issues/311

anna-geller commented 1 year ago

id: myflow
namespace: dev

concurrencyLimit:
  maxExecutions: 1
  behavior: ENUM

tasks:
  ...

For the behavior, it should provide the following options: