kestra-io / kestra

:zap: Workflow Automation Platform. Orchestrate & Schedule code in any language, run anywhere, 500+ plugins. Alternative to Zapier, Rundeck, Camunda, Airflow...
https://kestra.io
Apache License 2.0

Script tasks cannot directly access output files from `io.kestra.plugin.core.flow.ForEachItem` Subflows #4996

Open yorickdevries opened 2 months ago

yorickdevries commented 2 months ago

Describe the issue

This is the YAML file of my flow using the plugin io.kestra.plugin.core.flow.ForEachItem. It calls the flow comp.batch, which processes a subset of 100 rows of the input file.

id: main_parallel
namespace: comp.main_parallel
tasks:
  - id: "connector"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: comp.connector
    flowId: connector

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ outputs.connector.outputs.output_file }}'
    newLine: false # regular json
  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 100
    namespace: comp.batch
    flowId: batch
    wait: true
    transmitFailed: true
    inputs:
      input_file: '{{ taskrun.items }}'

  - id: merge_outputs
    type: io.kestra.plugin.scripts.shell.Script
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    outputFiles:
      - output.jsonl
    script: |
      subflow_output_paths="{{ outputs.for_each_item_merge.subflowOutputs }}"
      merged_output="output.jsonl"
      echo "$subflow_output_paths"

      > "$merged_output"
      while IFS= read -r subflow_output_line
      do
        subflow_output_path="$(echo $subflow_output_line | cut -d '"' -f 2)"
        cat "$subflow_output_path" >> "$merged_output"
      done < "$subflow_output_paths"

I am able to split my input into multiple batches and process these in subflows. I see that the subflows properly output files. Afterwards, I am able to read the file containing the paths to the different batches. However, I am not able to read those files themselves:

The merge_outputs step crashes with the following error: `2024-09-20 19:33:54.563 cat: 'kestra:///comp/batch/batch/executions/3Ft8Yi2jdLF9pfZnhkdb8Z/tasks/ion-to-jsonl/6zicRSyNtzeMuL9WodJNGc/2161082710634191013.jsonl': No such file or directory`.

Could it be that the batch outputs aren't properly made available in the main task?

Furthermore, I was wondering whether there is a built-in way in Kestra which enables (optional) concatenation of the batch output files. This would save quite a bit of hassle compared to writing this from scratch, as I am doing now.

If you need additional information, I'd gladly hear it. Thanks in advance!

Environment

yorickdevries commented 2 months ago

I also tried this in a local Docker container and see the same behavior. Below are the flow main_parallel and the subflow batch for reproduction. When I run this, I get the error `FileNotFoundError: [Errno 2] No such file or directory: 'kestra:///comp/batch/batch/executions/7arbliDB81Csvy03zrdi80/tasks/ion-to-jsonl/1SYubDfufXzT5wfHxKMnuh/8121262965891312966.jsonl'` in the task merge_outputs.

main flow:

id: main_parallel
namespace: comp.main_parallel

tasks:
  - id: "connector"
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products
  - id: extract_products
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: python:slim
    outputFiles:
      - "output.json"
    script: |
      import json
      with open('{{ outputs.connector["uri"] }}', 'r') as infile:
        data=json.load(infile)
        with open('output.json', "w") as outfile:
          json.dump(data["products"], outfile, indent=2, sort_keys=True)

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ outputs.extract_products.outputFiles["output.json"] }}'
    newLine: false # regular json
  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 2
    namespace: comp.batch
    flowId: batch
    wait: true
    transmitFailed: true
    inputs:
      input_file: '{{ taskrun.items }}'

  - id: ion_to_jsonl2
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.for_each_item_merge.subflowOutputs }}"
    newLine: true # JSON-L

  - id: merge_outputs
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: python:slim
    script: |
      import json
      with open('{{ outputs.ion_to_jsonl2.uri }}', 'r') as infile:
        lines = infile.readlines()
        for line in lines:
          data = json.loads(line)
          print("file_path: ", data["output_file"])
          with open(data["output_file"], 'r') as infile2:
            # read content of subflowoutput
            print("file_content: ", infile2.read())

subflow:

id: batch
namespace: comp.batch

inputs:
  - id: input_file
    type: FILE

tasks:
  - id: jsonl_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ inputs.input_file }}'
    newLine: true # JSON-L
  - id: ion_to_json
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.jsonl_to_ion.uri }}"
    newLine: false # regular json
  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.ion_to_json.uri }}"
    newLine: false # regular json
  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

outputs:
  - id: output_file
    type: FILE
    value: '{{ outputs.ion_to_jsonl.uri }}'
loicmathieu commented 1 month ago

The output of ForEachItem is a list of internal storage files; you cannot read them directly via Python's open().

The best approach would be to read the file before returning it as an output. This is not a good idea if the file is big, but if the file is small it is not an issue.

outputs:
  - id: output_file
    type: STRING
    value: '{{ read(outputs.ion_to_jsonl.uri) }}'
yorickdevries commented 1 month ago

Hi Loïc, Thanks for picking this issue up.

Reading the outputs as strings in memory doesn't really solve the problem, since the main benefit of using io.kestra.plugin.core.flow.ForEachItem is to split up big files into more manageable (less big, but still big) files and process these separately. Furthermore, the output file from a subtask might not even be serializable to a string. If the outputs of the subflows cannot be returned as files in the main flow, the advantage of running io.kestra.plugin.core.flow.ForEachItem is gone IMO.

It is now possible to get the output files of regular subflows (io.kestra.plugin.core.flow.Subflow) in main flows. I don't really see why calling one regular subflow vs. one from io.kestra.plugin.core.flow.ForEachItem should behave much differently in whether you can access the output files. Is there any possibility to still access output files from an io.kestra.plugin.core.flow.ForEachItem subflow in a main flow, similar to calling a regular subflow (io.kestra.plugin.core.flow.Subflow)?

loicmathieu commented 1 month ago

@yorickdevries Subflows work exactly the same: if you have an output of type FILE, it doesn't return the file content but an internal storage file (which can be previewed in the UI, but in code it will still be a file).

You can try it, for example, with:

id: subflow
namespace: comp.main_parallel

tasks:
  - id: "connector"
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ outputs.connector.uri }}'
    newLine: false # regular json

  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: comp.batch
    flowId: batch
    wait: true
    transmitFailed: true
    inputs:
      input_file: '{{ outputs.ion_to_jsonl.uri }}'

  - id: ion_to_jsonl2
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.for_each_item_merge.subflowOutputs }}"
    newLine: true # JSON-L

ForEachItem merges the outputs into a single file, so if the outputs are files, it will not merge the file contents but the file URIs. I agree this is not easy to work with, but I'm not sure we can/want to change that.

Reading the outputs as strings in memory doesn't really solve the problem, since the main benefit of using io.kestra.plugin.core.flow.ForEachItem is to split up big files into more manageable (less big, but still big) files and process these separately.

Agreed, but the outputs of all subflow executions will not be loaded into memory; each output will create a separate file that is then merged into one file at the end of the ForEachItem. This is the whole idea behind ForEachItem: allowing the user to work easily with big files by providing automatic split/merge. So with ForEachItem it's OK to return subflow outputs as STRING and not FILE.
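For illustration, the difference between the two merge shapes can be sketched in plain Python. The data shapes below are assumptions inferred from this thread (one JSON object per subflow execution, keyed by the subflow output id), not Kestra's exact serialization:

```python
import json

# Hypothetical lines of the merged ForEachItem output file.
# FILE-typed subflow output  -> the value is an internal-storage URI.
# STRING-typed subflow output -> the value is the file content itself.
merged_lines = [
    '{"output_file": "kestra:///comp/batch/batch/executions/x/1.jsonl"}',
    '{"output_file": "{\\"id\\": 1}\\n{\\"id\\": 2}"}',
]

uris, contents = [], []
for line in merged_lines:
    value = json.loads(line)["output_file"]
    if value.startswith("kestra://"):
        # A reference: open()/cat on this string fails locally,
        # which is exactly the error reported above.
        uris.append(value)
    else:
        contents.append(value)

print(uris)      # the internal-storage URI (FILE output)
print(contents)  # the inline JSON-L payload (STRING output)
```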

I'll raise this internally anyway.

yorickdevries commented 1 month ago

@yorickdevries Subflows work exactly the same: if you have an output of type FILE, it doesn't return the file content but an internal storage file (which can be previewed in the UI, but in code it will still be a file).

I don't understand what you mean by that. With subflows (io.kestra.plugin.core.flow.Subflow) you can easily return file paths which you can directly access. With io.kestra.plugin.core.flow.ForEachItem you cannot, and therefore need to parse the strings separately (see below).

You can try it for ex with:

This code results in `io.kestra.core.exceptions.IllegalVariableEvaluationException: Unable to find for_each_item_merge used in the expression {{ outputs.for_each_item_merge.subflowOutputs }} at ...` when run

The output of ForEachItem is a list of internal storage files; you cannot read them directly via Python's open().

The best approach would be to read the file before returning it as an output. This is not a good idea if the file is big, but if the file is small it is not an issue.

outputs:
  - id: output_file
    type: STRING
    value: '{{ read(outputs.ion_to_jsonl.uri) }}'

When I try this approach, I get a file which does not directly resemble the input file. The for_each_item_merge step now just prints the two items per batch as one complete string under the key output_file (see image).

[image: preview of the merged output, showing each batch's items as a single string under the output_file key]

Now I need to parse these output strings again (and remove whitespace) to merge them into one new file, while all I need is a concatenation of the files which already exist from the io.kestra.plugin.core.flow.ForEachItem calls.

Effectively, this means I need to change the merge_outputs to the following:

  - id: merge_outputs
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: python:slim
    outputFiles:
    - output.jsonl
    script: |
      import json
      with open('output.jsonl', 'a') as outfile:
        with open('{{ outputs.ion_to_jsonl2.uri }}', 'r') as infile:
          lines = infile.readlines()
          for line in lines:
            output_file_data = json.loads(line)["output_file"]
            for line2 in output_file_data.splitlines():
              outfile.write(line2.lstrip())
              outfile.write("\n")

I must say this might work, but is there really no more elegant solution to this? When I go for this approach I need to add a lot of extra code to reconstruct the file. Furthermore, I cannot re-use the batch flows easily in case I don't want to split them up and just want to call them as a subflow (io.kestra.plugin.core.flow.Subflow, which expects files as output). A workaround might be to have multiple outputs, though.

I'll raise this internally anyway.

Thank you!

yorickdevries commented 1 month ago

For reference, the complete flows I tested with were:

main flow:

id: main_parallel
namespace: comp.main_parallel

tasks:
  - id: "connector"
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products
  - id: extract_products
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: python:slim
    outputFiles:
      - "output.json"
    script: |
      import json
      with open('{{ outputs.connector["uri"] }}', 'r') as infile:
        data=json.load(infile)
        with open('output.json', "w") as outfile:
          json.dump(data["products"], outfile, indent=2, sort_keys=True)

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ outputs.extract_products.outputFiles["output.json"] }}'
    newLine: false # regular json

  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 2
    namespace: comp.batch
    flowId: batch
    wait: true
    transmitFailed: true
    inputs:
      input_file: '{{ taskrun.items }}'

  - id: ion_to_jsonl2
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.for_each_item_merge.subflowOutputs }}"
    newLine: true # JSON-L

  - id: merge_outputs
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: python:slim
    outputFiles:
    - output.jsonl
    script: |
      import json
      with open('output.jsonl', 'a') as outfile:
        with open('{{ outputs.ion_to_jsonl2.uri }}', 'r') as infile:
          lines = infile.readlines()
          for line in lines:
            output_file_data = json.loads(line)["output_file"]
            for line2 in output_file_data.splitlines():
              outfile.write(line2.lstrip())
              outfile.write("\n")

  - id: jsonl_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ outputs.merge_outputs.outputFiles["output.jsonl"] }}'
    newLine: true # JSON-L
  - id: ion_to_json
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.jsonl_to_ion.uri }}"
    newLine: false # regular json

outputs:
- id: output_file
  type: FILE
  value: "{{ outputs.ion_to_json.uri }}"

subflow:

id: batch
namespace: comp.batch

inputs:
  - id: input_file
    type: FILE

tasks:
  - id: jsonl_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ inputs.input_file }}'
    newLine: true # JSON-L
  - id: ion_to_json
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.jsonl_to_ion.uri }}"
    newLine: false # regular json
  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.ion_to_json.uri }}"
    newLine: false # regular json
  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

outputs:
  - id: output_file
    type: STRING
    value: '{{ read(outputs.ion_to_jsonl.uri) }}'
loicmathieu commented 1 month ago

ForEachItem returns a file with the merge of all the outputs of all subflows. If the outputs of the subflows are of type FILE, it returns a FILE with the merge of all file URIs; this is logical, and that's why I said to use a STRING output and not a FILE. As each subflow handles a small amount of the original file, it should not be an issue.

We are thinking about this, maybe we will offer a reduce boolean which could be set to true to read those files automatically during the merge phase of the ForEachItem.

yorickdevries commented 1 month ago

We are thinking about this, maybe we will offer a reduce boolean which could be set to true to read those files automatically during the merge phase of the ForEachItem.

Thanks! That would be a really helpful feature!

anna-geller commented 1 month ago

Thinking a bit more, I don't understand what would be the purpose of the reduce to merge the previously split files back together. Concatenating all files from "{{ outputs.for_each_item_merge.subflowOutputs }}" will give you the same as the initial file you were trying to split into items.

It would be helpful to provide a bit more info about the problem you're trying to address. Perhaps ForEachItem is not the right option here—feel free to share more.

anna-geller commented 1 month ago

@yorickdevries here is an example of how you can merge all files together (the description contains the subflow):

id: iterate_over_json
namespace: company.team
description: |
  id: mysubflow
  namespace: company.team

  inputs:
    - id: json_file
      type: FILE

  tasks:
    - id: debug
      type: io.kestra.plugin.core.log.Log
      message: "{{ json(read(inputs.json_file)) }}"

    - id: return
      type: io.kestra.plugin.core.debug.Return
      format: "{{ json(read(inputs.json_file)) }}"

  outputs:
    - id: myoutput
      type: FILE
      value: "{{ inputs.json_file }}"    
tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: https://api.restful-api.dev/objects
    contentType: application/json
    method: GET
    failOnEmptyResponse: true
    timeout: PT15S

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.download.uri }}"
    newLine: false # regular json

  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 1
    namespace: company.team
    flowId: mysubflow
    wait: true
    transmitFailed: true
    inputs:
      json_file: "{{ taskrun.items }}"

  - id: ion_to_json
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.for_each_item_merge.subflowOutputs }}"
    newLine: false

  - id: concat
    type: io.kestra.plugin.core.storage.Concat
    files: "{{ read(outputs.ion_to_json.uri) | jq('map(.myoutput)') | flatten }}"

If that solves the issue for you, feel free to close it. To be transparent, we are investigating improvements to this ForEach pattern; we are aware that the UX can be improved 👍
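For what it's worth, the jq transformation in the concat task above (`read(...) | jq('map(.myoutput)') | flatten`) can be mimicked in plain Python. The data below is a hypothetical shape: a JSON array with one object per subflow execution, each exposing the subflow's FILE output under its output id ("myoutput" in the example flow):

```python
import json

# Hypothetical content of the ion_to_json output in the example flow:
# one object per subflow execution, each carrying a "myoutput" URI.
merged_json = json.dumps([
    {"myoutput": "kestra:///company/team/executions/a/1.ion"},
    {"myoutput": "kestra:///company/team/executions/b/2.ion"},
])

# Equivalent of: jq('map(.myoutput)') | flatten
files = [record["myoutput"] for record in json.loads(merged_json)]
print(files)  # list of internal-storage URIs, ready for the Concat task
```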

yorickdevries commented 1 month ago

Thinking a bit more, I don't understand what would be the purpose of the reduce to merge the previously split files back together. Concatenating all files from "{{ outputs.for_each_item_merge.subflowOutputs }}" will give you the same as the initial file you were trying to split into items.

It would be helpful to provide a bit more info about the problem you're trying to address. Perhaps ForEachItem is not the right option here—feel free to share more.

I made this (toy) example as a reproduction to illustrate the problem I was running into (not being able to read the files from the io.kestra.plugin.core.flow.ForEachItem subtasks within Scripts). In practice I would perform more advanced calculations on the elements before returning files from the subflow.

@yorickdevries here is an example how you can merge all files together - description contains a subflow:

id: iterate_over_json
...

If that solves the issue for you, feel free to close the issue. To be transparent, we are investigating improvements to this ForEach pattern, we are aware that the UX can be improved 👍

I was not aware of the existence of io.kestra.plugin.core.storage.Concat; this already helps a lot! Thank you! Perhaps it would be a good addition to the ForEachItem documentation?

While the io.kestra.plugin.core.storage.Concat task can actually access (and concatenate) the files from Kestra storage, normal Script tasks still cannot access these files directly (see earlier comments).

In certain cases one would still like to be able to process the original files from a Bash/Python script. Think of cases where more advanced "reduce" functions than just a concatenation need to be performed (for example, when calculating grouped averages over large datasets).

A quick win might be an io.kestra.plugin.core.storage.Zip task, which returns a zip with all the storage files in it. Scripts like Bash and Python could then unpack these files and process them further as they like. Would that be an idea?
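As a concrete sketch of such a reduce step, here is roughly what a grouped average over the unpacked batch files could look like. Everything here is hypothetical for illustration: the batch files are assumed to be JSON-L, and the field names "group"/"value" are made up:

```python
import json
from collections import defaultdict
from io import StringIO

# Stand-ins for the unpacked batch output files (hypothetical JSON-L).
batch_files = [
    StringIO('{"group": "a", "value": 1}\n{"group": "b", "value": 3}\n'),
    StringIO('{"group": "a", "value": 5}\n'),
]

# Accumulate sums and counts per group across all batches,
# so no single batch ever has to hold the full dataset.
sums = defaultdict(float)
counts = defaultdict(int)
for f in batch_files:
    for line in f:
        record = json.loads(line)
        sums[record["group"]] += record["value"]
        counts[record["group"]] += 1

averages = {group: sums[group] / counts[group] for group in sums}
print(averages)  # {'a': 3.0, 'b': 3.0}
```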

anna-geller commented 1 month ago

thank you! totally understandable, super useful feedback! we are exploring compressOutputs property allowing Zip as one option https://github.com/kestra-io/kestra/issues/5456

yorickdevries commented 1 month ago

thank you! totally understandable, super useful feedback! we are exploring compressOutputs property allowing Zip as one option #5456

Great! I would propose to keep this issue open for the initial reason (as stated in the title): Script tasks cannot directly access output files from io.kestra.plugin.core.flow.ForEachItem subflows. I changed the title of the issue accordingly.