nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

`join` with `remainder:true` emits even when the upstream process fails. Is this an expected behaviour? #5043

Closed GallVp closed 3 weeks ago

GallVp commented 3 weeks ago

`join` with `remainder: true` emits even when the upstream process fails. Is this an expected behaviour? Can we kindly document it if it is expected?

workflow  {
    Channel.of('A')
    | PROCESS_A

    PROCESS_A.out.optional
    | join(Channel.of('B'), remainder: true)
    | view
}

process PROCESS_A {
    input:
    val a
    output:
    path 'optional', optional: true, emit: optional

    script:
    """
    exit 1
    """
}

join(Channel.of('B'), remainder: true) has the same emission whether PROCESS_A succeeds without producing the optional output or PROCESS_A fails.

These are two different cases, but the outcome is the same, and it causes a pipeline failure when PROCESS_A fails and the emitted partial values are not handled with this knowledge. This is a very simple scenario. For a more realistic scenario, please see: https://github.com/Plant-Food-Research-Open/assemblyqc/blob/3548df4d7f742f42894f587405e2f446ea6e8b78/subworkflows/pfr/fasta_ltrretriever_lai/main.nf#L96

Thank you very much!

bentsherman commented 3 weeks ago

I'm confused as to why this is an issue. It seems like the expected behavior. If you enable the remainder and ignore errors from process A (which I assume you must be doing, or else the pipeline would have failed), the join will emit the partial B value as if process A had succeeded but not emitted the optional output.
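
To illustrate the behaviour described above, here is a minimal sketch that uses only channel operators. The channel names `left` and `right` and their values are hypothetical; `left` stands in for an optional process output that emitted nothing for key 'B'. Per the `join` operator documentation, an unmatched key is emitted with `null` in place of the missing element when `remainder: true` is set, so the join by itself carries no information about why the element is missing.

```nextflow
// Sketch only: hypothetical channels, no processes involved.
workflow {
    // Stand-in for an optional output: nothing was emitted for key 'B'.
    left  = Channel.of(['A', 'a.txt'])

    // The other side of the join has entries for both keys.
    right = Channel.of(['A', 'a_meta'], ['B', 'b_meta'])

    // With remainder: true, key 'B' is still emitted, with null
    // in the position where the element from `left` would have been.
    left
        .join(right, remainder: true)
        .view()
}
```

Whether the missing element is due to a failed task or a task that succeeded without producing the optional file, the tuple for 'B' should look the same downstream.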

GallVp commented 3 weeks ago

Thanks for clarifying this.

I was worried that, in the case of a partial join with the retry error strategy, the downstream process would execute with wrong inputs. But after trying it on the local and Slurm executors, I am convinced that the downstream process won't be executed. The partial join only emits when the upstream process is successful or when the pipeline comes to an end. Or am I still missing something?

For future reference.

workflow  {
    Channel.of(['A', 'data'])
    | PROCESS_A

    PROCESS_A.out.optional
    | join(Channel.of(['A', 'data2']), remainder: true)
    | view
    | map { a, data, data2 -> [ a, data ?: [], data2 ] }
    | PROCESS_B
    | view
}

process PROCESS_A {
    input:
    tuple val(a), val(data)
    output:
    tuple val(a), path('optional'), optional: true, emit: optional

    script:
    """
    exit 1
    """
}

process PROCESS_B {
    input:
    tuple val(a), val(data), val(data2)

    output:
    stdout

    script:
    """
    echo $data
    """
}

// nextflow.config
process {
    errorStrategy = 'retry'
    maxRetries = 2
    cpus = 1
}
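
As an alternative to replacing the missing element with `[]` in a `map`, partial tuples can be routed to their own channel. The following is a rough sketch under the assumption that every joined item is a three-element tuple; the channel names and the branch labels `complete` and `partial` are made up for illustration and are not part of the pipeline above.

```nextflow
// Sketch only: hypothetical channels and branch labels.
workflow {
    left  = Channel.of(['A', 'a.txt'])
    right = Channel.of(['A', 'a_meta'], ['B', 'b_meta'])

    joined = left
        .join(right, remainder: true)
        .branch { key, data, meta ->
            // Both sides of the join were present for this key.
            complete: data != null && meta != null
            // Everything else is missing at least one side.
            partial: true
        }

    // Only complete tuples would be passed on to a downstream process.
    joined.complete.view { v -> "complete: ${v}" }

    // Partial tuples can be logged, reported, or dropped explicitly.
    joined.partial.view { v -> "partial: ${v}" }
}
```

Keeping the two branches separate makes the handling of partial values explicit, rather than relying on the downstream process to cope with a `null` or empty input.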