kestra-io / kestra

⚡ Workflow Automation Platform. Orchestrate & Schedule code in any language, run anywhere, 500+ plugins. Alternative to Zapier, Rundeck, Camunda, Airflow...
https://kestra.io
Apache License 2.0

Add pluggable system to allow concatenating ForEachItem results #5456

Open anna-geller opened 1 week ago

anna-geller commented 1 week ago

Feature description

There are many ways users may want to combine subflow execution outputs, and different concat strategies are needed depending on the output ID and type. To support this, we want to explore a pluggable system in which each plugin implements its own concatenation strategy.
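
A minimal Python sketch of what such a pluggable system could look like, assuming a registry that maps a plugin type string to a strategy class, with each strategy merging one output across all subflow executions. ConcatStrategy, REGISTRY, register, concat_outputs, and the Sum and Join plugins are hypothetical names for illustration only; they are not part of Kestra's API (Kestra plugins are written in Java).

```python
from abc import ABC, abstractmethod
from typing import Any, Callable


class ConcatStrategy(ABC):
    """Hypothetical plugin contract: merge one output collected from many subflow executions."""

    @abstractmethod
    def concat(self, values: list[Any]) -> Any:
        """Combine the per-execution values of a single output into one result."""


# Hypothetical plugin registry keyed by the "type" string a flow would reference.
REGISTRY: dict[str, type[ConcatStrategy]] = {}


def register(type_name: str) -> Callable[[type[ConcatStrategy]], type[ConcatStrategy]]:
    def decorator(cls: type[ConcatStrategy]) -> type[ConcatStrategy]:
        REGISTRY[type_name] = cls
        return cls
    return decorator


@register("Sum")
class SumConcat(ConcatStrategy):
    """Suits numeric outputs such as an INT."""

    def concat(self, values: list[Any]) -> Any:
        return sum(values)


@register("Join")
class JoinConcat(ConcatStrategy):
    """Suits STRING outputs; the separator is a plugin-specific option."""

    def __init__(self, separator: str = ", ") -> None:
        self.separator = separator

    def concat(self, values: list[Any]) -> str:
        return self.separator.join(str(value) for value in values)


def concat_outputs(type_name: str, output_id: str, executions: list[dict[str, Any]], **options: Any) -> Any:
    """Look up the plugin by type, then apply it to one output across all executions."""
    strategy = REGISTRY[type_name](**options)
    return strategy.concat([execution[output_id] for execution in executions])


executions = [{"myint": 42, "mystring": "hello"}, {"myint": 1, "mystring": "world"}]
print(concat_outputs("Sum", "myint", executions))  # 43
print(concat_outputs("Join", "mystring", executions, separator=" "))  # hello world
```

Keeping the contract to a single concat method means a new strategy only has to register itself under a type name; the caller never needs to know which strategies exist.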

Once implemented, this would close https://github.com/kestra-io/kestra-ee/issues/1934, as a ZIP concat strategy would make it unnecessary to access individual subflow execution files.
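
A minimal sketch of what a ZIP concat strategy might do, using Python's standard zipfile module: bundle the FILE output of every subflow execution into one archive, so downstream tasks read a single file instead of fetching each execution's file. The function name zip_file_outputs and the "<execution index>/<output id>" entry naming are assumptions for illustration, not Kestra behavior.

```python
import io
import zipfile


def zip_file_outputs(output_id: str, executions: list[dict[str, bytes]]) -> bytes:
    """Bundle one FILE output from every subflow execution into a single in-memory ZIP.

    Entry names such as "0/myoutput" are an assumed convention: one folder per execution index.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        for index, execution in enumerate(executions):
            archive.writestr(f"{index}/{output_id}", execution[output_id])
    return buffer.getvalue()


# Hypothetical file contents returned by two subflow executions.
executions = [
    {"myoutput": b'{"id": "1"}\n'},
    {"myoutput": b'{"id": "2"}\n'},
]
archive_bytes = zip_file_outputs("myoutput", executions)

with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
    print(archive.namelist())  # ['0/myoutput', '1/myoutput']
```

Prefixing entries with the execution index keeps files from different executions from overwriting each other, since every execution produces an output with the same ID.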

Subflow with three types of outputs (FILE, STRING, and INT):

id: json_file_output_child
namespace: company.team

inputs:
  - id: json_file
    type: FILE

tasks:
  - id: debug
    type: io.kestra.plugin.core.log.Log
    message: "{{ json(read(inputs.json_file)) }}"

  - id: jsonata
    type: io.kestra.plugin.transform.jsonata.TransformItems
    from: "{{ inputs.json_file }}"
    expression: |
      {
        "id": id,
        "name": data.Color & ' ' & name
      }

outputs:
  - id: myoutput
    type: FILE
    value: "{{ outputs.jsonata.uri }}"

  - id: mystring
    type: STRING
    value: hello 

  - id: myint
    type: INT
    value: 42
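
For reference, the jsonata task maps each item to an object holding its id and a name built from data.Color and name. The Python sketch below approximates that expression for a JSON Lines batch. It assumes Color and name are strings when present, and relies on JSONata treating an undefined operand of the & operator as an empty string and omitting object keys whose value is undefined. transform_item is a hypothetical helper, not part of the jsonata plugin.

```python
import json


def transform_item(item: dict) -> dict:
    """Approximate {"id": id, "name": data.Color & ' ' & name} for one record.

    Assumes Color and name are strings when present. A missing operand of JSONata's
    & operator becomes "", and a key whose value is undefined is left out of the object.
    """
    data = item.get("data") or {}  # data may be null or absent
    color = data.get("Color", "")
    name = item.get("name", "")
    result = {}
    if "id" in item:
        result["id"] = item["id"]
    result["name"] = f"{color} {name}"
    return result


# A hypothetical two-row JSON Lines batch; the parent flow sets batch rows: 1,
# so each subflow execution would actually receive a single row.
batch = (
    '{"id": "1", "name": "Phone", "data": {"Color": "Blue"}}\n'
    '{"id": "2", "name": "Tablet", "data": null}\n'
)
rows = [transform_item(json.loads(line)) for line in batch.splitlines() if line]
print(rows)  # [{'id': '1', 'name': 'Blue Phone'}, {'id': '2', 'name': ' Tablet'}]
```

Note the leading space in ' Tablet': an item without data.Color still gets the separator, which is what the & concatenation produces.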

Proposed solution: a compressOutputs property that allows multiple outputs to be concatenated in different ways, depending on the chosen plugin:

id: json_file_output_parent_batch_1
namespace: company.team 

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: https://api.restful-api.dev/objects
    contentType: application/json
    method: GET
    failOnEmptyResponse: true
    timeout: PT15S

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.download.uri }}"
    newLine: false # regular json

  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 1
    namespace: company.team
    flowId: json_file_output_child
    wait: true
    transmitFailed: true
    inputs:
      json_file: "{{ taskrun.items }}"
    compressOutputs: 
      - id: myzip
        type: io.kestra.plugin.compress.foreachitem.Archive
        algorithm: ZIP # alternatively: algorithm: TAR with compression: GZIP
        outputs: 
          - myoutput  
      - id: myarray
        type: io.kestra.plugin.compress.Array
        outputs:
          - myint
          - mystring
      - id: myion
        type: io.kestra.plugin.compress.Ion
        outputs: # defaults to ALL outputs if not explicitly specified
          - myint
          - mystring
      - id: custom
        type: io.kestra.plugin.compress.CustomFunction
        outputs:
          - myint
          - mystring
        script: | # pseudocode: add each execution's myint to the accumulator
          (myint, mystring) {
            init['bla'] += myint
          }
        init:
          bla: 0
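
To make the intended semantics of the Array and CustomFunction entries concrete, here is a minimal Python sketch, assuming each subflow execution's outputs arrive as a dictionary keyed by output ID. array_strategy and fold_strategy are hypothetical helpers, and the fold is a guess at what the pseudocode script with init bla: 0 is meant to compute (a running total of myint). myint and mystring are the INT and STRING outputs declared by json_file_output_child.

```python
from functools import reduce
from typing import Any, Callable, Optional

Outputs = dict[str, Any]


def _resolve_ids(executions: list[Outputs], output_ids: Optional[list[str]]) -> list[str]:
    # Mirrors the proposal's comment: use ALL outputs when none are listed explicitly.
    if output_ids is not None:
        return output_ids
    return list(executions[0]) if executions else []


def array_strategy(executions: list[Outputs], output_ids: Optional[list[str]] = None) -> dict[str, list[Any]]:
    """Collect each selected output into one list, preserving execution order."""
    ids = _resolve_ids(executions, output_ids)
    return {oid: [execution[oid] for execution in executions] for oid in ids}


def fold_strategy(
    executions: list[Outputs],
    init: dict[str, Any],
    step: Callable[[dict[str, Any], Outputs], dict[str, Any]],
    output_ids: Optional[list[str]] = None,
) -> dict[str, Any]:
    """Reduce the selected outputs of every execution into an accumulator seeded with init."""
    ids = _resolve_ids(executions, output_ids)
    selected = [{oid: execution[oid] for oid in ids} for execution in executions]
    return reduce(step, selected, dict(init))  # copy init so reruns start fresh


# Each child execution returns the constants declared in json_file_output_child.
executions = [{"myint": 42, "mystring": "hello"} for _ in range(3)]

print(array_strategy(executions, ["myint", "mystring"]))
# {'myint': [42, 42, 42], 'mystring': ['hello', 'hello', 'hello']}

# Custom fold: start from {"bla": 0} and add myint from every execution.
total = fold_strategy(
    executions,
    init={"bla": 0},
    step=lambda acc, outs: {**acc, "bla": acc["bla"] + outs["myint"]},
    output_ids=["myint", "mystring"],
)
print(total)  # {'bla': 126}
```

Expressing the custom strategy as a fold (an initial value plus a step function) keeps the user-supplied logic small, and any associative step could later be applied to batches in parallel.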
anna-geller commented 1 week ago

Example use cases and flows that can be used during design: https://github.com/kestra-io/examples/tree/main/ForEachItem

Harsh4902 commented 1 week ago

@anna-geller Can I work on this issue?

MilosPaunovic commented 1 week ago

Absolutely, go for it @Harsh4902! 🚀

Harsh4902 commented 5 days ago

How can I check the current behavior of the application?

MilosPaunovic commented 5 days ago

@Harsh4902 Not sure what you mean?

anna-geller commented 5 days ago

@Harsh4902 this issue is very difficult; I suspect it could take 1-2 weeks. Perhaps you could look at issues labeled as good first issue instead?

Harsh4902 commented 5 days ago

@anna-geller Sure. I'm unassigning myself from the issue, but I will still try to work on it.