nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0
2.78k stars 634 forks source link

Long and Short reads several mapping rounds #2185

Closed LPerlaza closed 3 years ago

LPerlaza commented 3 years ago

Hi! I'm just starting coding nextflow, and couldn't find an answer/example for my specific problem, but apologies if I miss it. I want to write a pipeline for metagenomes binning (and some other features). I have long reads and short reads. The pipeline should assemble the long reads, and use the short reads to polish/improve the long reads assembly. Note that for each long read assembly I have several pairs of short reads (different sequencing platforms or library preps). So, I do several mapping rounds. I want to do this in one workflow because I need to do some merging at the end.

Roughly it is something like:

  1. assembly of long reads --> file with the assembly (assemblylong)
  2. polished assembly using short reads --> several files, one file per each short read method (nanoassemblydef)
  3. map to polished assembly using several short reads pairs independently --> several files, one file per each short read method (mappedassembly)

The 3rd step is giving me the problems because I need the mapping of the short reads to map only for the long read assembly polished with those same reads. Is there a way that I can make the pipeline create a channel/variable that has the structure of a hash where the polished assembly of the long reads shares an ID with the corresponding short reads used for that polish?

the problematic step produces a files like for example:

CORRECT: nanopore.gz_test_merged_map.sam_assemblyracon2correction.fasta_catalogue.mmi_test_map.sam

(1) nanopore.gz_(2) test_merged_map.sam (3) _assemblyracon2correction.fasta_catalogue.mmi (4)_test_map.sam

1.long reads file

  1. reads used for polishing
  2. arbitrary prefix
  3. short reads used for mapping

INCORRECT: nanopore.gz_test_merged_map.sam_assemblyracon2correction.fasta_catalogue.mmi_data_map.sam

(1) nanopore.gz_(2) test_merged_map.sam (3) _assemblyracon2correction.fasta_catalogue.mmi (4)_data_map.sam

  1. long reads file
  2. reads used for polishing
  3. arbitrary prefix
  4. short reads used for mapping don't correspond to reads used for polish

examplePipeline.nf

The pipeline is just echoing commands at the moment so it is easy to reproduce. The input file could even be empty text files like:

The input files should be: a long read file = nanopore.gz short reads= test.R1.fq, testR2.fq, data.R1.fq, data.R2.fq

#! /usr/bin/env nextflow
nextflow.enable.dsl=2

/*
 * pipeline input parameters
 */

params.assembly_type ='hybrid'
params.shortreads = "$baseDir/test/*{R1,R2}.fastq"
params.longreads = "$baseDir/test/nanopore.gz"
params.threads=(26)
params.outdir="$baseDir/results/"
params.publish_dir_mode='copy'
params.modules= "$baseDir/examplePipeline_modules"

/*
 * Channels
 */

Channel.fromFilePairs(params.shortreads, checkIfExists: true)
        .set{  ch_in_shortreads}

Channel.fromPath(params.longreads, checkIfExists: true)
        .set{ch_in_longreads}

Channel.value( params.threads )
       .set{ threads_ch }

/*
 * print parameters input
 */

log.info """\

         Metagenomics Binning  P I P E L I N E
         ===================================
         Type of Assmbly   : ${params.assembly_type}
         Short Reads       : ${params.shortreads}
         Long Reads        : ${params.longreads}
         Number of Threats : ${params.threads}
         Output directory  : ${params.outdir}

         """
         .stripIndent()

/*
 * include
 */

include { assemblyLong  } from params.modules
include { racon_1  } from params.modules
include { mapping_nano_racon  } from params.modules
include { shuffle_pairs } from params.modules
include { mapping_index } from params.modules
include { mapping  } from params.modules

/*
 * workflow
 */

workflow {
    assemblyLong (ch_in_longreads,threads_ch )
    racon_1 (ch_in_longreads,assemblyLong.out.assemblylong,threads_ch)
    shuffle_pairs (ch_in_shortreads)
      mapping_nano_racon(assemblyLong.out.assemblylong, 
     shuffle_pairs.out.mergeshort,ch_in_longreads,racon_1.out.racon,threads_ch)
    mapping_index(  mapping_nano_racon.out.nanoassemblydef)
    mapping( mapping_index.out.assemblyindex,ch_in_shortreads,threads_ch)

} 

workflow.onComplete { 
    println ( workflow.success ? "\nDone! Open the following report in your browser --> $params.outdir\n" : "Oops .. something went wrong" )
}

examplePipeline_modules.nf

the processes need to execute the pipeline code (include)


process assemblyLong{

    publishDir params.outdir, mode: 'copy'

    input:
    path (FileLongReads)
    val threads 

    output:
    path """${FileLongReads}_assemblylongreadsout.fasta""" , emit: assemblylong

    script:

  """
  mkdir ${FileLongReads}_assemblylongreadsout.fasta
  echo 'flye --nano-raw $FileLongReads --out-dir ${FileLongReads}_assemblylongreadsout.fasta --meta --plasmids --threads $threads'
  """
}    

process racon_1 {

    publishDir params.outdir, mode: 'copy'

    input:
    path (assemblylong)
    path (nanofastq)
    val (threads)

    output: 
    path "${nanofastq}_gfa1"  , emit: gfa1
    path "${nanofastq}_racon" , emit: racon

    script:

    """

    echo 'minimap2 -t $threads $assemblylong $nanofastq'  > ${nanofastq}_gfa1
    echo 'racon -t $threads racon $nanofastq ${nanofastq}_gfa1 $assemblylong' > ${nanofastq}_racon

    """
}   

process shuffle_pairs {

    publishDir params.outdir, mode: 'copy'

    input:
    tuple val(genomeName), path (genomeReads)

    output: 

    path "${genomeName}_merged" , emit: mergeshort

    script:

    """

    echo 'python3 shuffle_pairs_fastq.py ${genomeReads[0]} ${genomeReads[1]}' > ${genomeName}_merged

    """
}   

process mapping_nano_racon {

    publishDir params.outdir, mode: 'copy'

    input:
    path (assemblylong) 
    each path (mergeshort)
    path (nanofastq)
    path (nanoracon)
    val (threads)

    output: 
    path "${nanofastq}_${mergeshort}_map.sam"  , emit: mapsam
    path "${nanofastq}_${mergeshort}_map.sam_assemblyracon2correction.fasta" , emit: nanoassemblydef

    script:

    """

    echo 'minimap2 -t $threads -ax sr $assemblylong $mergeshort' > ${nanofastq}_${mergeshort}_map.sam
    echo 'racon -t $threads $mergeshort ${nanofastq}_${mergeshort}_map.sam $nanoracon'  > ${nanofastq}_${mergeshort}_map.sam_assemblyracon2correction.fasta

    """
}   

process mapping_index {

    publishDir params.outdir, mode: 'copy'

    input:
    path (nanoassemblydef) 

    output: 
    path "${nanoassemblydef}_catalogue.mmi"  , emit: assemblyindex

    script:

    """

    echo 'minimap2 -d ${nanoassemblydef}_catalogue.mmi ${nanoassemblydef}' > ${nanoassemblydef}_catalogue.mmi
    """
}   

process mapping {

    publishDir params.outdir, mode: 'copy'

    input:
    path(assemblyindex) 
    tuple val(genomeName), path(genomeReads)
    val (threads)

    output: 
    path "${assemblyindex}_${genomeName}_map.sam"  , emit: mappedassembly

    script:
    def connected_ch = assemblyindex
    """
    echo $connected_ch
    echo '$assemblyindex' >> ${assemblyindex}_${genomeName}_map.sam
    echo 'minimap2 -t $threads -N 50 -ax sr $assemblyindex ${genomeReads[0]} ${genomeReads[1]}' >>${assemblyindex}_${genomeName}_map.sam

    """
}   

Thanks!! Laura

pditommaso commented 3 years ago

Closing as duplicate of https://github.com/nextflow-io/nextflow/discussions/2180.