nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

Channel emits differently depending on order of dependent task completion #4104

Open kgalens opened 1 year ago

kgalens commented 1 year ago

Bug report

A channel may emit all, some, or none of what is expected, depending on the order in which upstream tasks complete. This can happen when the groupTuple operator is used in combination with groupKey.

The underlying issue seems to be that the output of groupKey is not treated as a string in some contexts; the behavior is inconsistent, working in some places but not in others. Calling .toString() on the value returned by groupKey works around it, but it would be great if a string were emitted so that channels behave predictably and consistently across contexts.

Expected behavior and actual behavior

The expected behavior is that a channel emits the same values (perhaps in a different order) as long as its input channels emit the same values; again, order should not matter.

There exists an example where this is not the case. Depending on the order of upstream task completions, the channel may emit fewer tuples (without error) than expected, or even none as demonstrated in the example below.

Steps to reproduce the problem

The example below is set up so that the by_sample tasks finish first (sleep 1) and the by_batch tasks finish after all samples have completed (sleep 30). This creates a situation where nothing is emitted from the batch_done channel, even though we'd expect 2 tuples to be emitted, one for each batch.

process by_sample {
    input:
    val sample_id

    output:
    val sample_id, emit: sample_ids

    script:
    """
    sleep 1
    echo $sample_id
    """
}

process by_batch {
    input:
    val batch_id

    output:
    val batch_id, emit: batch_ids

    script:
    """
    sleep 30
    echo $batch_id
    """
}

workflow {
    // Input samples, each belonging to one of 2 batches.
    samp_ch  = Channel.from(
        ['SAMP1', 'BATCH1'],
        ['SAMP2', 'BATCH1'],    
        ['SAMP3', 'BATCH2'],
        ['SAMP4', 'BATCH2'],
        ['SAMP5', 'BATCH2'],
    )

    // A process which scatters on sample
    by_sample(samp_ch.map { sample, batch -> sample })
    by_sample.out.sample_ids.dump(tag: 'by_sample')

    by_batch(samp_ch.map { sample, batch -> batch }.unique())
    by_batch.out.batch_ids.dump(tag: 'by_batch')

    // This will create a channel resulting in:
    // tuple(batch_id, num_samples_in_batch)
    sample_counts = samp_ch
        .map { sample_id, batch -> tuple(batch, sample_id) }
        .groupTuple()
        .map { batch, sample_ids -> tuple(batch, sample_ids.size()) }

    sample_counts.dump(tag: 'sample_counts')

    // Using the above counts of samples per batch, we can create a channel
    // which will emit a batch ID when the following conditions are met:
    //   1. All samples in a single batch have completed process "by_sample"
    //   2. The batch has completed the "by_batch" process
    batch_done = by_sample.out.sample_ids
        .join(samp_ch)
        .map { sample, batch -> tuple(batch, sample) }
        .combine(sample_counts, by:0)
        .map { batch, sample, batch_size  -> tuple(groupKey(batch, batch_size), sample) }
        .groupTuple()
        .join(by_batch.out.batch_ids)
        .map { batch, samples -> tuple(batch, "batch_done") }.dump(tag: 'batch_done')
} 

Program output

If samples finish before batches (as enforced in the example above), nothing is emitted from the batch_done channel:

N E X T F L O W  ~  version 23.04.2
Launching `src/bioinformatics/workflows/nextflow/hello_world/main.nf` [goofy_albattani] DSL2 - revision: 6badd408f3
executor >  local (7)
[e0/e080c9] process > by_sample (3) [100%] 5 of 5 ✔
[1c/dafd4a] process > by_batch (1)  [100%] 2 of 2 ✔
[DUMP: sample_counts] ['BATCH1', 2]
[DUMP: sample_counts] ['BATCH2', 3]
[DUMP: by_sample] 'SAMP1'
[DUMP: by_sample] 'SAMP5'
[DUMP: by_sample] 'SAMP2'
[DUMP: by_sample] 'SAMP4'
[DUMP: by_sample] 'SAMP3'
[DUMP: by_batch] 'BATCH2'
[DUMP: by_batch] 'BATCH1'

If we switch it around and force the batches to finish before the samples (i.e. by_batch: sleep 1 and by_sample: sleep 30), the output looks like the following. Notice that batch_done now emits a tuple for each batch:

N E X T F L O W  ~  version 23.04.2
Launching `src/bioinformatics/workflows/nextflow/hello_world/main.nf` [confident_mercator] DSL2 - revision: 717a98d981
executor >  local (7)
[11/334d03] process > by_sample (4) [100%] 5 of 5 ✔
[d4/c85a77] process > by_batch (1)  [100%] 2 of 2 ✔
[DUMP: sample_counts] ['BATCH1', 2]
[DUMP: sample_counts] ['BATCH2', 3]
[DUMP: by_batch] 'BATCH2'
[DUMP: by_batch] 'BATCH1'
[DUMP: by_sample] 'SAMP2'
[DUMP: by_sample] 'SAMP5'
[DUMP: by_sample] 'SAMP3'
[DUMP: by_sample] 'SAMP1'
[DUMP: batch_done] [BATCH1, 'batch_done']
[DUMP: by_sample] 'SAMP4'
[DUMP: batch_done] [BATCH2, 'batch_done']

Environment

Additional context

This can also be fixed in a couple of other ways, which may help pinpoint the underlying problem. If we change the creation of the batch_done channel to the following, we always see the expected output, regardless of the order in which upstream tasks complete (note the call to batch.toString() on line 7):

    batch_done = by_sample.out.sample_ids
        .join(samp_ch)
        .map { sample, batch -> tuple(batch, sample) }
        .combine(sample_counts, by:0)
        .map { batch, sample, batch_size  -> tuple(groupKey(batch, batch_size), sample) }
        .groupTuple()
        .map { batch, samples -> tuple(batch.toString(), samples) }
        .join(by_batch.out.batch_ids)
        .map { batch, samples -> tuple(batch, "batch_done") }.dump(tag: 'batch_done')

bentsherman commented 1 year ago

You are right, the problem is with comparing group keys to other objects. The comparison works when the group key is on the left-hand side but not when it is on the right:

v = 'BATCH1'
gk = groupKey(v, 3)

println v == gk // false
println gk == v // true

I don't know if there is a way to override the equals from the right-hand side...
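
For anyone following along, here is a rough plain-Groovy sketch of why the comparison is one-sided, and how that could make a hash-based join depend on arrival order. `KeyWrapper` is a hypothetical stand-in for the group key, not Nextflow's actual class, and the idea that `join` buffers unmatched keys in a hash map is an assumption rather than something confirmed from the Nextflow source:

```groovy
// Hypothetical stand-in for a group key: wraps a value and forwards
// equals/hashCode/toString to it. Not Nextflow's actual class.
class KeyWrapper {
    final Object target
    final int size

    KeyWrapper(Object target, int size) {
        this.target = target
        this.size = size
    }

    @Override
    boolean equals(Object other) {
        // Unwrap the other side if it is also a wrapper, then delegate.
        def rhs = other instanceof KeyWrapper ? other.target : other
        return target.equals(rhs)
    }

    @Override
    int hashCode() { target.hashCode() }

    @Override
    String toString() { target.toString() }
}

def v = 'BATCH1'
def gk = new KeyWrapper(v, 3)

// Call equals() directly so Groovy's == coercion rules stay out of the picture.
assert gk.equals(v)     // the wrapper delegates to String.equals
assert !v.equals(gk)    // String.equals rejects anything that is not a String

// Assumption: a join that buffers unmatched keys in a HashMap.
// OpenJDK's HashMap evaluates lookupKey.equals(storedKey), so order matters.
def plainFirst = new HashMap()
plainFirst.put(v, 'from by_batch')
assert plainFirst.containsKey(gk)       // wrapper arrives second: match

def wrapperFirst = new HashMap()
wrapperFirst.put(gk, ['SAMP1', 'SAMP2'])
assert !wrapperFirst.containsKey(v)     // plain string arrives second: no match

// Converting the key to a string first makes the lookup symmetric again.
def stringFirst = new HashMap()
stringFirst.put(gk.toString(), ['SAMP1', 'SAMP2'])
assert stringFirst.containsKey(v)
```

If the assumption holds, this would be consistent with the two runs in the report: when the grouped samples reach the join first, the wrapped key is the stored side and the plain batch ID never matches it; when the batch IDs arrive first, the wrapped key does the lookup and the match succeeds.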

bentsherman commented 1 year ago

@pditommaso one solution could be to "unwrap" the group key when it is emitted by groupTuple

robsyme commented 1 year ago

+1 to Ben's unwrap approach. This matches the semantics of the tidyverse group_by and summarize functions. I find that the groupKey just gets in the way after the groupTuple operation has been applied.
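
To make the unwrap idea concrete, here is a toy model in plain Groovy. It is only a sketch of the proposed behavior, not how Nextflow implements groupTuple; `KeyWrapper` and `groupAndUnwrap` are hypothetical names introduced for illustration:

```groovy
// Hypothetical stand-in for a group key (not Nextflow's actual class).
class KeyWrapper {
    final Object target
    final int size

    KeyWrapper(Object target, int size) {
        this.target = target
        this.size = size
    }

    @Override
    boolean equals(Object other) {
        def rhs = other instanceof KeyWrapper ? other.target : other
        return target.equals(rhs)
    }

    @Override
    int hashCode() { target.hashCode() }
}

// Toy groupTuple: buffers values per key, emits a group as soon as
// `size` values have arrived, and hands back the unwrapped key.
def groupAndUnwrap(List pairs) {
    def buffers = new HashMap()
    def emitted = []
    pairs.each { pair ->
        def key = pair[0]
        def value = pair[1]
        def bucket = buffers.get(key)
        if (bucket == null) {
            bucket = []
            buffers.put(key, bucket)
        }
        bucket << value
        if (bucket.size() == key.size) {
            emitted << [key.target, bucket]   // unwrap: emit the plain value
            buffers.remove(key)
        }
    }
    return emitted
}

def groups = groupAndUnwrap([
    [new KeyWrapper('BATCH1', 2), 'SAMP1'],
    [new KeyWrapper('BATCH2', 3), 'SAMP3'],
    [new KeyWrapper('BATCH1', 2), 'SAMP2']
])

// BATCH1 is complete and comes out with a plain String key;
// BATCH2 is still waiting for two more samples.
assert groups == [['BATCH1', ['SAMP1', 'SAMP2']]]
assert groups[0][0] instanceof String
assert 'BATCH1'.equals(groups[0][0])
```

With the key unwrapped at emission, downstream comparisons see an ordinary string on both sides, so the size hint is used only while grouping and does not leak into later operators.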
