carpentries-incubator / workflows-nextflow

Workflow management with Nextflow and nf-core
https://carpentries-incubator.github.io/workflows-nextflow/

Channel operator example: Merge vs Join #23

Closed (mahesh-panchal closed this 3 months ago)

mahesh-panchal commented 3 years ago

merge_vs_join_example.nf:

nextflow.enable.dsl = 2

include { SLEEP as SLEEP_ALPHA; SLEEP as SLEEP_BETA; MERGE_CHANNELS; JOIN_CHANNELS } from './nf_procs'

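workflow {
    alpha_ch = Channel.of( ['A', 1], ['B', 2], ['C', 3] )
    beta_ch  = Channel.of( ['A', 4], ['B', 5], ['C', 6] )

    // Each SLEEP alias emits its tuples in the order its tasks finish, which varies run to run
    SLEEP_ALPHA( alpha_ch )
    SLEEP_BETA ( beta_ch )
    // Two input channels: items are paired in arrival order, not by key
    MERGE_CHANNELS( SLEEP_ALPHA.out, SLEEP_BETA.out ).view()
    // join pairs the tuples by their first element (the letter) before the process sees them
    JOIN_CHANNELS ( SLEEP_ALPHA.out.join(SLEEP_BETA.out) ).view()
}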

nf_procs.nf:

process SLEEP {

    input:
    tuple val(char_i), val(num_i)

    output:
    tuple val(char_i), val(num_i)

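    // Sleep 1-10 seconds at random so tasks finish in an unpredictable order.
    // \$ stops Nextflow from interpolating the bash variables itself.
    script:
    """
    sleep \$(( ( \$RANDOM % 10 ) + 1 ))
    """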
}

process MERGE_CHANNELS {

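    // Two separate input channels: each task takes the next available item from each,
    // so items are paired by arrival order, not by their key
    input: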
    tuple val(char_i), val(num_i)
    tuple val(char_j), val(num_j)

    output:
    stdout()

    script:
    """
    echo "${char_i},${num_i} + ${char_j},${num_j}"
    """
}

process JOIN_CHANNELS {

    // A single tuple produced by join: [ key, value_from_alpha, value_from_beta ]
    input:
    tuple val(char_ij), val(num_i), val(num_j)

    output:
    stdout()

    script:
    """
    echo "${char_ij}, ${num_i}, ${num_j}"
    """
}

output:

nextflow run merge_vs_join_example.nf 
N E X T F L O W  ~  version 21.04.0
Launching `merge_vs_join_example.nf` [jovial_pare] - revision: b7291d9907
executor >  local (12)
[7a/2b9736] process > SLEEP_ALPHA (2) [100%] 3 of 3 ✔
[e9/419ff2] process > SLEEP_BETA (3)  [100%] 3 of 3 ✔
[56/85577d] process > MERGE_CHANNELS (3)    [100%] 3 of 3 ✔
[a8/41a969] process > JOIN_CHANNELS (3)     [100%] 3 of 3 ✔
C,3 + B,5
A, 1, 4
A,1 + A,4
C, 3, 6
B, 2, 5
B,2 + C,6

Using this example, although clear, would entail introducing DSL2 modules before channel operators.

ggrimes commented 3 years ago

If this is for the Operator episode, could we use this example for the join operator?

nextflow.enable.dsl=2

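// size: 1 emits each file on its own as [ name, [ file ] ]
read1_ch = channel.fromFilePairs("data/yeast/reads/*_1.fq.gz", size: 1)
read1_ch.count().view({"read1 channel size $it"})

// Only the ref* files, so this channel has fewer items than read1_ch
read2_ch = channel.fromFilePairs("data/yeast/reads/ref*_2.fq.gz", size: 1)
read2_ch.count().view({"read2 channel size $it"})

// join on the common tuple key (the sample name, e.g. ref1)
read1_ch.join(read2_ch).view()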

output:

Launching `work.nf` [cheeky_baekeland] - revision: 74e843bab5
N E X T F L O W  ~  version 21.04.0
[ref1, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref1_1.fq.gz], [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref1_2.fq.gz]]
[ref2, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref2_1.fq.gz], [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref2_2.fq.gz]]
[ref3, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref3_1.fq.gz], [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref3_2.fq.gz]]
read1 channel size 9
read2 channel size 3
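In the output above, read1_ch has 9 items but join emits only 3 tuples: by default, join emits a tuple only for keys present in both channels and silently discards the rest. A minimal, file-free sketch of this in the operator-only style (no process or workflow blocks, just Channel.of and view) is below; the sample IDs and values are hypothetical placeholders for the read files. The `remainder: true` option of join additionally emits the unmatched items, with null for the missing side.

```nextflow
nextflow.enable.dsl = 2

// Hypothetical toy data: [ sample_id, value ] tuples standing in for the read files.
// 'ref4' only exists in left_ch, so it has no partner in right_ch.
left_ch  = Channel.of( ['ref1', 'r1_1'], ['ref2', 'r2_1'], ['ref3', 'r3_1'], ['ref4', 'r4_1'] )
right_ch = Channel.of( ['ref1', 'r1_2'], ['ref2', 'r2_2'], ['ref3', 'r3_2'] )

// Default join: one [ key, left_value, right_value ] tuple per shared key; ref4 is dropped
left_ch.join( right_ch ).view { "join: $it" }

// remainder: true also emits unmatched items, with null for the missing side
left_ch.join( right_ch, remainder: true ).view { "join + remainder: $it" }
```

Showing the default and the `remainder: true` variant side by side makes it explicit that a smaller joined channel is expected behaviour, not a lost file.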

And the cross operator, since merge is deprecated in DSL2, as below:

...
/*
* The cross operator allows you to combine the items of two channels in such a way that the items of the source channel are emitted along with the items emitted by the target channel for which they have a matching key.
*/
read1_ch.cross(read2_ch).view()

output:

N E X T F L O W  ~  version 21.04.0
Launching `work.nf` [angry_majorana] - revision: cfd1faf9fa
[[ref2, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref2_1.fq.gz]], [ref2, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref2_2.fq.gz]]]
[[ref3, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref3_1.fq.gz]], [ref3, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref3_2.fq.gz]]]
[[ref1, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref1_1.fq.gz]], [ref1, [/Users/ggrimes2/Downloads/nextflow_rnaseq_training_dataset/data/yeast/reads/ref1_2.fq.gz]]]
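A file-free sketch of cross with hypothetical values, showing two differences from join that the output above hints at: each emitted item is a pair of the original tuples (so the key appears twice), and a source item is emitted once for every target item with a matching key. The final map step is just one assumed way to flatten each pair into a join-like [ key, source_value, target_value ] tuple.

```nextflow
nextflow.enable.dsl = 2

// Hypothetical toy data: ref1 appears twice in target_ch
source_ch = Channel.of( ['ref1', 'r1_1'], ['ref2', 'r2_1'] )
target_ch = Channel.of( ['ref1', 'r1_2'], ['ref1', 'r1_2b'], ['ref2', 'r2_2'] )

// cross emits [ source_item, target_item ] for every target item with a matching key,
// so [ref1, r1_1] is emitted twice: once with [ref1, r1_2] and once with [ref1, r1_2b]
crossed_ch = source_ch.cross( target_ch )
crossed_ch.view()

// Flatten each nested pair into [ key, source_value, target_value ]
crossed_ch
    .map { src, tgt -> [ src[0], src[1], tgt[1] ] }
    .view()
```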

mahesh-panchal commented 3 years ago

I suggested my version because it's a common mistake to feed two channels into a process and expect their elements to be paired. Many people are also unaware that this is exactly what the merge operator does, which is likely why it was deprecated in DSL2: it isn't needed, since feeding two channels into a process does the same thing. The SLEEP process deliberately delays its output to mimic larger-scale asynchronous workflows, which makes it obvious that pairing data properly requires join. Small toy examples without some kind of delay will almost always show the merge version pairing files "correctly". When the same pattern is then used in real workflows, the bug is hard to catch: incorrect pairing of files often goes unnoticed, especially when it only happens at random.
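The key point, that join matches on the key rather than on arrival order, can also be sketched without any processes. Here beta_ch is deliberately written in a different order, as a hypothetical stand-in for SLEEP_BETA finishing its tasks in an unpredictable order; join still pairs each letter with the right number.

```nextflow
nextflow.enable.dsl = 2

alpha_ch = Channel.of( ['A', 1], ['B', 2], ['C', 3] )
// Out of order on purpose, as if B and C finished before A
beta_ch  = Channel.of( ['C', 6], ['A', 4], ['B', 5] )

// join pairs by the first tuple element: A with 4, B with 5, C with 6,
// whatever order the items arrive in (the order of the output lines may vary)
alpha_ch.join( beta_ch ).view()
```

This only shows that join is order-independent; it does not reproduce the mis-pairing itself, which needs the asynchronous process outputs of the original example.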

ggrimes commented 3 years ago

I was hoping to avoid using process or workflow blocks in the operator episode to keep things simple. For that episode, I would just show the channel output after an operation using the view operator. Could this be put in the workflow episode, which covers combining processes? https://carpentries-incubator.github.io/workflows-nextflow/05-workflow/index.html

mahesh-panchal commented 3 years ago

That might be an idea too. It needs to be put somewhere where we are teaching how to compose processes together. I think you're right then that the operators page is not the best place for it. I'll add this to the workflows page then as a challenge.