nextflow-io / patterns

A curated collection of Nextflow implementation patterns
http://nextflow-io.github.io/patterns/
MIT License
332 stars 71 forks source link

Example of aggregating file with associated metadata #40

Open nick-youngblut opened 1 year ago

nick-youngblut commented 1 year ago

Currently, there is no example of aggregating files AND associated metadata. For instance, in many/most nf-core pipelines the process outputs are something like:

output:
tuple val(meta), path("file.txt")

...but what if one wants to then aggregate all of the file.txt outputs into one table AND include the meta metadata in that output table?

As far as I can tell from scouring the nextflow slack channel, one must "embed" the metadata in the file paths and then parse the file paths in the aggregation step. For example:

Per-file process:

output:
tuple val(meta), path("${meta}.txt")

Aggregation process:

input:
path("*")

script:
"""
[somehow parse {meta} from input file path] 
"""

Is there a better way, especially given the substantial limitations of trying to embed metadata into a file path (eg., dealing with multiple values and special characters in the metadata values)?

I'm sure a lot of pipeline developers would like a best-practices example of how to deal with this situation (without having to decipher how meta is dealt with in aggregation steps of nf-core pipelines).

nick-youngblut commented 1 year ago

As another example, what is the best practice for aggregating multiple sets of files. For example:

per-file job:

input:
tuple val(sample_id), path("file.txt")

output:
path "${sample_id}_output1.txt", emit: output1
path "${sample_id}_output2.txt", emit: output2

For one output channel, one could just do the following:

process SUMMARY{
  input:
  path ("*")

  output:
  path "summary.txt"

  script:
  """
  [somehow get {sample_id} from the file path, and then aggregate the files, while including {sample_id}]
  """

...but for multiple output channels, the following doesn't work:

process SUMMARY{
  input:
  path ("*")
  path ("*"

  output:
  path "summary.txt"

  script:
  """
  # for output1:
  [somehow get {sample_id} from the file path, and then aggregate the files, while including {sample_id}]
  # for output2:
  [somehow get {sample_id} from the file path, and then aggregate the files, while including {sample_id}]
  """

Must one create a SUMMARY process for each set of files (each path channel) emitted from the per-file process (e.g., process SUMMARY_OUTPUT1 and process SUMMARY_OUTPUT2), or is there a better, more scalable & maintainable approach?

nick-youngblut commented 1 year ago

Assuming that one must provide metadata values via the file paths, when aggregating multiple files in Nextflow, I created the following script to aggregate files (assumes structured table files) and extracts values from the input file names (e.g., the sample name from files named "{sample_name}.tsv") and adds those extracted values as extra columns in the aggregated output table. I just used the same python code as used in snakemake for the same purpose.

concat_tables.py ```python #!/usr/bin/env python from __future__ import print_function import os import re import sys import argparse import logging import collections from itertools import chain from glob import glob # logging logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.DEBUG) # argparse class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter): pass desc = 'Concatenate tables with the same headers' epi = """DESCRIPTION: Aggregate >=1 structured table. The columns are assumed to be all of the same among the tables. Values can be extracted from the input file names (e.g., sample_id from sample1.txt) and added as extra columns in the aggregated output table. Use a glob pattern to identify >=1 input file. Use --pattern to pull out values from the file path. For example, use "{sample_id}.txt" to get the sample id from files named "sample1.txt", "sample2.txt", etc. Output written to STDOUT. """ parser = argparse.ArgumentParser(description=desc, epilog=epi, formatter_class=CustomFormatter) parser.add_argument('glob', type=str, help='Glob pattern to find the input files, Example: "*.stats.tsv"') parser.add_argument('-p', '--pattern', type=str, default=None, help='Matching pattern in file naming scheme. Example: "{sample_id}_taxid-{taxid}"') parser.add_argument('-f', '--fields', type=str, default=None, nargs='+', help='Pattern matching fields to use in output. Example: "taxid"') parser.add_argument('-d', '--delimiter', type=str, default='\t', help='Field delimiter') parser.add_argument('-H', '--no-header', action='store_true', default=False, help='No headers?') parser.add_argument('--version', action='version', version='0.0.1') _wildcard_regex = re.compile( r""" \{ (?=( # This lookahead assertion emulates an 'atomic group' # which is required for performance \s*(?P\w+) # wildcard name (\s*,\s* (?P # an optional constraint ([^{}]+ | \{\d+(,\d+)?\})* # allow curly braces to nest one level ) # ... as in '{w,a{3,5}}' )?\s* ))\1 \} """, re.VERBOSE, ) def regex(filepattern): """ # Function taken from snakemake repo """ f = [] last = 0 wildcards = set() for match in _wildcard_regex.finditer(filepattern): f.append(re.escape(filepattern[last : match.start()])) wildcard = match.group("name") if wildcard in wildcards: if match.group("constraint"): raise ValueError( "Constraint regex must be defined only in the first " "occurence of the wildcard in a string." ) f.append("(?P={})".format(wildcard)) else: wildcards.add(wildcard) f.append( "(?P<{}>{})".format( wildcard, match.group("constraint") if match.group("constraint") else ".+", ) ) last = match.end() f.append(re.escape(filepattern[last:])) f.append("$") # ensure that the match spans the whole file return "".join(f) def glob_wildcards(pattern, files=None, followlinks=False): """ # Function taken from snakemake repo Glob the values of the wildcards by matching the given pattern to the filesystem. Returns a named tuple with a list of values for each wildcard. """ pattern = os.path.normpath(pattern) first_wildcard = re.search("{[^{]", pattern) dirname = ( os.path.dirname(pattern[: first_wildcard.start()]) if first_wildcard else os.path.dirname(pattern) ) if not dirname: dirname = "." names = [match.group("name") for match in _wildcard_regex.finditer(pattern)] Wildcards = collections.namedtuple("Wildcards", names) wildcards = Wildcards(*[list() for name in names]) pattern = re.compile(regex(pattern)) if files is None: files = ( os.path.normpath(os.path.join(dirpath, f)) for dirpath, dirnames, filenames in os.walk( dirname, followlinks=followlinks ) for f in chain(filenames, dirnames) ) for f in files: match = re.match(pattern, f) if match: for name, value in match.groupdict().items(): getattr(wildcards, name).append(value) return wildcards def main(args): # listing files infiles = glob(args.glob) logging.info(f'No. of files found: {len(infiles)}') # reading header_written = False for F in infiles: # file path pattern matching if args.pattern is not None: wcs = glob_wildcards(args.pattern, files=[F]) else: wcs = None ## select just certain fields if args.fields is not None: wcs_fields = args.fields else: wcs_fields = list(wcs._fields) ## pattern didn't match? if (wcs is not None and any([len(getattr(wcs, x)) == 0 for x in wcs_fields])): logging.warning(f'>=1 pattern field is empty for file: {F}') wcs = None # reading file with open(F) as inF: logging.info(f'Reading file: {F}') skip_cnt = 0 for i,line in enumerate(inF): # parsing line = line.rstrip().split(args.delimiter) # skipping lines if line[0] == '' or line[0].startswith('#'): skip_cnt += 1 continue # header if not args.no_header and i - skip_cnt == 0: if header_written: continue header_written = True # adding extra fields if wcs is not None: line = wcs_fields + line else: # adding extra fields if wcs is not None: line = [getattr(wcs, x)[0] for x in wcs_fields] + line # printing line to STDOUT print(args.delimiter.join(line)) ## script main if __name__ == '__main__': args = parser.parse_args() main(args) ```

Is this the "best" way of aggregating files and associated file metadata?