kestra-io / kestra

:zap: Workflow Automation Platform. Orchestrate & Schedule code in any language, run anywhere, 500+ plugins. Alternative to Zapier, Rundeck, Camunda, Airflow...
https://kestra.io
Apache License 2.0
10.45k stars 866 forks source link

​Execution​Status​Condition for transient states #4276

Closed rtl-perse closed 3 weeks ago

rtl-perse commented 3 months ago

Issue description

My goal was to use a Flow trigger with ExecutionFlowCondition to get notified via Teams/Slack for PAUSED human in the loop executions.

From my investigation, the ExecutionFlowCondition does currently only work with terminal states. Can you add support for transient states such as PAUSED or QUEUED? Or if this is not possible, please add this information the documentation.

Example Execution Flow:

id: do_pause
namespace: company.myteam

tasks:
  - id: take_a_break
    type: io.kestra.plugin.core.flow.Pause
    timeout: PT1M

concurrency:
  limit: 1

Trigger Flow (never triggered):

id: trigger_by_pause
namespace: company.myteam

tasks:
  - id: log_trigger_state
    type: io.kestra.plugin.core.log.Log
    message: "state: {{ trigger.state }}"

triggers:
  - id: test_trigger
    type: io.kestra.plugin.core.trigger.Flow
    conditions:
      - type: io.kestra.plugin.core.condition.ExecutionFlowCondition
        namespace: company.myteam
        flowId: do_pause
      - type: io.kestra.plugin.core.condition.ExecutionStatusCondition
        in:
          - QUEUED
          - PAUSED
rtl-perse commented 3 months ago

sorry, please transfer issue into main kestra repo

loicmathieu commented 1 month ago

This is more complex that it appears, today we only check dependent flow triggers when a flow execution ends, so it's not mandatory to add an ExecutionStatusCondition as the flow trigger will be evaluated only on terminated flow by default.

Which means that this flow:

id: trigger_by_pause
namespace: company.myteam

tasks:
  - id: log_trigger_state
    type: io.kestra.plugin.core.log.Log
    message: "state: {{ trigger.state }}"

triggers:
  - id: test_trigger
    type: io.kestra.plugin.core.trigger.Flow
    conditions:
      - type: io.kestra.plugin.core.condition.ExecutionFlowCondition
        namespace: company.myteam
        flowId: do_pause

Is equivalent to:

id: trigger_by_pause
namespace: company.myteam

tasks:
  - id: log_trigger_state
    type: io.kestra.plugin.core.log.Log
    message: "state: {{ trigger.state }}"

triggers:
  - id: test_trigger
    type: io.kestra.plugin.core.trigger.Flow
    conditions:
      - type: io.kestra.plugin.core.condition.ExecutionFlowCondition
        namespace: company.myteam
        flowId: do_pause
      - type: io.kestra.plugin.core.condition.ExecutionStatusCondition
        in:
          - FAILED
          - SUCCESS
          - CANCELED
          - WARNING

To allow non-terminal status in an ExecutionStatusCondition we would need to evaluate flow triggers for each status change, which means that our default would change to all terminated state to any states, which will be a breaking change and a bad default (a trigger with no restriction on any status will be executed on each state change).

We will need to think internally about what we can offer to meet your needs.

rtl-perse commented 1 month ago

For now I can workaround this issue. I will let you know if this changes. Thank you.

Ben8t commented 1 month ago

@rtl-perse can you give us more details about your workaround :) ?

rtl-perse commented 1 month ago

Quick fix is not to use trigger but add a send notification task just before the pause task. Eventually I move to another workaround, where I fetch all PAUSED tasks from specific namespace using your REST-API.