nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

Process cache groups #5238

Open adamrtalbot opened 1 month ago

adamrtalbot commented 1 month ago

New feature

Sometimes an error in a process is only exposed later in the pipeline, when a downstream process fails. In these cases, it would be useful to force -resume to restart from an earlier step in the pipeline rather than from the failed process.

Usage scenario

Let's imagine a scenario where we have three processes. The first is non-deterministic because it uses a fancy new AI algorithm. The second and third use the output of the first process in sequence; however, sometimes the third process fails because the algorithm doesn't reach equilibrium (or similar). We might try to solve this with Nextflow's resume feature and catch the error, but -resume will skip processes 1 and 2 and jump straight to 3. This may just repeat the error, so we would prefer to start from process 1 again. Here's a minimal example:

params.exitcode = 1

process RANDOM {
    output:
    path("output.txt")

    script:
    """
    echo \$RANDOM > output.txt
    """
}

process DO_THING_WITH_RANDOM {
    input:
    path "input.txt"

    output:
    path("output.txt")

    script:
    """
    cat input.txt > output.txt
    """
}

process FAIL_WITH_RANDOM {
    input:
    path "input.txt"
    val exitcode

    output:
    path "output.txt"

    script:
    """
    cat input.txt > output.txt
    exit $exitcode
    """
}

workflow {
    RANDOM()
    DO_THING_WITH_RANDOM(RANDOM.out)
    FAIL_WITH_RANDOM(DO_THING_WITH_RANDOM.out, params.exitcode)
}

In this case, there is nothing we can do to make RANDOM restart when using -resume, even though its output will change every time we run it.
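The nearest existing workaround is to disable caching for RANDOM entirely via a config selector, but it is not equivalent to the requested feature: RANDOM then re-runs on every -resume, even after a fully successful run. A sketch, assuming the process names above:

```nextflow
// nextflow.config — sketch of the closest existing workaround, NOT the
// requested feature: RANDOM re-runs on *every* -resume, successful or not,
// which also invalidates everything downstream of it.
process {
    withName: 'RANDOM' {
        cache false
    }
}
```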

Suggested implementation

We could 'group' caches, so that if any cache within a set is invalidated, all processes in the set are restarted. For example, we could add a key value which associates processes by sample ID:

process MYPROCESS { 
    cache true, key: id

    input:
    tuple val(id), path(bam), path(bai)
    ...
}

Alternatively, we could provide the tools for developers to express this via errorStrategy, so the behaviour could be baked into the pipeline itself. This might follow a similar pattern:

process MYPROCESS { 
    errorStrategy "retry"
    errorGroup id

    input:
    tuple val(id), path(bam), path(bai)
    ...
}
adamrtalbot commented 1 month ago

For a real-world example, this is relevant to machine-learning-based algorithms such as AlphaFold, where we may not know whether step 1 is valid until a later step runs.

bentsherman commented 1 month ago

A hacky version of this would be to run Nextflow-in-Nextflow; the cache of the inner nextflow run is then essentially a "cache group" as you described.
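To make the hack concrete, here is a sketch of what Nextflow-in-Nextflow might look like (the inner script name and its output filename are hypothetical). Because each retry of the wrapper task gets a fresh work directory and the inner run is not resumed, a retry re-executes the whole inner pipeline, which is exactly the "cache group" behaviour:

```nextflow
// Hypothetical sketch of the Nextflow-in-Nextflow workaround.
// Assumes an 'inner.nf' script containing RANDOM and DO_THING_WITH_RANDOM
// that writes its result to 'inner_output.txt' in the launch directory.
process RUN_INNER_PIPELINE {
    errorStrategy 'retry'  // a retry re-runs the entire inner pipeline,
    maxRetries 3           // acting as a cache group for its processes

    output:
    path 'results.txt'

    script:
    """
    # each attempt runs in a fresh task work dir, so there is no stale
    # inner cache to accidentally resume from
    nextflow run inner.nf
    cp inner_output.txt results.txt
    """
}
```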

This is related to subworkflow grouping, which makes me wonder if it could be supported using the proposed syntax in https://github.com/nf-core/fetchngs/pull/309 .

How about this:

workflow {
  inputs = Channel.of( 1, 2, 3 )

  inputs
    |> map(cacheGroup: true) { input ->
      input |> RANDOM |> DO_THING_WITH_RANDOM
    }
    |> map { out ->
      FAIL_WITH_RANDOM(out, params.exitcode)
    }
}

Since the first two processes are grouped in the same closure, it is trivial to apply shared behaviors like a cache group to them. By contrast, a cache ID at the process level would allow groupings that don't make sense (e.g. grouping two processes in completely different subworkflows).