nextflow-io / nextflow

Workflow with two watchDir channels hangs on local executor #5207

Open da-i opened 1 month ago

da-i commented 1 month ago

Bug report & Expected behavior and actual behavior

I was testing the option to watch two separate folders for different input files. While building a proof-of-concept workflow I found an issue where the workflow no longer accepts any data but does not exit either; it just hangs.

The idea is to watch for a file matching a different pattern and to stop the pipeline when that file appears. This approach resulted from the fact that you are not allowed to nest regex patterns (which I get, as it is far from ideal in most cases). So as a workaround we create a file with a certain name in the input directory, as I could not get any other method to work.

Steps to reproduce the problem

Some code:

workflow {

    log.info """Realtime pipeline started"""

    Some_input_model = Channel.fromPath(params.modbase_model).first()
    reference = Channel.fromPath(params.reference).first()

    // add the trailing slash if its missing from the project folder.
    if (params.input_read_dir.endsWith("/")){
        input_folder = "${params.input_read_dir}"
    }
    else {
        input_folder = "${params.input_read_dir}/"
    }

    read_pattern = "${input_folder}${params.read_pattern}"
    stop_pattern = "${input_folder}${params.sequence_summary_pattern}"
    log.info """Looking for files with: $read_pattern"""
    log.info """Going until files with pattern: $stop_pattern"""

    stop_files_initial = Channel.fromPath(stop_pattern).ifEmpty('empty')
    stop_files_initial.dump(tag: "stop-files")
    StopConditionInitial(stop_files_initial)

    // Realtime collection stop files
    stop_files = Channel.watchPath(stop_pattern,'create,modify').until{ file -> file.name.matches('seq*DONE*.txt') }
    stop_files.dump(tag: "stop-files-rt")
    StopCondition(stop_files.first())

    read_files_initial = Channel.fromPath(read_pattern)
    read_files_initial = read_files_initial.map(x -> [x.Parent.simpleName, x.simpleName,x])
    read_files_initial.dump(tag: "read-files")

    // Realtime collection read files
    read_files = Channel.watchPath(read_pattern,'create,modify').until{ file -> file.name.matches('DONE*.fastq.gz') }
    read_files = read_files.map(x -> [x.Parent.simpleName, x.simpleName,x])
    read_files.dump(tag: "readfiles_postfilter")

    // Example processes
    ModifiedBaseCallingEX(read_files_initial, Some_input_model, reference)
    ModifiedBaseCallingRT(read_files, Some_input_model, reference)

    basecalled_data = ModifiedBaseCallingEX.out.concat(ModifiedBaseCallingRT.out)
....

where we import StopConditionInitial and StopCondition from:

process StopCondition { 
    publishDir = [path: params.input_read_dir , mode: 'copy', pattern: "*"]

    input:
        val (stop_reason)
    output:
        path "*DONE.*", optional: true
    script:
    """
    #!/usr/bin/env python
    if "$stop_reason" == "empty":
        print('empty init case')
        exit(0)

    print("$stop_reason")
    with open("DONE.fastq.gz", "w") as f:
        f.write("abc")
    with open("sequencing_summary_DONE.txt", "w") as f:
        f.write("defasdasd")
    """
}

Not the most elegant solution with respect to the $stop_reason == 'empty' check, but I was just testing whether this works :).
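
A side note on the until closures in the workflow above: file.name.matches(...) is Java's String.matches, which treats its argument as a regular expression over the whole string rather than as a glob. The sketch below uses Python (the language of the process script) as a stand-in, on the assumption that re.fullmatch and fnmatch.fnmatchcase approximate the regex and glob behaviour for these patterns. compare_matchers is a hypothetical helper, not part of the pipeline, and this may well be unrelated to the hang itself.

```python
import re
from fnmatch import fnmatchcase


def compare_matchers(name, pattern):
    """Return (regex_full_match, glob_match) for one file name and pattern."""
    # Whole-string regex match, standing in for Java/Groovy String.matches (assumption).
    regex_hit = re.fullmatch(pattern, name) is not None
    # Shell-style glob match, which is how the patterns read at first glance.
    glob_hit = fnmatchcase(name, pattern)
    return regex_hit, glob_hit


if __name__ == "__main__":
    cases = [
        ("sequencing_summary_DONE.txt", "seq*DONE*.txt"),
        ("DONE.fastq.gz", "DONE*.fastq.gz"),
        ("DONE_1.fastq.gz", "DONE*.fastq.gz"),
    ]
    for name, pattern in cases:
        print(name, pattern, compare_matchers(name, pattern))
```

If this carries over to Groovy, sequencing_summary_DONE.txt would not satisfy the first until condition when read as a regex, and DONE.fastq.gz would satisfy the second only because "." happens to match a literal dot.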

Program output

When I run this:

nextflow main.nf --output_dir "abc_test_out" -stub -dump-channels stop-files-rt

 N E X T F L O W   ~  version 24.04.3

Launching `main.nf` [crazy_woese] DSL2 - revision: 5714c7a005

WARN: NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES

    ===================================================
    Cyclomics/Realtime_test : Cyclomics real-time example
    ===================================================
    Inputs:
        input_reads              : test_in
        Cmd line                 : nextflow main.nf --output_dir abc_test_out -stub -dump-channels stop-files-rt

Cyclomics Realtime pipeline started
Looking for files with: test_in/**.{fq,fastq,fq.gz,fastq.gz}
Going until files with pattern: test_in/{seq*,DONE}.txt
executor >  local (4)
[2c/bb27b4] Sto…onditionInitial (1) | 1 of 1 ✔
[d0/634117] StopCondition           | 1 of 1 ✔
[d4/b78b88] Mod…edBaseCallingEX (1) | 2 of 2 ✔
[-        ] ModifiedBaseCallingRT   -
[DUMP: stop-files-rt] <PATH>/test_in/sequencing_summary_abc.txt
[DUMP: stop-files-rt] <PATH>/test_in/sequencing_summary_DONE.txt

NOTE: I created a stop file after a bit. The .nextflow.log shows:

Aug-06 13:38:55.321 [Task submitter] DEBUG n.executor.local.LocalTaskHandler - Launch cmd line: /bin/bash -ue .command.run
Aug-06 13:38:55.322 [Task submitter] INFO  nextflow.Session - [d0/634117] Submitted process > StopCondition
Aug-06 13:38:55.733 [Task monitor] DEBUG n.processor.TaskPollingMonitor - Task completed > TaskHandler[id: 4; name: StopCondition; status: COMPLETED; exit: 0; error: -; workDir: /home/dami/Software/nf-sturgeon/work/d0/6341178bf6baeee05ec13995447fee]
Aug-06 13:38:56.301 [Actor Thread 56] INFO  nextflow.extension.DumpOp - [DUMP: stop-files-rt] /home/dami/Software/nf-sturgeon/test_in/sequencing_summary_DONE.txt
Aug-06 13:38:56.302 [main] DEBUG nextflow.Session - Session await > all processes finished
Aug-06 13:38:56.336 [Task monitor] DEBUG n.processor.TaskPollingMonitor - <<< barrier arrives (monitor: local) - terminating tasks monitor poll loop
Aug-06 13:38:56.336 [main] DEBUG nextflow.Session - Session await > all barriers passed

And then nothing happens; it just sits there, even if we add files matching the read pattern or the stop-file pattern.

The output is much the same when the stop_files_initial channel is not empty.

When we remove the block:

    // stop_files = Channel.watchPath(stop_pattern,'create,modify').until{ file -> file.name.matches('seq*DONE*.txt') }
    // stop_files.dump(tag: "stop-files-rt")
    // StopCondition(stop_files.first())

Everything works!

Aug-06 13:47:20.074 [main] DEBUG nextflow.Session - Session await > all barriers passed
Aug-06 13:47:20.077 [main] DEBUG nextflow.util.ThreadPoolManager - Thread pool 'TaskFinalizer' shutdown completed (hard=false)
Aug-06 13:47:20.077 [main] DEBUG nextflow.util.ThreadPoolManager - Thread pool 'PublishDir' shutdown completed (hard=false)
Aug-06 13:47:20.083 [main] DEBUG n.trace.WorkflowStatsObserver - Workflow completed > WorkflowStats[succeededCount=5; failedCount=0; ignoredCount=0; cachedCount=0; pendingCount=0; submittedCount=0; runningCount=0; retriesCount=0; abortedCount=0; succeedDuration=3.3s; failedDuration=0ms; cachedDuration=0ms;loadCpus=0; loadMemory=0; peakRunning=3; peakCpus=3; peakMemory=0; ]
Aug-06 13:47:20.083 [main] DEBUG nextflow.trace.TraceFileObserver - Workflow completed -- saving trace file
Aug-06 13:47:20.085 [main] DEBUG nextflow.trace.ReportObserver - Workflow completed -- rendering execution report
Aug-06 13:47:20.450 [main] DEBUG nextflow.trace.TimelineObserver - Workflow completed -- rendering execution timeline
Aug-06 13:47:20.631 [main] DEBUG nextflow.cache.CacheDB - Closing CacheDB done
Aug-06 13:47:20.644 [main] DEBUG nextflow.util.ThreadPoolManager - Thread pool 'FileTransfer' shutdown completed (hard=false)
Aug-06 13:47:20.645 [main] DEBUG nextflow.script.ScriptRunner - > Execution complete -- Goodbye

Environment

> nextflow -version

      N E X T F L O W
      version 24.04.3 build 5916
      created 09-07-2024 19:35 UTC (21:35 CEST)
      cite doi:10.1038/nbt.3820
      http://nextflow.io

> java --version
openjdk 21.0.4 2024-07-16
OpenJDK Runtime Environment (build 21.0.4+7-Ubuntu-1ubuntu224.04)
OpenJDK 64-Bit Server VM (build 21.0.4+7-Ubuntu-1ubuntu224.04, mixed mode, sharing)
> cat /etc/*-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=24.04
DISTRIB_CODENAME=noble
DISTRIB_DESCRIPTION="Ubuntu 24.04 LTS"
PRETTY_NAME="Ubuntu 24.04 LTS"
NAME="Ubuntu"
VERSION_ID="24.04"
VERSION="24.04 LTS (Noble Numbat)"
VERSION_CODENAME=noble
ID=ubuntu
ID_LIKE=debian
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
UBUNTU_CODENAME=noble
LOGO=ubuntu-logo

Additional context

Disabling nextflow.preview.recursion does not resolve the issue.

bentsherman commented 1 month ago

Are you using recursion somewhere in your pipeline?

da-i commented 1 month ago

I was initially, but I disabled it in a test before submitting; see "Additional context" above.

da-i commented 1 month ago

But even if you distill it down to its core, it does not work:

da-i commented 1 month ago
input_folder = "test_in/"
read_pattern = "**.{fq,fastq,fq.gz,fastq.gz}"
stop_pattern = "{seq*,DONE}.txt"

process StopCondition {
    input:
        path(input_file)
    output:
        path("*.txt")
    """
    echo "hi" >> hi.txt
    """

}

workflow {
    read_pattern = "${input_folder}${read_pattern}"
    stop_pattern = "${input_folder}${stop_pattern}"
    log.info """Looking for files with: $read_pattern"""
    log.info """Going until files with pattern: $stop_pattern"""

    stop_file_found = true
    stop_files = Channel.watchPath(stop_pattern,'create,modify').until{ stop_file_found }
    stop_files.filter{ it ->  it.simpleName !=~ 'seq*DONE.txt'}
    stop_files.dump(tag: "stop-files-rt")
    StopCondition(stop_files)

}

Never exits.
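
For readers less familiar with until: per the Nextflow documentation it emits items until the condition becomes true, and the item that triggers the condition is not emitted. The Python sketch below models that with a hypothetical generator named until (an illustration only, not Nextflow code), using the same always-true flag as the reduced example.

```python
def until(items, condition):
    """Yield items until condition(item) is true; the matching item is dropped."""
    for item in items:
        if condition(item):
            return  # channel closes here, nothing further is emitted
        yield item


if __name__ == "__main__":
    # Mirrors the example in the Nextflow docs: stop at the first 5.
    print(list(until([3, 2, 1, 5, 1, 5], lambda x: x == 5)))

    # Mirrors the reduced workflow: the flag is always true.
    stop_file_found = True
    print(list(until(["seq_a.txt", "DONE.txt"], lambda _f: stop_file_found)))
```

Under that reading, StopCondition receives no input at all in the reduced example, because the first watched file already closes the channel. Whether the watcher is then expected to shut down and let the run finish is the open question here.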