nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

Workflow with reduction operators hangs forever when recursed #3795

Open bentsherman opened 1 year ago

bentsherman commented 1 year ago

Bug report

If a workflow uses any kind of reduction operator (collect, count, max, reduce, etc.), it will hang forever when recursed.

Steps to reproduce the problem

Here is a minimal example that recurses a workflow that uses the count operator. You could also use any operator that returns a value channel, and I predict you would get the same result.

nextflow.preview.recursion=true

workflow LOOP {
    take:
        ch_in

    main:
        ch_in
            | flatMap { n -> 1 .. n + 1 }
            | count
            | set { ch_out }

    emit:
        ch_out
}

workflow {
    ch_in = Channel.value(1)

    // completes
    // LOOP( ch_in )

    // hangs forever
    // LOOP.recurse( ch_in ).times( 10 )

    // hangs forever
    LOOP.recurse( ch_in ).until( n -> n == 10 )

    LOOP.out.view()
}

Program output

Log output (abridged):

Mar-23 02:45:26.927 [main] DEBUG nextflow.Session - Session start
Mar-23 02:45:27.153 [main] DEBUG nextflow.script.ScriptRunner - > Launching execution
Mar-23 02:45:27.169 [main] WARN  nextflow.NextflowMeta$Preview - NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASE
Mar-23 02:45:27.313 [main] DEBUG nextflow.Session - Workflow process names [dsl2]: 
Mar-23 02:45:27.314 [main] DEBUG nextflow.Session - Igniting dataflow network (1)
Mar-23 02:45:27.315 [main] DEBUG nextflow.script.ScriptRunner - > Awaiting termination 
Mar-23 02:45:27.315 [main] DEBUG nextflow.Session - Session await
Mar-23 02:45:27.315 [main] DEBUG nextflow.Session - Session await > all processes finished
Mar-23 02:45:27.315 [main] DEBUG nextflow.Session - Session await > all barriers passed
Mar-23 02:45:33.008 [SIGINT handler] DEBUG nextflow.Session - Session aborted -- Cause: SIGINT
Mar-23 02:45:33.030 [SIGINT handler] DEBUG nextflow.Session - The following nodes are still active:
  [operator] flatMap
  [operator] count
  [operator] view

Environment

Additional context

I suspect it's the same bug as #2609, or a similar one.

bentsherman commented 1 year ago

For now, I think the only way to work around this bug is to avoid using reduction operators in a recursive workflow. To do that, you would have to combine some processes so that the reduction happens in a process script instead of in the dataflow logic. In the above example, I would have to refactor the flatMap | count piece into a single process that does the same thing in Groovy or, for example, a Bash script. The downside is that you may lose some parallelization.
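As a rough sketch of that workaround for the example above: the flatMap | count step is folded into one process with an exec block, and the process itself is recursed, so no reduction operator sits in the recursed dataflow. The process name COUNT_RANGE and the output name m are made up for illustration, and I have not verified this against the hang; it only assumes the process-level recurse/until syntax from the recursion preview.

```nextflow
nextflow.preview.recursion=true

// Hypothetical process replacing `flatMap { n -> 1 .. n + 1 } | count`.
// The counting is done in plain Groovy inside the task instead of
// by a dataflow operator.
process COUNT_RANGE {
    input:
        val n

    output:
        val m

    exec:
        // 1 .. n + 1 is the range 1..(n + 1), so this yields n + 1
        m = (1 .. n + 1).size()
}

workflow {
    // same stop condition as the original example
    COUNT_RANGE
        .recurse( 1 )
        .until { n -> n == 10 }
        .view()
}
```

The trade-off is the one mentioned above: whatever was spread across the channel items now runs inside a single task.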

asangphukieo commented 1 year ago

I ran into this issue with the collect operator. Is there any suggested workaround for collect()? Thank you.

bentsherman commented 1 year ago

Hi @asangphukieo, see my previous comment. For now, the only workaround I can see is to avoid using collect and implement the same logic in Groovy or Bash code. You might have to merge some processes to make it work.

jeffquinn-msk commented 1 year ago

I think I'm encountering this right now with a recursive subworkflow I'm trying to write that uses groupTuple. I suppose that qualifies as a reduction operator as well.

bentsherman commented 1 year ago

Indeed

I think the problem is that the operator is re-used across every iteration of the workflow, so if it needs to wait for all inputs before emitting then it will never proceed past the first iteration.

Basically we need a way to "reset" the operator on each iteration. What we really need to do is create a new instance of the operator in the DAG for each iteration, but that requires modifying the DAG while it is running which is a whole can of worms...