kestra-io / kestra

:zap: Workflow Automation Platform. Orchestrate & Schedule code in any language, run anywhere, 500+ plugins. Alternative to Zapier, Rundeck, Camunda, Airflow...
https://kestra.io
Apache License 2.0
12.98k stars 1.13k forks source link

ForEachItem - Stream input items to the subflow instead of splitting the input file into multiple files #5161

Closed johnkm516 closed 1 month ago

johnkm516 commented 1 month ago

Feature description

One of the biggest problems with the current ForEachItem for me is that by default, ForEachItem will split an input file into multiple files, one for each batch. Consider the following flow parameters :

Instead of splitting the 100,000 row ION file into 100,000 separate files, I would like an option for the ForEachItem to simply stream the original ION file in memory in batches (the amount stored in memory will be dependent on the batch property's rows attribute), creating executions as it processes the file in a "sliding window" fashion.

I believe this will make the ForEachItem task far more efficient way it passes inputs to subflows.

anna-geller commented 1 month ago

hey John, did you know that you can decide about the batch size? it doesn't have to be 1 file per row e.g.: here 10k rows per batch:

  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}" # could be also an output variable {{ outputs.extract.uri }}
    inputs:
      file: "{{ taskrun.items }}" # items of the batch
    batch:
      rows: 10000
    namespace: company.team
    flowId: subflow
    revision: 1 # optional (default: latest)
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    labels: # optional labels to pass to the subflow to be executed
      key: value

To be transparent, we can't do that splitting in memory 😢 in fact, this is why this task exists - to split data processing into chunks and avoid processing everything at once.

johnkm516 commented 1 month ago

@anna-geller Hi Anna, yes I'm aware of the rows attribute. I mentioned that I wanted one row per execution in the OP.

To process this, there is a way, and this is how i currently process the flows :

1) Master flow with ForEachItem which splits the file into 100 rows and invokes a batching subflow 2) Batching subflow uses ForEach to invoke a worker subflow, splitting the 100 rows into one row per worker execution 3) Each worker execution processes the row

The above steps work, but now i have to create three flows for this task. And this workaround still splits the file and stores it in internal storage, which is essentially storing duplicate data.

You said you cannot do this in memory, but I'm wondering why not? Java has BufferedReader which should be able to process a file line by line instead of storing the entire file in memory. Is this more of a limitation due to the stateless nature of tasks?

I really like the UI that ForEachItem has when looking at gantt and I really wish i could use ForEachItem instead of splitting into three different flows.

anna-geller commented 1 month ago

Yup, you're exactly right - it's because of the stateless nature of the task run execution. Each task can in theory be executed on an entirely different worker and to pass data reliably and efficiently, we leverage internal storage.

What you've described seems like a perfectly valid use of kestra (I wouldn't consider it a workaround as you did everything the right way). Is there something that's inconvenient to split work into multiple subflows? a bit difficult to talk about this without examples or screen share, can you book a call with me and @loicmathieu to discuss after Tuesday (release date)? Loic could help validate technical aspects like the one about BufferedReader -- ping us via Slack to coordinate date or book here https://calendly.com/anna__geller/meet

johnkm516 commented 1 month ago

@anna-geller Thank you for the response. As you said, due to the nature of how tasks work it's not possible to stream input data. The thing I find inconvenient to split work into multiple subflows is that to do this For Each loop I have to create three subflows rather than two : one to cut the input into batches, and one to loop through each batch, and a third one to actually do the work for each row. The ForEachItem has a unique UI which shows all the sub executions which I find very useful, so I hoped that I could use this UI for each of my rows.

Would it be possible to somehow add an option to combine the ForEach and ForEachItem together? So that to maintain statelessness, we still split the input file into each batch file, but within the ForEachItem it also does a ForEach execution in memory for each batch to create an execution for each row.

I messaged you on Slack for a meeting time (if needed).

fatal-error22 commented 1 month ago

Hi @anna-geller, I would like to contribute on this project, I am an Backend Developer having 3years of experience in Java, though this is first I am trying to do something on opensource, so Please if anyone is not looking into this, Please assign this to me.

anna-geller commented 1 month ago

hey @fatal-error22, this one is too complex for first-time contributors. Can you check one of these issues instead?

johnkm516 commented 1 month ago

Closing after meeting with @loicmathieu and @tchiotludo.