nextflow-io / nextflow


Task output files mixed up between different subworkflow instances #5242

Open sralchemab opened 3 weeks ago

sralchemab commented 3 weeks ago

Bug report

Expected behavior and actual behavior

I have a subworkflow that I'm importing twice and invoking like this:

include { ANNOTATE as ANNOTATE_BCR } from '../subworkflows/local/annotate'
include { ANNOTATE as ANNOTATE_TCR } from '../subworkflows/local/annotate'

workflow TRUST4 {

    take:
    ch_samplesheet // channel: samplesheet read in from --input

    main:

    [...]

    ANNOTATE_BCR(
        ch_bcr_sequences,
        DATABASES.out.reference_igblast.collect(),
        DATABASES.out.reference_fasta.collect(),
        'ig'
    )

    ANNOTATE_TCR(
        ch_tcr_sequences,
        DATABASES.out.reference_igblast.collect(),
        DATABASES.out.reference_fasta.collect(),
        'tr'
    )

    [...]
}

The subworkflow looks something like this:

workflow ANNOTATE {

    take:
    ch_sequences        // channel: [ val(meta), path(sequences_file)  ]
    ch_igblastdb        // channel: /path/to/igblastdb
    ch_reference_fasta  // channel: /path/to/imgtdb
    loci                // val    : ig or tr

    main:

    CHANGEO_ASSIGNGENES_FMT7(ch_sequences, ch_igblastdb, loci)

    CHANGEO_MAKEDB(ch_sequences, CHANGEO_ASSIGNGENES_FMT7.out.blast, ch_reference_fasta)

    [...]

    emit:
    repertoire = ch_repertoire
}

I have multiple samples that will go simultaneously through both of these subworkflows.

Now, the issue I'm having is happening randomly and it's costing me a lot of money. For some reason, the task CHANGEO_MAKEDB is sometimes receiving the output from a different instance of the CHANGEO_ASSIGNGENES_FMT7 task.

Here's an example of what I mean:

# running subworkflow for multiple samples
    ANNOTATE_BCR(0000398770)
    ANNOTATE_BCR(0002294183)

# within each subworkflow
    CHANGEO_ASSIGNGENES_FMT7(0000398770)
    # CHANGEO_ASSIGNGENES_FMT7.out.blast returns 0000398770_ig_igblast.fmt7
    CHANGEO_MAKEDB(0000398770, CHANGEO_ASSIGNGENES_FMT7.out.blast)

    CHANGEO_ASSIGNGENES_FMT7(0002294183)
    # CHANGEO_ASSIGNGENES_FMT7.out.blast returns 0002294183_ig_igblast.fmt7
    CHANGEO_MAKEDB(0002294183, CHANGEO_ASSIGNGENES_FMT7.out.blast)

# however, CHANGEO_MAKEDB is receiving as input the output of the wrong instance of CHANGEO_ASSIGNGENES_FMT7, e.g.

    CHANGEO_MAKEDB(0000398770, 0002294183_ig_igblast.fmt7)

    CHANGEO_MAKEDB(0002294183, 0000398770_ig_igblast.fmt7)

Here are some extra details:

# modules.conf
process {
    withName: '.*:ANNOTATE_BCR:.*|.*:ANNOTATE_TCR:.*' {
        publishDir = [ enabled: false ]
        ext.prefix = { "${meta.id}_${meta.loci}" }
    }
}
# module: changeo_makedb.nf
process CHANGEO_MAKEDB {
    tag "$meta.id"
    label 'process_low'

    conda "bioconda::changeo=1.3.0 bioconda::igblast=1.22.0 conda-forge::wget=1.20.1"
    container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
        'https://depot.galaxyproject.org/singularity/mulled-v2-7d8e418eb73acc6a80daea8e111c94cf19a4ecfd:a9ee25632c9b10bbb012da76e6eb539acca8f9cd-1' :
        'biocontainers/mulled-v2-7d8e418eb73acc6a80daea8e111c94cf19a4ecfd:a9ee25632c9b10bbb012da76e6eb539acca8f9cd-1' }"

    input:
    tuple val(meta), path(reads) // reads in fasta format
    path  (igblast)
    path  (reference_fasta)

    output:
    tuple val(meta), path("*db-pass.tsv"), emit: tab        // sequence table in AIRR format
    path  ("*_command_log.txt")          , emit: logs       // process logs
    path  "versions.yml"                 , emit: versions

    script:
    def args = task.ext.args ?: ''
    """
    MakeDb.py igblast \\
        -i $igblast \\
        -s $reads \\
        -r ${reference_fasta}/${params.species.toLowerCase()}/vdj/ \\
        $args \\
        --outname ${task.ext.prefix} \\
        > ${task.ext.prefix}_makedb_command_log.txt

    cat <<-END_VERSIONS > versions.yml
    "${task.process}":
        igblastn: \$( igblastn -version | grep -o "igblast[0-9\\. ]\\+" | grep -o "[0-9\\. ]\\+" )
        changeo: \$( MakeDb.py --version | awk -F' '  '{print \$2}' )
    END_VERSIONS
    """
}

Here's what the .command.sh file looks like:

MakeDb.py igblast \
    -i 0002294183_ig_igblast.fmt7 \
    -s 0000398770.ig.fasta \
    -r imgtdb_base/human/vdj/ \
    --regions default --format airr --extended \
    --outname 0000398770_ig \
    > 0000398770_ig_makedb_command_log.txt

cat <<-END_VERSIONS > versions.yml
"ALCHEMAB_RECONSTRIMM:RECONSTRIMM:ANNOTATE_BCR:CHANGEO_MAKEDB":
    igblastn: $( igblastn -version | grep -o "igblast[0-9\. ]\+" | grep -o "[0-9\. ]\+" )
    changeo: $( MakeDb.py --version | awk -F' '  '{print $2}' )
END_VERSIONS

As you can see, both $reads and ${task.ext.prefix} are using the same meta.id, while $igblast is different.

I'm not sure what's going on here or how to properly troubleshoot it, so any help would be appreciated.

Steps to reproduce the problem

I can't reproduce it reliably. I just re-run the same jobs multiple times, and sometimes they fail and sometimes they succeed. I noticed that when I submit up to 9-10 jobs, they work fine; if I submit more than that, they start failing. I will try to write a smaller pipeline that I can share here for testing purposes.

Program output

I downloaded the output log from the Batch job, but it's not the same as the .nextflow.log and I don't know how to get that file, so I don't think the log I'm providing will be of much help. I'm also attaching the command.run and command.sh files to show how the files are being mixed up: you can check out the nxf_stage function in the former, and the script parameters in the latter.

files.tar.gz — the files are named log-events-viewer-result.csv, command.run, and command.sh.

Environment

I'm running the jobs using AWS Batch. I set up the infrastructure using Tower, but I'm submitting the jobs manually using the AWS CLI. I'm using Fargate for the main job, Wave, Fusion, Scratch = False, and I enabled cloudcache as well.

Nextflow version 24.04.3

adamrtalbot commented 2 weeks ago

Concurrency

One of the core tenets of Nextflow is implicit parallelism. That is, you shouldn't have to write code to handle concurrent executions of each process; Nextflow does that for you. For that to work, it runs the processes in whatever order it can and passes the files between steps, so the order isn't guaranteed. In essence, processes run and their results are immediately passed to the next process. This can be counterintuitive compared to a scripting language that executes in order, but it means a Nextflow developer doesn't have to worry about ordering or file collisions.

In your example, the CHANGEO_MAKEDB process is immediately scooping up the first set of files emitted by CHANGEO_ASSIGNGENES_FMT7 and analysing them together. The time it takes to run the first process varies, so you end up with combinations of files being analysed together with no guarantee that they come from the same sample.

Example

Here's a mini example that illustrates the concurrency. It is designed to produce the output a: 10, b: 5 and c: 1, but the pairing gets flipped because SLEEP emits its results in order of completion (shortest sleep first), not in input order:

process SLEEP {

    tag "${index}"

    input:
    tuple val(index), val(sleepTime)

    output:
    stdout

    script:
    """
    sleep ${sleepTime}
    echo ${sleepTime}
    """
}

process ECHO_TIMES {
    debug true
    tag "${index}"

    input:
        tuple val(index), val(sleepTime)
        val(time)

    output:
        stdout

    script:
        """
        echo ${time}
        """
}

workflow {
    times = Channel.of(["a", 10], ["b", 5], ["c", 1])
    SLEEP(times)
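    // times is consumed in declaration order, while SLEEP.out arrives in completion order (shortest sleep first), so the pairings get flipped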
    ECHO_TIMES(times, SLEEP.out)
}
> nextflow run .
N E X T F L O W  ~  version 24.04.4
Launching `./main.nf` [suspicious_legentil] DSL2 - revision: d75eecadbe
[94/fd3d4b] Submitted process > SLEEP (c)
[8a/1574fb] Submitted process > SLEEP (a)
[38/b3cebd] Submitted process > SLEEP (b)
[b8/60ecf4] Submitted process > ECHO_TIMES (a)
1
[7d/0aa527] Submitted process > ECHO_TIMES (b)
5
[3f/b1274f] Submitted process > ECHO_TIMES (c)
10

Your code

To fix this, re-join the appropriate results using a sample-specific key. You can do this by changing the input to a tuple of meta, reads and igblast, then performing a join on the channels before passing them to the process. Here's an example:

process CHANGEO_MAKEDB {
    tag "$meta.id"
    label 'process_low'

    conda "bioconda::changeo=1.3.0 bioconda::igblast=1.22.0 conda-forge::wget=1.20.1"
    container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
        'https://depot.galaxyproject.org/singularity/mulled-v2-7d8e418eb73acc6a80daea8e111c94cf19a4ecfd:a9ee25632c9b10bbb012da76e6eb539acca8f9cd-1' :
        'biocontainers/mulled-v2-7d8e418eb73acc6a80daea8e111c94cf19a4ecfd:a9ee25632c9b10bbb012da76e6eb539acca8f9cd-1' }"

    input:
    tuple val(meta), path(reads), path(igblast)
    path  (reference_fasta)

    output:
    tuple val(meta), path("*db-pass.tsv"), emit: tab        // sequence table in AIRR format
    path  ("*_command_log.txt")          , emit: logs       // process logs
    path  "versions.yml"                 , emit: versions

    script:
    def args = task.ext.args ?: ''
    """
    MakeDb.py igblast \\
        -i $igblast \\
        -s $reads \\
        -r ${reference_fasta}/${params.species.toLowerCase()}/vdj/ \\
        $args \\
        --outname ${task.ext.prefix} \\
        > ${task.ext.prefix}_makedb_command_log.txt

    cat <<-END_VERSIONS > versions.yml
    "${task.process}":
        igblastn: \$( igblastn -version | grep -o "igblast[0-9\\. ]\\+" | grep -o "[0-9\\. ]\\+" )
        changeo: \$( MakeDb.py --version | awk -F' '  '{print \$2}' )
    END_VERSIONS
    """
}
workflow ANNOTATE {

    take:
    ch_sequences        // channel: [ val(meta), path(sequences_file)  ]
    ch_igblastdb        // channel: /path/to/igblastdb
    ch_reference_fasta  // channel: /path/to/imgtdb
    loci                // val    : ig or tr

    main:

    CHANGEO_ASSIGNGENES_FMT7(ch_sequences, ch_igblastdb, loci)

    // Rejoin on meta value
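    // note: this assumes CHANGEO_ASSIGNGENES_FMT7.out.blast is also emitted as a [ meta, fmt7 ] tuple, so both channels share the meta key to join on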
    ch_sequences_igblastdb = ch_sequences.join(CHANGEO_ASSIGNGENES_FMT7.out.blast)

    CHANGEO_MAKEDB(
        ch_sequences_igblastdb, 
        ch_reference_fasta
    )

    [...]

    emit:
    repertoire = ch_repertoire
}

What happens: the join operator waits until both channels have produced an item with the same meta key, then combines them into a single [ meta, reads, fmt7 ] element, so CHANGEO_MAKEDB always receives files that belong to the same sample.

Of course, there's a shortcut we can take here. We can make the output tuple of CHANGEO_ASSIGNGENES_FMT7 the tuple we need directly without using a join:

    output:
    tuple val(meta), path(fastq, includeInputs: true), path("*.fmt7"), emit: blast

Now we can just use it directly:

    CHANGEO_MAKEDB(
        CHANGEO_ASSIGNGENES_FMT7.out.blast, 
        ch_reference_fasta
    )
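
(The includeInputs: true option lets the staged input reads be declared again as an output, so the reads and the new .fmt7 file leave CHANGEO_ASSIGNGENES_FMT7 together in one meta-keyed tuple.)
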
sralchemab commented 2 weeks ago

Hi Adam, thanks for your explanation.

Would you mind telling me if the following example would work as well? I'm returning the input reads on their own instead of as part of the blast tuple like in your example, and then passing both outputs separately. I prefer to keep them separate to make things clearer.

Thanks!

process CHANGEO_ASSIGNGENES {
    tag "$meta.id"
    label 'process_low'

    input:
    tuple val(meta), path(reads)
    path  (igblast)
    val   (loci)

    output:
    tuple val(meta), path(reads, includeInputs: true), emit: reads   // I ADDED THIS LINE TO RETURN THE INPUT READS
    path  ("*igblast.${task.ext.suffix}"), emit: blast
    path  ("*_command_log.txt")          , emit: logs
    path  "versions.yml"                 , emit: versions

    script:
    """
    ...
    """
}

process CHANGEO_MAKEDB {
    tag "$meta.id"
    label 'process_low'

    input:
    tuple val(meta), path(reads)
    path  (igblast)
    path  (reference_fasta)

    output:
    tuple val(meta), path("*db-pass.tsv"), emit: tab
    path  ("*_command_log.txt")          , emit: logs
    path  "versions.yml"                 , emit: versions

    script:
    """
    ...
    """
}
workflow ANNOTATE {

    take:
    ch_sequences        // channel: [ val(meta), path(sequences_file)  ]
    ch_igblastdb        // channel: /path/to/igblastdb
    ch_reference_fasta  // channel: /path/to/imgtdb
    loci                // val    : ig or tr

    main:

    CHANGEO_ASSIGNGENES_FMT7(ch_sequences, ch_igblastdb, loci)

    // THIS TIME, READS AND BLAST OUTPUTS ARE COMING FROM THE PREVIOUS TASK
    // WILL THEY BE INPUTTED TOGETHER OR COULD THEY GET MIXED AS WELL?
    CHANGEO_MAKEDB(
        CHANGEO_ASSIGNGENES_FMT7.out.reads,    
        CHANGEO_ASSIGNGENES_FMT7.out.blast,
        ch_reference_fasta
    )

    [...]

    emit:
    repertoire = ch_repertoire
}
adamrtalbot commented 2 weeks ago

Soooo the answer is yes, but don't trust it. Because the outputs are pushed into channels at the same time, they will probably be emitted together, but Nextflow does nothing to guarantee it. Here's my example modified like yours:

process SLEEP {

    tag "${index}"

    input:
    tuple val(index), val(sleepTime)

    output:
    tuple val(index), val(sleepTime)
    stdout

    script:
    """
    sleep ${sleepTime}
    echo ${sleepTime}
    """
}

process ECHO_TIMES {
    debug true
    tag "${index}"

    input:
        tuple val(index), val(sleepTime)
        val(time)

    output:
        stdout

    script:
        """
        echo ${index}: ${time}
        """
}

workflow {
    times = Channel.of(["a", 10], ["b", 5], ["c", 1])
    SLEEP(times)
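    // out[0] and out[1] are both emitted by SLEEP, so here their elements happen to arrive in step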
    ECHO_TIMES(SLEEP.out[0], SLEEP.out[1])
}

However, we can very quickly break it if something happens in between the two channels. Here I have added a sleep process in between the two steps:

process SLEEP {

    tag "${index}"

    input:
    tuple val(index), val(sleepTime)

    output:
    tuple val(index), val(sleepTime)
    stdout

    script:
    """
    sleep ${sleepTime}
    echo ${sleepTime}
    """
}

process IN_BETWEEN {
    input:
    tuple val(index), val(sleepTime)
    output:
    tuple val(index), val(sleepTime)
    script:
    """
    sleep \$((1 + \$RANDOM % 15))
    """
}

process ECHO_TIMES {
    debug true
    tag "${index}"

    input:
        tuple val(index), val(sleepTime)
        val(time)

    output:
        stdout

    script:
        """
        echo ${index}: ${time}
        """
}

workflow {
    times = Channel.of(["a", 1], ["b", 2], ["c", 3])
    SLEEP(times)
    IN_BETWEEN(SLEEP.out[0])
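    // IN_BETWEEN finishes in a random order, so its output no longer lines up with SLEEP.out[1]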
    ECHO_TIMES(IN_BETWEEN.out[0], SLEEP.out[1])
}

One option that is guaranteed to work is to use a multiMap, then use each element of that map. Like so:

process SLEEP {

    tag "${index}"

    input:
    tuple val(index), val(sleepTime)

    output:
    tuple val(index), val(sleepTime), stdout

    script:
    """
    sleep ${sleepTime}
    echo ${sleepTime}
    """
}

process ECHO_TIMES {
    debug true
    tag "${index}"

    input:
        tuple val(index), val(sleepTime)
        val time

    output:
        tuple val(index), val(sleepTime)

    script:
        """
        echo ${index}: ${time}
        """
}

workflow {
    times = Channel.of(["a", 10], ["b", 5], ["c", 1])
    SLEEP(times)
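    // multiMap splits a single source channel into several, so the resulting channels always stay element-aligned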
    split_ch = SLEEP.out.multiMap { meta, sleepTime, time ->
        sleepTime: [ meta, sleepTime ]
        time: time
    }               
    ECHO_TIMES( split_ch.sleepTime, split_ch.time )
}

But honestly, I would never use this unless there was no other option, and in your case joining by a sample ID is perfect. In Nextflow, passing around sample-specific data as tuples is exactly how it is supposed to be done. This is a very common pattern:

input:
    tuple val(sampleId), path(bam), path(bai)
    tuple path(dbsnp), path(dbsnp_tbi)
    tuple path(fasta), path(fai)

This allows you to explicitly maintain the groups of data without relying on file paths or naming systems. Is there any reason you are opposed to doing it this way?

bentsherman commented 2 weeks ago

Output channels from the same process are guaranteed to have the same order. Outputs from different processes have no such guarantee.

sralchemab commented 2 weeks ago

Hi @bentsherman. Are you saying that my example would work fine, then?

Hi @adamrtalbot. I don't think my way is opposed to yours. In the last version, I thought that having "reads" and "blast" as two separate inputs was the right thing to do, as they are two very different things, and it also makes the task inputs clearer. If I were to do something like:

input:
    tuple val(sampleId), path(reads), path(blast)

wouldn't all tasks then end up having just one long input tuple? (except for common channels, like references)

I based my last example on the vdj_annotation.nf subworkflow from the Airrflow pipeline. If my case could potentially fail, could it fail there as well?

Thanks again! Really appreciate it!

bentsherman commented 2 weeks ago

I'll summarize it like this:

  1. Outputs can be separate, but if you need to re-combine them later, you should include a key like the meta map with each output
  2. The only inputs that should be separate are singleton inputs like a reference file. If you have multiple inputs associated with some key (e.g. meta) then you should provide them as a single input tuple and join them together in the workflow logic before calling the process

The nf-core/modules follow this pattern pretty well, so I would refer to them as an example.

Many people understandably get tripped up managing process inputs/outputs, which is why I'm working on a new syntax to simplify all of this stuff. Until then, just follow the above pattern and you'll be alright.
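
To illustrate, here's a minimal runnable sketch of that pattern, in the same toy style as the examples above (STEP_ONE and STEP_TWO are hypothetical names): the first process emits its result keyed by id, and the workflow joins on that key before calling the second process.

// Sketch of the join-on-key pattern (hypothetical toy processes)
process STEP_ONE {
    tag "${id}"

    input:
    tuple val(id), val(x)

    output:
    // keep the sample key attached to the result
    tuple val(id), stdout, emit: result

    script:
    """
    sleep \$((RANDOM % 3))
    echo ${x}
    """
}

process STEP_TWO {
    debug true
    tag "${id}"

    input:
    tuple val(id), val(x), val(y)

    output:
    stdout

    script:
    """
    echo ${id}: ${x} ${y}
    """
}

workflow {
    samples = Channel.of(['a', 1], ['b', 2], ['c', 3])
    STEP_ONE(samples)
    // join on the shared id key, so each STEP_TWO task sees values from the same sample
    // no matter which STEP_ONE task finished first
    STEP_TWO( samples.join(STEP_ONE.out.result) )
}
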

sralchemab commented 2 weeks ago

Thanks, @bentsherman! Will give it a go. Cheers!

adamrtalbot commented 1 week ago

shouldn't then all tasks be ending up having just one long input? (except for common channels, like references)

Yep. One of the major problems with bioinformatics tool interfaces is that they have one dimension of inputs, e.g. input.bam, input.bai, input.fasta, input.fai. We then have to coerce them into groups and sections using bizarre logic. Nextflow essentially gives you two or more dimensions by using groups of inputs. Consider this as an input block:

input:
    sample
    genome
    background

Nice and clear, easy to read. The challenge is that when you add tuples, it becomes less clear what each one represents:

input:
    tuple val(meta), path(bam), path(bai)
    tuple val(meta2), path(fasta), path(fai)
    tuple val(meta3), path(dbsnp), path(dbsnp_tbi)

As Ben says, we need to make this more straightforward to use.
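
In the meantime, one mitigation (already used in the modules earlier in this thread) is to label each tuple with a trailing comment describing what it carries:

input:
    tuple val(meta),  path(bam), path(bai)          // per-sample: alignment + index
    tuple val(meta2), path(fasta), path(fai)        // reference genome + index
    tuple val(meta3), path(dbsnp), path(dbsnp_tbi)  // known variant sites + index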