nf-core / modules

Repository to host tool-specific module files for the Nextflow DSL2 community!
https://nf-co.re/modules

emitting metrics (info, stats, summaries) from modules for analytics and db ingestion #2330

Open rjpbonnal opened 2 years ago

rjpbonnal commented 2 years ago

Is your feature request related to a problem? Please describe

Intro

nf-core, with its community, is building a collection of reusable pipelines. With the introduction of DSL2, nf-core created the modules project, whose aim is to decouple the inner parts of the pipelines, find the common elements, and turn them into small bricks that can themselves be reused. Each module is usually associated with a single application of a precise functionality and generates outputs in the form of Nextflow channels. Channels can be "attached" to files, to stdout, and in general to the tuples defined in the output block of the module's process. The output channels are used to communicate with the main orchestrator and to pass data to the subsequent steps. It is a best practice to emit the files that contain logs or information about the result of the software, such as statistics, info, logs, and metrics. The emitted files/channels then feed software that extracts and collects this information in a centralized manner; the main tool that performs this task in a comprehensive way, and also generates a fancy, browsable, interactive HTML document, is MultiQC.

Problem

Even though MultiQC is a widespread tool in bioinformatics for collecting and visualizing the metrics from the analyses performed during a workflow, it may be suboptimal for extracting the information. Moreover, Nextflow's design is built around the dataflow paradigm: information comes out of a process and is digested by others like a continuous stream. In this regard, the metrics (info, logs, statistics, summaries) generated by the software inside the processes are digested mainly (often only) by MultiQC, which usually sits at the end of the pipeline. If users want to collect the metrics immediately after their production, and eventually store them in a database for further analytics, they must post-process the files, either those generated by the single processes or the MultiQC outputs. Another issue arises when the output of a software is not yet supported by MultiQC. In this case the user has two options: 1) contribute to MultiQC by adding the specific plugin, or 2) customize the pipeline and create a separate report. Both solutions are suboptimal: the first is slow and requires the developer to join another community, learn how to develop a plugin, and then wait for the contribution to be approved; the second may break the pipeline and make it hard to follow the nf-core community guidelines.

Describe the solution you'd like

Solution

To follow the Nextflow paradigms/approaches and the nf-core guidelines at the same time, while giving the developer more flexibility and freedom in consuming the metrics, I propose adding to each module an output channel that emits the metrics. The name of the channel would be metrics. This could become a convention similar to versions.yml, and each developer would be free to devise their own way of generating the metrics from the software's output. The output format can be tabular or JSON (it must be decided in advance). My experiments are based on the long-table format, which I prefer for its simplicity and because it can easily be fed to many database engines.
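
A minimal sketch of what such a convention could look like, modeled on the existing versions.yml pattern; the process name FOO_TOOL, the tool invocation foo_tool, and the helper script parse_metrics.py are hypothetical placeholders:

process FOO_TOOL {
    input:
    tuple val(meta), path(reads)

    output:
    tuple val(meta), path("*.log"), emit: log
    path "versions.yml"           , emit: versions
    path "metrics.csv"            , emit: metrics   // proposed new channel

    script:
    """
    foo_tool $reads > ${meta.id}.log

    # hypothetical helper turning the tool output into long-table rows
    parse_metrics.py ${meta.id}.log > metrics.csv

    cat <<-END_VERSIONS > versions.yml
    "${task.process}":
        foo_tool: \$(foo_tool --version)
    END_VERSIONS
    """
}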

The emitting channel can omit the header, because the number of columns must be the same for ALL the modules. If all the channels emitted a header, it would have to be skipped at collection time to avoid repetitions. The basic columns, judging from the example rows below, are: nxf_session_id, sample index, module name, metrics type, metric name, and metric value.

In this example the data type column is missing, and I don't know how to encode it or which "standard" to use.

fecb5e54-67df-42be-a70a-e4a2e5344ad4,1,cellranger-count,cellranger_metrics,Mean Reads per Cell,128333
fecb5e54-67df-42be-a70a-e4a2e5344ad4,2,cellranger-count,cellranger_metrics,Fraction Reads in Cells,85.9%
fecb5e54-67df-42be-a70a-e4a2e5344ad4,1,cellranger-count,cellranger_metrics,Reads Mapped to Genome,93.7%
fecb5e54-67df-42be-a70a-e4a2e5344ad4,2,cellranger-count,cellranger_metrics,Number of Reads,569791233
fecb5e54-67df-42be-a70a-e4a2e5344ad4,2,cellranger-count,cellranger_metrics,Q30 Bases in Barcode,92.7%
fecb5e54-67df-42be-a70a-e4a2e5344ad4,3,cellranger-count,cellranger_metrics,Estimated Number of Cells,2500
fecb5e54-67df-42be-a70a-e4a2e5344ad4,3,cellranger-count,cellranger_metrics,Reads Mapped Confidently to Intergenic Regions,7.8%

I usually add the nxf_session_id when I collect the channels in the main.nf; alternatively, nxf_session_id could be added directly inside the process. The nxf_session_id is crucial to connect Nextflow's log with the metrics coming from the different processes.
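
A sketch of how the tagging could be done at collection time in main.nf, using Nextflow's built-in workflow.sessionId; ch_metrics is a placeholder for the mixed metrics channels:

ch_metrics
    .map { sample, tag, info_type, name, value ->
        // prepend the session id so every row can be joined with the Nextflow log
        [workflow.sessionId, sample, tag, info_type, name, value].join(',')
    }
    .set { ch_metrics_long_table }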

My experiments are not at the "module" level: I process the metrics channels in the main.nf. Remember that this is a proof of concept. This approach has many disadvantages and does not allow the decentralized development, by every nf-core module owner, that I am aiming for.

Ideally, each module should have a (Python?) script for processing its own output, assuming that:

So this is a proposal, and I hope to have the chance to discuss it further.

Describe alternatives you've considered

Examples

ChIP-seq

/*
 * Long table format summaries
 */

// Extract the raw-read count (FastQC's "Total Sequences") from fastqc_data.txt
fastqc_data.flatMap { sample, tag, info_type, f ->
        def lines = f.newReader().readLines()*.tokenize()
        def results = []
        results << [sample, tag, info_type, "Raw reads", lines[6][2]]
        return results
    }
    .set { fastqc_data_summary }

samtools_flagstats.flatMap { sample, tag, info_type, f->
        def reader = f.newReader()
        def lines = reader.readLines()*.tokenize()
        def results = []
        results << [sample, tag, info_type, "Input total", lines[0][0]]
        results << [sample, tag, info_type, "Duplicates", lines[3][0]]
        results << [sample, tag, info_type, "Mapped", lines[4][0]]
        results << [sample, tag, info_type, "Paired in sequencing", lines[5][0]]
        results << [sample, tag, info_type, "Read1", lines[6][0]]
        results << [sample, tag, info_type, "Read2", lines[7][0]]
        results << [sample, tag, info_type, "Properly paired", lines[8][0]]
        results << [sample, tag, info_type, "With itself and mate mapped", lines[9][0]]
        results << [sample, tag, info_type, "Singletons", lines[10][0]]
        results << [sample, tag, info_type, "With mate mapped to a different char", lines[11][0]]
        results << [sample, tag, info_type, "With mate mapped to a different char mapQ get 5", lines[12][0]]
        return results
    }
    .set { samtools_flagstat_summary }

samtools_flagstats_aftFilt.flatMap { sample, tag, info_type, f->
        def reader = f.newReader()
        def lines = reader.readLines()*.tokenize()
        def results = []
        results << [sample, tag, info_type, "Input total", lines[0][0]]
        results << [sample, tag, info_type, "Duplicates", lines[3][0]]
        results << [sample, tag, info_type, "Mapped", lines[4][0]]
        results << [sample, tag, info_type, "Paired in sequencing", lines[5][0]]
        results << [sample, tag, info_type, "Read1", lines[6][0]]
        results << [sample, tag, info_type, "Read2", lines[7][0]]
        results << [sample, tag, info_type, "Properly paired", lines[8][0]]
        results << [sample, tag, info_type, "With itself and mate mapped", lines[9][0]]
        results << [sample, tag, info_type, "Singletons", lines[10][0]]
        results << [sample, tag, info_type, "With mate mapped to a different char", lines[11][0]]
        results << [sample, tag, info_type, "With mate mapped to a different char mapQ get 5", lines[12][0]]
        return results
    }
    .set { samtools_flagstats_aftFilt_summary }

if (!params.single_end) { samtools_flagstats_aftFilt_orph.flatMap { sample, tag, info_type, f->
        def reader = f.newReader()
        def lines = reader.readLines()*.tokenize()
        def results = []
        results << [sample, tag, info_type, "Input total", lines[0][0]]
        results << [sample, tag, info_type, "Duplicates", lines[3][0]]
        results << [sample, tag, info_type, "Mapped", lines[4][0]]
        results << [sample, tag, info_type, "Paired in sequencing", lines[5][0]]
        results << [sample, tag, info_type, "Read1", lines[6][0]]
        results << [sample, tag, info_type, "Read2", lines[7][0]]
        results << [sample, tag, info_type, "Properly paired", lines[8][0]]
        results << [sample, tag, info_type, "With itself and mate mapped", lines[9][0]]
        results << [sample, tag, info_type, "Singletons", lines[10][0]]
        results << [sample, tag, info_type, "With mate mapped to a different char", lines[11][0]]
        results << [sample, tag, info_type, "With mate mapped to a different char mapQ get 5", lines[12][0]]
        return results
    }
    .set { samtools_flagstats_aftFilt_orph_summary }
}
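
The three flagstat blocks above are identical; as a sketch (assuming all three channels share the same four-element tuple shape), the parsing could be factored into a single closure and reused:

def parse_flagstat = { sample, tag, info_type, f ->
    def lines = f.newReader().readLines()*.tokenize()
    // label -> line index in the samtools flagstat output
    def rows = ['Input total': 0, 'Duplicates': 3, 'Mapped': 4,
                'Paired in sequencing': 5, 'Read1': 6, 'Read2': 7,
                'Properly paired': 8, 'With itself and mate mapped': 9,
                'Singletons': 10, 'With mate mapped to a different chr': 11,
                'With mate mapped to a different chr (mapQ>=5)': 12]
    rows.collect { label, idx -> [sample, tag, info_type, label, lines[idx][0]] }
}

samtools_flagstats.flatMap(parse_flagstat).set { samtools_flagstat_summary }
samtools_flagstats_aftFilt.flatMap(parse_flagstat).set { samtools_flagstats_aftFilt_summary }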

phantompeakqual_stats.flatMap { sample, tag, info_type, f ->
    def reader = f.newReader()
    def token = reader.readLines()[0].trim().tokenize()
    def results = []
    results << [sample, tag, info_type, "N Fragments", token[1]]
    results << [sample, tag, info_type, "Top 3 estimates for fragment length", token[2].replaceAll(/,/,';')]
    results << [sample, tag, info_type, "Top 3 cross-correlation values", token[3].replaceAll(/,/,';')]
    results << [sample, tag, info_type, "Phantom peak location", token[4]]
    results << [sample, tag, info_type, "Phantom peak Correlation", token[5]]
    results << [sample, tag, info_type, "Minimum cross-correlation shift", token[6]]
    results << [sample, tag, info_type, "Minimum cross-correlation value", token[7]]
    results << [sample, tag, info_type, "NSC", token[8]]
    results << [sample, tag, info_type, "RSC", token[9]]
    results << [sample, tag, info_type, "Qtag", token[10]]
    return results
}
    .set { phantompeakqual_summary }

macs2_peaks_counting.flatMap { sample, tag, info_type, f->
    def reader = f.newReader()
    def lines = reader.readLines()*.tokenize()
    def results = []
    results << [sample, tag, info_type, "MACS2 peaks", lines[0][1]]
    return results
}
    .set { macs2_peaks_summary }

macs2_peaks_frip.flatMap { sample, tag, info_type, f->
    def reader = f.newReader()
    def lines = reader.readLines()*.tokenize()
    def results = []
    results << [sample, tag, info_type, "MACS2 FRiP", lines[0][1]]
    return results
}
    .set { macs2_frip_summary }

if (params.single_end) {
    fastqc_summary
        .mix(fastqc_rmAdapt_summary)
        .flatMap { sample, tag, info_type, f ->
            def results = []
            f.newReader().splitEachLine('\t') { line ->
                results << [sample, tag, info_type, line[1], line[0]]
            }
            return results
        }
        .mix(fastqc_data_summary, samtools_flagstat_summary, samtools_flagstats_aftFilt_summary, phantompeakqual_summary, macs2_peaks_summary, macs2_frip_summary)
        .map {
            // split the composite sample name into sample id and tags,
            // then emit one CSV row per metric
            def data = it[0].split('_')
            it[1] += ':[' + data[1..-1].join(';') + ']'
            it[0] = data[0]
            it.join(',').trim()
        }
}
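
The chain above does not name its result; as a sketch of persisting the merged long table for database ingestion, where summary_rows is a placeholder for that channel and the file name is arbitrary:

summary_rows
    .collectFile(name: 'metrics_long_table.csv', newLine: true, storeDir: params.outdir)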

An example of parsing the results from CellRanger output

def csv_parser = { string, quote='"', internal_separator=',' ->
    // Strip the quote characters and drop any separator that occurs inside
    // a quoted field, so the result can safely be tokenized on ','.
    def new_string = []
    def quote_started = false
    string.eachWithIndex { c, i ->
        if (quote_started) {
            if (c == quote) {
                quote_started = false
            } else if (c != internal_separator) {
                new_string << c
            }
        } else if (c == quote) {
            quote_started = true
        } else {
            new_string << c
        }
    }
    new_string.join().trim()
}

metrics_rna.flatMap { sample, tag, info_type, f ->
    def reader = f.newReader()
    def lines = reader.readLines()
    def results = []
    def columns_names = csv_parser(lines[0]).tokenize(",")
    def columns_values = csv_parser(lines[1]).tokenize(",")
    columns_names.eachWithIndex { item, index ->
      results << [sample, tag, info_type, item, columns_values[index]]
    }
    return results
}
    .set { summary_rna }

metrics_vdj.flatMap { sample, tag, info_type, f ->
    def reader = f.newReader()
    def lines = reader.readLines()
    def results = []
    def columns_names = csv_parser(lines[0]).tokenize(",")
    def columns_values = csv_parser(lines[1]).tokenize(",")
    columns_names.eachWithIndex { item, index ->
      results << [sample, tag, info_type, item, columns_values[index]]
    }
    return results
}
    .set { summary_vdj }

metrics_feature.flatMap { sample, tag, info_type, f ->
    def reader = f.newReader()
    def lines = reader.readLines()
    def results = []
    def columns_names = csv_parser(lines[0]).tokenize(",")
    def columns_values = csv_parser(lines[1]).tokenize(",")
    columns_names.eachWithIndex { item, index ->
      results << [sample, tag, info_type, item, columns_values[index]]
    }
    return results
}
    .set { summary_feature }
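
The same deduplication idea applies to the three CellRanger blocks; a sketch assuming they all share the tuple shape used above:

def parse_cellranger_metrics = { sample, tag, info_type, f ->
    def lines  = f.newReader().readLines()
    def names  = csv_parser(lines[0]).tokenize(',')
    def values = csv_parser(lines[1]).tokenize(',')
    // pair each column name with its value and emit one long-table row per metric
    [names, values].transpose().collect { name, value ->
        [sample, tag, info_type, name, value]
    }
}

metrics_rna.flatMap(parse_cellranger_metrics).set { summary_rna }
metrics_vdj.flatMap(parse_cellranger_metrics).set { summary_vdj }
metrics_feature.flatMap(parse_cellranger_metrics).set { summary_feature }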

Additional context

No response

edmundmiller commented 1 year ago

I can see the need you're describing.

I don't know if it's necessary for every module. Does the cat_fastq module need this?

I think if this were brought back with a functions.nf in those modules you pointed out, it could be useful.

Maybe a POC of one or two of these and some test examples of the output could spark some more interest.

This may also just be pipeline-specific for now; you're not going to use cellranger in chipseq, for example.