kestra-io / kestra

Orchestration and automation platform to execute millions of scheduled and event-driven workflows declaratively in code and from the UI
https://kestra.io
Apache License 2.0

Add an Execution Lookup task to implement flow-level timeouts and SLAs/SLOs as system flows #3350

Open · anna-geller opened this issue 6 months ago

anna-geller commented 6 months ago

Feature description

A flow-level timeout (e.g., an execution should not take longer than 1 hour) and a flow-level SLA/SLO (e.g., send an email alert if an execution takes longer than 1 hour) are commonly requested features. However, implementing them would require periodically polling execution durations, which would be costly and opaque to the user if done in the background.

As an alternative to a polling trigger or an internal service, we could leverage a system flow with a Lookup task that queries executions matching given criteria.

The results of the Lookup task can then be passed to ForEachItem to iterate over each matching execution and process it (sending alerts on missed SLAs; automatically canceling, killing, or restarting executions; or ingesting them into an operational analytics tool).
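To illustrate the per-execution processing, here is a minimal Python sketch of the decision a ForEachItem subflow might make for each execution. The record keys (`state`, `startDate`, `endDate`), the set of terminal states, and the `decide_action` helper are all hypothetical, since the output schema of the Lookup task is not defined yet.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed set of finished states; the Lookup output schema is not defined yet.
TERMINAL_STATES = {"SUCCESS", "WARNING", "FAILED", "KILLED", "CANCELLED"}


def decide_action(execution: dict, sla: timedelta, now: datetime) -> Optional[str]:
    """Hypothetical helper: pick a reaction for one execution returned by the Lookup.

    Returns "cancel" for an unfinished execution already running longer than the SLA,
    "alert" for a finished execution that took longer than the SLA, and None otherwise.
    The keys "state", "startDate" and "endDate" are assumed column names.
    """
    start = execution["startDate"]
    if execution["state"] in TERMINAL_STATES:
        # Finished: compare the actual duration with the SLA.
        return "alert" if execution["endDate"] - start > sla else None
    # Not finished yet: compare the time elapsed so far with the SLA.
    return "cancel" if now - start > sla else None


if __name__ == "__main__":
    now = datetime(2024, 4, 1, 12, 0, tzinfo=timezone.utc)
    running = {"state": "RUNNING", "startDate": now - timedelta(hours=2)}
    print(decide_action(running, timedelta(hours=1), now))  # cancel
```

Canceling only unfinished executions is one possible policy; the same decision could just as well return "kill" or "restart", depending on what the system flow should do.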

Benefits of this approach:

  1. Simple to understand: a system flow queries for specific executions and iterates over the results (more transparent to the user than a background process, and easier to debug and troubleshoot).
  2. Instead of sending one SLA alert per execution, this approach also allows sending a single alert for all executions that failed to meet the SLA/SLO.
  3. Full user control: e.g., you can define different SLAs and SLOs per flow or namespace, filter executions in a specific way (based on specific inputs or labels), and react to them as you wish (send an alert, cancel the execution, etc.).
  4. You can use the same Lookup task and system flow to programmatically cancel, kill, or restart executions matching specific criteria.
  5. You can react to a lack of executions: if the Lookup returns 0 rows, something is likely wrong. For example, if you expect one execution per day for a daily flow and the Lookup returns no results, you may want to get an alert about it.
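Points 2 and 5 can be illustrated with a small Python sketch that turns the Lookup results into one combined alert, or into a "missing execution" alert when fewer executions than expected were found. The `build_sla_alert` helper, the `id` and `duration` keys, and the `expected_min` parameter are hypothetical and only stand in for whatever the real output columns turn out to be.

```python
from datetime import timedelta
from typing import Optional


def build_sla_alert(
    executions: list[dict], sla: timedelta, expected_min: int = 1
) -> Optional[str]:
    """Hypothetical helper: build one alert message for all SLA misses.

    `executions` mimics rows of the Lookup output; the "id" and "duration"
    keys are assumptions, as the output schema is not defined yet.
    Returns None when every execution met the SLA.
    """
    # Point 5: too few executions (e.g. 0 rows for a daily flow) is itself an alert.
    if len(executions) < expected_min:
        return f"Expected at least {expected_min} execution(s), found {len(executions)}"
    # Point 2: collect all misses and report them in a single message.
    misses = [e for e in executions if e["duration"] > sla]
    if not misses:
        return None
    ids = ", ".join(e["id"] for e in misses)
    return f"{len(misses)} execution(s) exceeded the SLA of {sla}: {ids}"
```

For example, with an SLA of one hour and three executions lasting 30 minutes, 2 hours, and 3 hours, the function returns a single message naming the last two, instead of two separate alerts.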

Example syntax

All properties of the Lookup task are meant to be optional, with sensible defaults. The output of this task is an ION file with one row per execution and columns containing execution metadata.

(Alternative naming of the task could be Query: io.kestra.core.tasks.executions.Query.)

id: check_sla
namespace: system

tasks:
  - id: lookup
    type: io.kestra.core.tasks.executions.Lookup
    namespace: datateam
    flowId: sla_flow
    triggerId: daily # limits the query to executions created by a specific trigger
    last: PT24H
    labels:
      env: uat
    # revision: latest
    # state: a list of Execution states to query. By default, all states.
    # inputs:
    #   SLA: PT1H
    # since: "{{ execution.startDate }}" # default
    # isSubflow: true/false/null # TBD whether this task can check for child executions of a specific parent flow

  - id: send_alert
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      executions.ion: "{{ outputs.lookup.uri }}"
    script: |
      # read the ION file and implement custom logic
      # to process executions matching the Lookup criteria,
      # e.g. to send a single alert for multiple SLA/SLO misses

  - id: for_each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.lookup.uri }}"
    description: this subflow sends one alert for each SLA/SLO miss (passed to it via inputs)
    namespace: system
    flowId: process_sla
    inputs:
      execution: "{{ taskrun.items }}"

triggers:
  - id: daily
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 10 * * *"
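The filtering semantics proposed for the `lookup` task above can be sketched in Python as follows. This is only an assumption about how the properties might combine: every given filter must match, `last` defines a window ending now, all requested `labels` must match, and `states=None` means every state. The record keys and the `lookup` function itself are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def lookup(
    executions: list[dict],
    namespace: str,
    flow_id: str,
    now: datetime,
    last: timedelta = timedelta(hours=24),
    trigger_id: Optional[str] = None,
    labels: Optional[dict[str, str]] = None,
    states: Optional[set[str]] = None,
) -> list[dict]:
    """Hypothetical filter mirroring the proposed Lookup properties.

    Each record is a dict with assumed keys: namespace, flowId, triggerId,
    labels, state, startDate.
    """
    since = now - last  # e.g. last: PT24H -> executions started in the past 24 hours
    wanted_labels = labels or {}
    matches = []
    for e in executions:
        if e["namespace"] != namespace or e["flowId"] != flow_id:
            continue
        if e["startDate"] < since:
            continue
        # Only filter on the trigger when one was requested.
        if trigger_id is not None and e.get("triggerId") != trigger_id:
            continue
        # Every requested label must be present with the same value.
        if any(e.get("labels", {}).get(k) != v for k, v in wanted_labels.items()):
            continue
        # By default (states=None), executions in all states match.
        if states is not None and e["state"] not in states:
            continue
        matches.append(e)
    return matches
```

With the example flow, `lookup(rows, "datateam", "sla_flow", now, trigger_id="daily", labels={"env": "uat"})` would keep only the `uat` executions of `sla_flow` started by the `daily` trigger within the last 24 hours.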

Example flow whose SLA is checked by the system flow above:

id: sla_flow
namespace: datateam

labels:
  env: uat

inputs:
  - id: SLA
    type: DURATION
    defaults: PT1H

  - id: SLO
    type: DURATION
    defaults: PT45M 

tasks:
  - id: business_critical_task
    type: io.kestra.core.tasks.log.Log
    message: running {{ task.id }}

triggers:
  - id: daily
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "0 9 * * *"
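The `SLA` and `SLO` inputs of this flow are ISO 8601 durations. Below is a Python sketch of how a processing flow might compare an execution's duration against them. The `parse_duration` and `classify` helpers are hypothetical, the parser only handles the `PT#H#M#S` subset, and it assumes the SLO is the stricter target (PT45M versus PT1H in the example above).

```python
import re
from datetime import timedelta

# Only the PT#H#M#S subset of ISO 8601 durations (enough for PT1H, PT45M, PT24H).
_DURATION = re.compile(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?")


def parse_duration(text: str) -> timedelta:
    """Hypothetical parser for simple ISO 8601 time durations such as PT1H30M."""
    match = _DURATION.fullmatch(text)
    if match is None or text == "PT":
        raise ValueError(f"unsupported duration: {text!r}")
    hours, minutes, seconds = match.groups()
    return timedelta(
        hours=int(hours or 0), minutes=int(minutes or 0), seconds=float(seconds or 0)
    )


def classify(duration: timedelta, inputs: dict[str, str]) -> str:
    """Compare an execution's duration with its SLA/SLO inputs.

    Assumes the SLO is the tighter target, so exceeding it is reported
    before the SLA itself is breached.
    """
    sla = parse_duration(inputs["SLA"])
    slo = parse_duration(inputs["SLO"])
    if duration > sla:
        return "SLA missed"
    if duration > slo:
        return "SLO missed"
    return "met"
```

With the defaults above, a 50-minute execution would be reported as "SLO missed", while a 70-minute one would be "SLA missed".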

anna-geller commented 5 months ago

Related: https://github.com/kestra-io/kestra/issues/2386 and https://github.com/kestra-io/kestra/issues/2397