bentsherman / nf-boost

Experimental features for Nextflow
Apache License 2.0

Nf-boost deleting files used in upcoming processes #4

Open gacrestani opened 1 week ago

gacrestani commented 1 week ago

Hello, thanks for the plugin!

I am having some issues with it, though. It appears to delete intermediate files that downstream processes still need. My pipeline (which I wrote myself) fails with the following error:

  Started on:    shmoo
  Started at:    Sun Sep 22 04:53:11 PM PDT 2024
N E X T F L O W  ~  version 24.04.3
WARN: It appears you have never run this project before -- Option `-resume` is ignored
Launching `main.nf` [maniac_ramanujan] DSL2 - revision: 18f2745068
B U R K E   L A B   P I P E L I N E
===================================

[09/950c2e] Submitted process > BwaMem (2)
[8d/c9fb27] Submitted process > BwaMem (1)
[12/2a6eab] Submitted process > BwaMem (3)
[86/cd95a7] Submitted process > BwaMem (4)
[c7/413b4c] Submitted process > BwaMem (6)
[fc/6081a8] Submitted process > BwaMem (5)
[93/079b6a] Submitted process > MergeSamFiles (2)
[87/efb7ac] Submitted process > MergeSamFiles (1)
[b4/08809a] Submitted process > MarkDuplicates (1)
[b2/01d53a] Submitted process > MarkDuplicates (2)
ERROR ~ Error executing process > 'MarkDuplicates (1)'

Caused by:
  Process `MarkDuplicates (1)` terminated with an error exit status (3)

Command executed:

  # gatk MarkDuplicates script
  # Defining the command
  cmd="gatk MarkDuplicates --INPUT CB_rep01_gen56.bam --METRICS_FILE CB_rep01_gen56_duplicate_metrics.txt --OUTPUT CB_rep01_gen56_duplicates_marked.bam --TMP_DIR /scratch"

  echo "$cmd"

  # Run command
  eval $cmd

Command exit status:
  3

Command output:
  gatk MarkDuplicates --INPUT CB_rep01_gen56.bam --METRICS_FILE CB_rep01_gen56_duplicate_metrics.txt --OUTPUT CB_rep01_gen56_duplicates_marked.bam --TMP_DIR /scratch

Command error:
  Using GATK jar /fs1/local/cqls/software/x86_64/gatk4-4.5.0.0/envs/gatk4/share/gatk4-4.5.0.0-0/gatk-package-4.5.0.0-local.jar
  Running:
      java -Dsamjdk.use_async_io_read_samtools=false -Dsamjdk.use_async_io_write_samtools=true -Dsamjdk.use_async_io_write_tribble=false -Dsamjdk.compression_level=2 -jar /fs1/local/cqls/software/x86_64/gatk4-4.5.0.0/envs/gatk4/share/gatk4-4.5.0.0-0/gatk-package-4.5.0.0-local.jar MarkDuplicates --INPUT CB_rep01_gen56.bam --METRICS_FILE CB_rep01_gen56_duplicate_metrics.txt --OUTPUT CB_rep01_gen56_duplicates_marked.bam --TMP_DIR /scratch
  17:03:55.009 INFO  NativeLibraryLoader - Loading libgkl_compression.so from jar:file:/fs1/local/cqls/software/x86_64/gatk4-4.5.0.0/envs/gatk4/share/gatk4-4.5.0.0-0/gatk-package-4.5.0.0-local.jar!/com/intel/gkl/native/libgkl_compression.so
  [Sun Sep 22 17:03:55 PDT 2024] MarkDuplicates --INPUT CB_rep01_gen56.bam --OUTPUT CB_rep01_gen56_duplicates_marked.bam --METRICS_FILE CB_rep01_gen56_duplicate_metrics.txt --TMP_DIR /scratch --MAX_SEQUENCES_FOR_DISK_READ_ENDS_MAP 50000 --MAX_FILE_HANDLES_FOR_READ_ENDS_MAP 8000 --SORTING_COLLECTION_SIZE_RATIO 0.25 --TAG_DUPLICATE_SET_MEMBERS false --REMOVE_SEQUENCING_DUPLICATES false --TAGGING_POLICY DontTag --CLEAR_DT true --DUPLEX_UMI false --FLOW_MODE false --FLOW_QUALITY_SUM_STRATEGY false --USE_END_IN_UNPAIRED_READS false --USE_UNPAIRED_CLIPPED_END false --UNPAIRED_END_UNCERTAINTY 0 --FLOW_SKIP_FIRST_N_FLOWS 0 --FLOW_Q_IS_KNOWN_END false --FLOW_EFFECTIVE_QUALITY_THRESHOLD 15 --ADD_PG_TAG_TO_READS true --REMOVE_DUPLICATES false --ASSUME_SORTED false --DUPLICATE_SCORING_STRATEGY SUM_OF_BASE_QUALITIES --PROGRAM_RECORD_ID MarkDuplicates --PROGRAM_GROUP_NAME MarkDuplicates --READ_NAME_REGEX <optimized capture of last three ':' separated fields as numeric values> --OPTICAL_DUPLICATE_PIXEL_DISTANCE 100 --MAX_OPTICAL_DUPLICATE_SET_SIZE 300000 --VERBOSITY INFO --QUIET false --VALIDATION_STRINGENCY STRICT --COMPRESSION_LEVEL 2 --MAX_RECORDS_IN_RAM 500000 --CREATE_INDEX false --CREATE_MD5_FILE false --help false --version false --showHidden false --USE_JDK_DEFLATER false --USE_JDK_INFLATER false
  [Sun Sep 22 17:03:55 PDT 2024] Executing as crestang@shmoo.hpc.oregonstate.edu on Linux 5.14.0-362.24.1.el9_3.x86_64 amd64; OpenJDK 64-Bit Server VM 17.0.11-internal+0-adhoc..src; Deflater: Intel; Inflater: Intel; Provider GCS is available; Picard version: Version:4.5.0.0
  [Sun Sep 22 17:03:55 PDT 2024] picard.sam.markduplicates.MarkDuplicates done. Elapsed time: 0.00 minutes.
  Runtime.totalMemory()=285212672
  To get help, see http://broadinstitute.github.io/picard/index.html#GettingHelp
  htsjdk.samtools.SAMException: Cannot read non-existent file: file:///scratch/nxf.wdz1YqbYGY/CB_rep01_gen56.bam
    at htsjdk.samtools.util.IOUtil.assertFileIsReadable(IOUtil.java:498)
    at htsjdk.samtools.util.IOUtil.assertFileIsReadable(IOUtil.java:485)
    at htsjdk.samtools.util.IOUtil.assertInputIsValid(IOUtil.java:461)
    at htsjdk.samtools.util.IOUtil.assertInputsAreValid(IOUtil.java:537)
    at picard.sam.markduplicates.MarkDuplicates.doWork(MarkDuplicates.java:257)
    at picard.cmdline.CommandLineProgram.instanceMain(CommandLineProgram.java:280)
    at org.broadinstitute.hellbender.cmdline.PicardCommandLineProgramExecutor.instanceMain(PicardCommandLineProgramExecutor.java:37)
    at org.broadinstitute.hellbender.Main.runCommandLineProgram(Main.java:166)
    at org.broadinstitute.hellbender.Main.mainEntry(Main.java:209)
    at org.broadinstitute.hellbender.Main.main(Main.java:306)

Work dir:
  /scratch/work/b4/08809a4967c06eccd9265346e27429

Tip: view the complete command output by changing to the process work dir and entering the command `cat .command.out`

My workflow is:

workflow {
    Channel.fromPath("metadata/FLYLONG_metadata_nextflow.csv")
    | splitCsv(header:true)
    | map { row ->
        fastq1_path = params.samples_directory + "FLYLONG_" + row.population + "/" + row.fastq1
        fastq2_path = params.samples_directory + "FLYLONG_" + row.population + "/" + row.fastq2

        meta = row.subMap(
            'flow_cell',
            'lane',
            'population',
            'barcode',
            'sequencing_facility',
            'internal_library_name'
            )
        [row.population, meta, [
            file(fastq1_path, checkIfExists: true),
            file(fastq2_path, checkIfExists: true)]]
    }
    | filter { it.contains("EB_rep04_gen20") || it.contains("CB_rep01_gen56")}
    | set { samples }

    BwaMem(samples)
    MergeSamFiles(BwaMem.out.bam.groupTuple())
    MarkDuplicates(MergeSamFiles.out.bam)
    BaseRecalibrator(MarkDuplicates.out.bam)
    ApplyBQSR(BaseRecalibrator.out.bam)
    HaplotypeCaller(ApplyBQSR.out.bam)
    CombineGVCFs(HaplotypeCaller.out.vcf.collect(), HaplotypeCaller.out.vcftbi.collect())
    GenotypeGVCFs(CombineGVCFs.out.vcf)
    SelectVariants(GenotypeGVCFs.out.vcf)
    VariantFiltration(SelectVariants.out.vcf)

    SnpEff(VariantFiltration.out.vcf.flatten())
    VariantsToTable(SnpEff.out.vcf)

    VcfToTable(VariantFiltration.out.vcf.flatten())
}

And the processes up to the crash are:

process BwaMem {
    label 'low_reqs'

    input:
    tuple val(population), val(meta), val(reads)
    // For some reason I have to input reads as val instead of path, otherwise the process will not work

    output:
    tuple val(population), path("${reads[0].simpleName}_sorted.bam"), emit: bam

    script:
    """
    # bwa mem script
    # Defining the bwa-mem/samtools command

    # Defining RG
    RG="@RG\\tID:${meta.flow_cell}.lane-${meta.lane}.${meta.barcode}\\tSM:${population}\\tLB:${meta.internal_library_name}\\tPL:ILLUMINA\\tPU:${meta.flow_cell}.${meta.lane}.${meta.barcode}"

    # Defining the bwa mem | samtools command
    cmd="bwa mem -R '\$RG' ${params.reference_genome} -t ${task.cpus} ${reads[0]} ${reads[1]} | samtools sort --threads ${task.cpus} -o ${reads[0].simpleName}_sorted.bam"

    # Logging command
    echo "\$cmd"

    # Run command recording disk space before and after usage
    echo "Disk space before processing: "
    df -h /scratch

    eval \$cmd

    echo "Disk space after processing: "
    df -h /scratch
    """
}

process MergeSamFiles {
    label 'low_reqs'

    input:
    tuple val(population), path(bams)

    output:
    tuple val(population), path("${population}.bam"), emit: bam

    script:
    def bams_list = bams.collect{"--INPUT $it"}.join(' ')

    """
    # gatk MergeSamFiles script
    # Defining the command
    cmd="gatk MergeSamFiles ${bams_list} --OUTPUT ${population}.bam --TMP_DIR ${params.scratch_directory}"

    echo "\$cmd"

    # Run command
    eval \$cmd
    """
}

process MarkDuplicates {
    label 'medium_reqs'
    publishDir path: "${params.results_directory}/${population}", mode: 'copy', pattern: "*.txt"

    input:
    tuple val(population), path(bam)

    output:
    tuple val(population), path("${population}_duplicates_marked.bam"), emit: bam
    path("${population}_duplicate_metrics.txt")

    script:
    """
    # gatk MarkDuplicates script
    # Defining the command
    cmd="gatk MarkDuplicates --INPUT ${bam} --METRICS_FILE ${population}_duplicate_metrics.txt --OUTPUT ${population}_duplicates_marked.bam --TMP_DIR ${params.scratch_directory}"

    echo "\$cmd"

    # Run command
    eval \$cmd
    """
}

My nextflow.config file looks like this:

params {
    // Mandatory
    samples_directory   = '/nfs3/IB/Burke_Lab/Crestani/nextflow/fastqs/'
    reference_genome    = '/nfs3/IB/Burke_Lab/Crestani/nextflow/reference/dmel-all-chromosome-r6.51.fasta'
    bqsr_vcf            = '/nfs3/IB/Burke_Lab/Crestani/nextflow/reference/DGRP2.source_NCSU.dm6.final.vcf'
    results_directory   = '/nfs3/IB/Burke_Lab/Crestani/nextflow/results/'
    reports_directory   = '/nfs3/IB/Burke_Lab/Crestani/nextflow/results/reports'
    scratch_directory   = '/scratch'

    max_cpus            = 120
    max_memory          = '1T'
}

process {
    executor            = 'slurm'
    queue               = 'burke_lab'
    scratch             = '/scratch'

    withLabel: low_reqs {
      cpus              = 64
      memory            = '512G'
    }

    withLabel: medium_reqs {
      cpus              = 64
      memory            = '512G'
    }

    withLabel: high_reqs {
      cpus              = 128
      memory            = '512G'
    }
}

executor.queuesize  = 40

plugins {
  id 'nf-boost'
}

boost {
  cleanup = false
}

report {
  enabled = true
  file = "${params.reports_directory}/report.html"
  overwrite = true
}

trace {
  enabled = true
  file = "${params.reports_directory}/trace.txt"
  overwrite = true
}

timeline {
  enabled = true
  file = "${params.reports_directory}/timeline.html"
  overwrite = true
}

dag {
  enabled = true
  file = "${params.reports_directory}/dag.html"
  overwrite = true
}

If I set cleanup = false, the pipeline runs and completes without issues. I am running Nextflow version 24.04.3 on my university's HPC (which uses SLURM). I am likely doing something wrong! Can you please help me troubleshoot this? Thank you very much!

bentsherman commented 1 week ago

@nservant this looks like the issue you were telling me about, with GATK using async I/O. I see -Dsamjdk.use_async_io_write_samtools=true in the command line. Is that the flag you had to disable?

nservant commented 1 week ago

Indeed, I tested this parameter, but in my case it was more of a cluster I/O issue. Increasing boost.cleanupInterval to 180 seconds fixed it, though I guess tuning both parameters could be useful.

bentsherman commented 1 week ago

@gacrestani try adding this setting to your nextflow config first:

boost.cleanupInterval = '180s'
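
For context, a minimal sketch of how this setting could sit next to the plugin declaration in the nextflow.config posted above. It uses the block form of the same `boost` scope and assumes cleanup is switched on, since the interval only matters while cleanup is active:

```groovy
plugins {
  id 'nf-boost'
}

boost {
  cleanup = true            // the original config had this set to false
  cleanupInterval = '180s'  // value suggested in this thread, not a tuned default
}
```

The dotted form (`boost.cleanupInterval = '180s'`) and the block form should be equivalent in a Nextflow config file, so either can be used.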

If that doesn't work, then try disabling the flag I mentioned. Ideally, though, the cleanup should work with GATK's async I/O, which is more efficient. I think the default cleanup interval is just too short.

EDIT: fixed quotes in config code
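
For the fallback of disabling the flag, one possible approach is to pass the JVM property through the `gatk` wrapper's `--java-options` argument. The sketch below adapts the MarkDuplicates process from the original post. It assumes that options given this way are placed after GATK's default JVM flags, so that the later `-D` value takes effect; this is worth confirming against the `Running:` line in the task's command error output.

```groovy
process MarkDuplicates {
    label 'medium_reqs'
    publishDir path: "${params.results_directory}/${population}", mode: 'copy', pattern: "*.txt"

    input:
    tuple val(population), path(bam)

    output:
    tuple val(population), path("${population}_duplicates_marked.bam"), emit: bam
    path("${population}_duplicate_metrics.txt")

    script:
    """
    # Assumption: --java-options values come after GATK's default JVM flags,
    # so this setting overrides -Dsamjdk.use_async_io_write_samtools=true.
    gatk --java-options "-Dsamjdk.use_async_io_write_samtools=false" MarkDuplicates \\
        --INPUT ${bam} \\
        --METRICS_FILE ${population}_duplicate_metrics.txt \\
        --OUTPUT ${population}_duplicates_marked.bam \\
        --TMP_DIR ${params.scratch_directory}
    """
}
```

The same option could be added to MergeSamFiles, since the first failure above involved a file written by that process.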

gacrestani commented 1 week ago

Thank you. I will add the flag, rerun everything, and let you know how it goes.

gacrestani commented 1 week ago

A run just crashed due to the same error. Do you think that further increasing the time would help solve the problem?

[cqls-x86_64-tcsh crestang@shmoo nextflow]$ cat genomics_pipeline_trial70/genomics_pipeline_trial70.o100431 
  Started on:    shmoo
  Started at:    Mon Sep 23 02:49:28 PM PDT 2024
N E X T F L O W  ~  version 24.04.3
Launching `main.nf` [exotic_sinoussi] DSL2 - revision: 45e929d511
B U R K E   L A B   P I P E L I N E
===================================

[8a/d89358] Submitted process > BwaMem (1)
[20/1e8fd5] Submitted process > BwaMem (2)
[d6/8d2aa4] Submitted process > BwaMem (3)
[0c/fbeac6] Submitted process > BwaMem (5)
[f1/109c6b] Submitted process > BwaMem (4)
[a7/f46cc7] Submitted process > BwaMem (6)
[25/14158e] Submitted process > MergeSamFiles (1)
[49/20221b] Submitted process > MergeSamFiles (2)
[a6/b8f854] Submitted process > MarkDuplicates (1)
[b5/28c8b6] Submitted process > MarkDuplicates (2)
[2e/5a2a86] Submitted process > BaseRecalibrator (1)
[bb/acb235] Submitted process > BaseRecalibrator (2)
ERROR ~ Error executing process > 'BaseRecalibrator (2)'

Caused by:
  Missing output file(s) `EB_rep04_gen20_duplicates_marked.bam` expected by process `BaseRecalibrator (2)`