kestra-io / kestra

Infinitely scalable, event-driven, language-agnostic orchestration and scheduling platform to manage millions of workflows declaratively in code.
https://kestra.io
Apache License 2.0

Add a task allowing to purge outputs from the current execution by IDs #4207

Open anna-geller opened 1 week ago

anna-geller commented 1 week ago

Feature description

Context

There are many use cases in which a user may want to pass an output to downstream tasks without keeping it afterward, such as when the output is large or sensitive.

For example, imagine that you extract a large dataset from a source and then load it into a destination such as BigQuery. Once the data has been successfully stored in BigQuery, there's no need to keep it in internal storage. We currently support a Purge task, but it deletes all outputs; you can't cherry-pick specific ones. Sometimes, however, you want to keep most outputs and purge only a single large or sensitive one.

It would be great to add a task that deletes only specific outputs by ID.

Proposed syntax

  - id: clean
    type: io.kestra.plugin.core.storage.PurgeOutputs
    outputs: 
      - "{{ outputs.extract.uri }}"

Example usage in a flow

id: purge_outputs
namespace: company.myteam
tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    description: imagine this is a huuuuuge file that doesn't need to be kept in internal storage after it has been loaded to BigQuery
    uri: https://huggingface.co/datasets/kestra/datasets/resolve/main/csv/orders.csv

  - id: load
    type: io.kestra.plugin.gcp.bigquery.Load
    from: "{{ outputs.extract.uri }}"
    destinationTable: "my_project.my_dataset.my_table"
    format: CSV
    csvOptions:
      fieldDelimiter: ";"

  - id: clean
    type: io.kestra.plugin.core.storage.PurgeOutputs
    outputs: 
      - "{{ outputs.extract.uri }}"

aku commented 1 week ago

I have a lot of similar cases. It would be super helpful to have this feature implemented.

anna-geller commented 1 week ago

Update: This will be complex to implement (not a quick win), because the new task cannot remove the outputs of other tasks unless we change the executor implementation.

Additionally, for each output to remove, we'll need to:

  1. Remove it from internal storage if it's a file.
  2. Remove it from the DB if it's an object (not a file).
  3. Remove the metadata for that output in the backend so that removed outputs are not displayed in the Outputs tab.
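
To make the three cleanup steps concrete, here is a rough sketch in Python. It is not Kestra code: `ExecutionState`, `purge_outputs`, and the key format (`"extract.uri"`) are hypothetical names, and plain dictionaries stand in for internal storage, the DB, and the output metadata. It only illustrates the order of operations and that unknown keys should be skipped rather than fail the task.

```python
from dataclasses import dataclass, field


@dataclass
class ExecutionState:
    """Hypothetical in-memory stand-in for one execution's stored outputs."""
    files: dict[str, bytes] = field(default_factory=dict)     # internal storage
    objects: dict[str, object] = field(default_factory=dict)  # non-file outputs in the DB
    metadata: dict[str, str] = field(default_factory=dict)    # output key -> "file" | "object"


def purge_outputs(state: ExecutionState, keys: list[str]) -> list[str]:
    """Purge the requested outputs and return the keys that were actually removed."""
    purged = []
    for key in keys:
        kind = state.metadata.get(key)
        if kind is None:
            continue  # unknown or already purged output: skip it
        if kind == "file":
            state.files.pop(key, None)    # step 1: remove the file from internal storage
        else:
            state.objects.pop(key, None)  # step 2: remove the object from the DB
        del state.metadata[key]           # step 3: hide it from the Outputs tab
        purged.append(key)
    return purged
```

Calling `purge_outputs(state, ["extract.uri"])` on a state that holds both a file output and an object output would remove only the file and its metadata entry, leaving every other output untouched. The real task would additionally need the executor change mentioned above before it could touch another task's outputs.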