nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

Operator .collect() hangs when following .subscribe() #4436

Open dreavjr opened 1 year ago

dreavjr commented 1 year ago

Bug report

When a collection operator (.collect(), .toList() or .toSortedList()) follows a .subscribe() operator, the workflow execution hangs.

Expected behavior and actual behavior

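Expected: the .subscribe() operator should be transparent, i.e. it should have no effect on the rest of the workflow. Actual: execution hangs after the first item is printed (see the program output below).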

Steps to reproduce the problem

The following minimal example reproduces the problem:

workflow {
    Channel
        .of( 1, 2, 3, 4 )
        .subscribe(onNext: { println "INFO: Next item: ${it}..." }, onComplete: { println 'All items done!' })
        .collect()
}

Removing either operator (.subscribe() or .collect()) lets the workflow complete normally, as does swapping their order (placing .subscribe() after .collect()).
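For comparison, here is a minimal sketch of the reordered pipeline that the report describes as completing normally, with .subscribe() placed after .collect(). In this order `onNext` fires once, with the whole collected list, rather than once per item. The `Collected:` message text is illustrative only.

```groovy
workflow {
    Channel
        .of( 1, 2, 3, 4 )
        // collect() gathers every item into a single list emission
        .collect()
        // subscribe() is the terminal step here; nothing is chained after it
        .subscribe(onNext: { println "Collected: ${it}" }, onComplete: { println 'All items done!' })
}
```

This avoids the hang but loses the per-item logging that the original example was after.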

Program output

N E X T F L O W  ~  version 23.10.0
Launching `/home/valle/workspace/code/CY/experiments/temp.nf` [determined_rosalind] DSL2 - revision: 26cfaa283f
INFO: Next item: 1...

(and hangs indefinitely until ^C is pressed)

Environment

bentsherman commented 1 year ago

According to the docs, the subscribe operator does not forward its input.

However, the implementation seems to attempt chaining anyway by returning the source channel: https://github.com/nextflow-io/nextflow/blob/171831eaa1211df641a98564229fa1fcb68838e2/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy#L76-L86

Returning the source lets you chain other channel methods, but not operators, presumably because operators are implemented as extension methods. Changing subscribe to return a DataflowWriteChannel should make it chainable on both ends.

In the meantime, you can use the view operator to log items as they pass through a channel. Unlike subscribe, view forwards each item it receives, so it can be followed by other operators.
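Here is a minimal sketch of that workaround applied to the reproduction above. It assumes the goal is per-item logging followed by a collected result. `view` prints the value returned by its closure and re-emits each item unchanged, so `.collect()` still receives all four values. The message strings are illustrative.

```groovy
workflow {
    Channel
        .of( 1, 2, 3, 4 )
        // view() logs each item and forwards it downstream unchanged
        .view { "INFO: Next item: ${it}..." }
        // collect() receives all items and emits them once, as a list
        .collect()
        // a second view() stands in for the onComplete message
        .view { "All items done: ${it}" }
}
```

There is no direct `onComplete` hook in this version. The final `view` fires when `collect()` emits its list, which happens only after the source channel has completed.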