kestra-io / kestra

:zap: Open-source workflow automation platform. Orchestrate any language using YAML, hundreds of integrations. Alternative to Airflow, Zapier, RunDeck, Camunda, ...
https://kestra.io
Apache License 2.0

Simplify conditional branching for Microservice Orchestration #3469

Open anna-geller opened 6 months ago

anna-geller commented 6 months ago

Feature description

Microservice orchestration is currently challenging in Kestra because every step that depends on the result of the previous one has to be nested inside another If task.

Here is the simplest possible microservice use case for order processing - note how many extra If tasks are required:

id: ecommerce-order-processing
namespace: ecommerce
description: E-commerce Order Processing Workflow

inputs:
  - id: orderId
    type: STRING

tasks:
  - id: checkInventory
    type: io.kestra.plugin.fs.http.Request
    description: "Check inventory for the order items"
    uri: "http://inventory-service/check"
    method: POST
    contentType: "multipart/form-data"
    formData:
      orderId: "{{inputs.orderId}}"

  - id: checkStock
    type: io.kestra.core.tasks.flows.If
    condition: "{{ outputs.checkInventory.body.inventoryAvailable == true }}"
    then: 
    - id: processPayment
      type: io.kestra.plugin.fs.http.Request
      description: "Process payment for the order"
      uri: "http://payment-service/process"
      formData:
        orderId: "{{inputs.orderId}}"

    - id: confirm
      type: io.kestra.core.tasks.flows.If
      condition: "{{ outputs.processPayment.body.paymentResult == true }}"
      then:
        - id: orderConfirmation
          type: io.kestra.plugin.fs.http.Request
          description: "Confirm the order and notify the customer"
          uri: "http://order-service/confirm"
          formData:
            orderId: "{{inputs.orderId}}"
        - id: confirm2
          type: io.kestra.core.tasks.flows.If
          condition: "{{ outputs.orderConfirmation.body.status == 'SUCCESS' }}"
          then:
          - id: arrangeShipping
            type: io.kestra.plugin.fs.http.Request
            description: "Arrange shipping for the order"
            uri: "http://shipping-service/arrange"
            method: POST
            formData:
              orderId: "{{inputs.orderId}}"
          - id: confirm3
            type: io.kestra.core.tasks.flows.If
            condition: "{{ outputs.arrangeShipping.body.status == 'SUCCESS' }}"
            then:
            - id: updateDeliveryStatus
              type: io.kestra.plugin.fs.http.Request
              description: "Update the delivery status of the order"
              uri: "http://delivery-service/updateStatus"
              method: POST
              formData:
                orderId: "{{inputs.orderId}}"

pluginDefaults:
  - type: io.kestra.plugin.fs.http.Request
    values:
      contentType: "multipart/form-data"
      method: POST 

With all those extra If tasks, the topology becomes so complex that you can barely see the real tasks:

[image: topology view of the flow above]

Possible solutions to explore

  1. Create a new, more microservice-friendly HTTP task that accepts condition, then, and else arguments (I'm aware this might not be feasible, as it requires a Flowable task)
  2. Add a condition argument to the existing HTTP Request task
  3. Add a condition argument to the core Runnable task (likely the most complex?)
  4. Your idea - add in comments
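
To make option 2 more concrete, here is a rough sketch of how a flat flow might read. The `condition` property on the HTTP Request task is purely hypothetical: it does not exist today, and its name and semantics are assumptions for illustration only.

```yaml
# HYPOTHETICAL syntax: `condition` is NOT an existing property of
# io.kestra.plugin.fs.http.Request; it only illustrates option 2.
tasks:
  - id: checkInventory
    type: io.kestra.plugin.fs.http.Request
    uri: http://inventory-service/check
    formData:
      orderId: "{{ inputs.orderId }}"

  - id: processPayment
    type: io.kestra.plugin.fs.http.Request
    uri: http://payment-service/process
    # hypothetical: run this task only if the expression renders to true
    condition: "{{ outputs.checkInventory.body.inventoryAvailable == true }}"
    formData:
      orderId: "{{ inputs.orderId }}"

  - id: orderConfirmation
    type: io.kestra.plugin.fs.http.Request
    uri: http://order-service/confirm
    # hypothetical: same idea, chained on the previous task's output
    condition: "{{ outputs.processPayment.body.paymentResult == true }}"
    formData:
      orderId: "{{ inputs.orderId }}"
```

One open design question with this shape is what happens downstream when a condition is false: later tasks that reference the skipped task's outputs would need a defined behavior, such as being skipped as well.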

anna-geller commented 5 months ago

One possibility with BEFORE and AFTER checks:

id: ecommerce-order-processing
namespace: ecommerce
description: E-commerce Order Processing Workflow

inputs:
  - id: orderId
    type: STRING

tasks:
  - id: checkInventory
    type: io.kestra.plugin.fs.http.Request
    description: Check inventory for the order items
    uri: http://inventory-service/check
    contentType: multipart/form-data
    formData:
      orderId: "{{ inputs.orderId }}"

  - id: processPayment
    type: io.kestra.plugin.fs.http.Request
    description: Process payment for the order
    uri: http://payment-service/process
    formData:
      orderId: "{{ inputs.orderId }}"
    checks: 
      - type: BEFORE
        description: Check stock before accepting payment
        condition: "{{ outputs.checkInventory.body.inventoryAvailable == true }}"
        onConditionFail:
          - type: RUN_SUBFLOW
            flowId: alerting_flow_id
            namespace: myteam
          - type: SET_TASK_RUN_STATE
            state: WARNING # FAILED, WARNING, CANCELLED, KILLED

      - type: AFTER
        description: Validate payment result
        condition: "{{ outputs.processPayment.body.paymentResult == true }}"
        onConditionFail: # optional; by default, a failed condition fails the task run
          - type: SET_TASK_RUN_STATE
            state: CANCELLED # FAILED, WARNING, CANCELLED, KILLED

  - id: orderConfirmation
    type: io.kestra.plugin.fs.http.Request
    description: Confirm the order and notify the customer
    uri: http://order-service/confirm
    formData:
      orderId: "{{ inputs.orderId }}"

  - id: arrangeShipping
    type: io.kestra.plugin.fs.http.Request
    description: Arrange shipping for the order
    uri: http://shipping-service/arrange
    formData:
      orderId: "{{ inputs.orderId }}"
    checks:
      - type: BEFORE
        condition: "{{ outputs.orderConfirmation.body.status == 'SUCCESS' }}"
        description: Check order confirmation before arranging shipping
      - type: AFTER
        condition: "{{ outputs.arrangeShipping.body.status == 'SUCCESS' }}"

  - id: updateDeliveryStatus
    type: io.kestra.plugin.fs.http.Request
    description: Update the delivery status of the order
    uri: http://delivery-service/updateStatus
    formData:
      orderId: "{{ inputs.orderId }}"

pluginDefaults:
  - type: io.kestra.plugin.fs.http.Request
    values:
      contentType: multipart/form-data
      method: POST
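
For comparison, a similarly flat layout may already be possible with existing features by placing a guard task after each request instead of nesting. The sketch below assumes the core Fail task (with its `condition` and `errorMessage` properties) is available under the old-style type name used elsewhere in this thread. The guard task ids and error messages are made up for illustration, and only the first two steps are shown.

```yaml
id: ecommerce-order-processing-flat
namespace: ecommerce

inputs:
  - id: orderId
    type: STRING

tasks:
  - id: checkInventory
    type: io.kestra.plugin.fs.http.Request
    uri: http://inventory-service/check
    formData:
      orderId: "{{ inputs.orderId }}"

  # Guard: fail the execution here when the items are not in stock
  # (assumes the Fail task is available in your Kestra version)
  - id: failIfOutOfStock
    type: io.kestra.core.tasks.executions.Fail
    condition: "{{ outputs.checkInventory.body.inventoryAvailable != true }}"
    errorMessage: Inventory is not available for this order

  - id: processPayment
    type: io.kestra.plugin.fs.http.Request
    uri: http://payment-service/process
    formData:
      orderId: "{{ inputs.orderId }}"

  # Guard: fail the execution when the payment was not accepted
  - id: failIfPaymentRejected
    type: io.kestra.core.tasks.executions.Fail
    condition: "{{ outputs.processPayment.body.paymentResult != true }}"
    errorMessage: Payment was not accepted

pluginDefaults:
  - type: io.kestra.plugin.fs.http.Request
    values:
      contentType: multipart/form-data
      method: POST
```

Note that this is not equivalent to the nested If version: a failed guard ends the execution in a FAILED state rather than quietly skipping the remaining steps, and it still adds one extra task per check, so it only partly addresses the verbosity this issue is about.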