nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0

Emit n (or at least 2) items from collectFile closure #4800

GallVp opened this issue 4 months ago (status: open).

GallVp commented 4 months ago

Emit n items (or at least 2) from collectFile closure

Usage scenario

nf-core data flows use a meta map to propagate metadata along with the data. When collectFile is used together with a meta map, several channel manipulations are needed to pair the meta map with the collected file and to use meta.id as the prefix of the collected file's name. See the example at https://github.com/nf-core/modules/pull/5002#issuecomment-1970701393, where meta.id is injected into the data list so that it is used as the file name: [ "${meta.id}.mapped.monoploid.seqs.txt" ] + mapped_ids. The channel manipulations needed to keep the metadata attached to the collected file are described at https://github.com/nf-core/website/issues/2242#issue-2102434383

Suggested implementation

The data flow in the example linked above could be greatly simplified if collectFile supported the following:

Scenario I:

Channel.of( [ [ id:'test', etc:'etc' ], [ 'a', 'b', 'c', 'd' ] ] )
| collectFile { meta, data ->  [ "${meta.id}.txt", data.join("\n"), meta ] } // Closure returns n items: file name, data, any other items which are passed through as is
| map { file, meta -> [ meta, file ] }

Scenario II:

Channel.of( [ [ id:'test', etc:'etc' ], [ 'a', 'b', 'c', 'd' ] ], [ [ id:'test', etc:'etc' ], [ 'j', 'k', 'l', 'm' ] ] )
| collectFile { meta, data ->  [ "${meta.id}.txt", data.join("\n"), meta ] } // Closure returns n items: file name, data, any other items which are passed through as is
| map { file, meta -> [ meta, file ] }

A single file should be emitted, because both items carry the same meta map and resolve to the same file name.

Scenario III:

Channel.of( [ [ id:'test', etc:'etc' ], [ 'a', 'b', 'c', 'd' ] ], [ [ id:'test', etc:'etc2' ], [ 'j', 'k', 'l', 'm' ] ] )
| collectFile { meta, data ->  [ "${meta.id}.txt", data.join("\n"), meta ] } // Closure returns n items: file name, data, any other items which are passed through as is
| map { file, meta -> [ meta, file ] }

An exception should be raised, because both items resolve to the same file name but carry different meta maps, so collectFile cannot emit a single file with consistent metadata.
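The three scenarios can be modelled in plain Groovy to pin down the proposed semantics. `collectWithMeta` is a hypothetical helper, not a Nextflow operator. It assumes each input record is `[ meta, data ]` and that the closure returns exactly `[ fileName, text, meta ]`. Items are grouped by file name and their text is joined with newlines. An `IllegalStateException` is thrown when one file name is produced with two different meta maps, as in Scenario III.

```groovy
// Hypothetical model (not Nextflow's API) of the collectFile closure contract
// proposed above: the closure returns [ fileName, text, meta ].
// Assumes every input record is [ meta, data ].
def collectWithMeta(List records, Closure toEntry) {
    // LinkedHashMap keeps file names in first-seen order
    Map groups = new LinkedHashMap()
    records.each { record ->
        def (name, text, meta) = toEntry(record[0], record[1])
        // Convert GString file names to String so they behave as map keys
        String key = name.toString()
        Map group = groups[key]
        if (group == null) {
            groups[key] = [texts: [text], meta: meta]
        } else if (group.meta != meta) {
            // Scenario III: same file name, different meta maps
            throw new IllegalStateException("'${key}' is produced by items with different meta maps: ${group.meta} vs ${meta}")
        } else {
            // Scenario II: same file name and same meta map, so append the data
            group.texts << text
        }
    }
    // One [ fileName, mergedText, meta ] entry per distinct file name
    return groups.collect { key, group -> [key, group.texts.join('\n'), group.meta] }
}

def toEntry = { meta, data -> [ "${meta.id}.txt", data.join('\n'), meta ] }
def result = collectWithMeta([
    [ [id: 'test', etc: 'etc'], ['a', 'b', 'c', 'd'] ],
    [ [id: 'test', etc: 'etc'], ['j', 'k', 'l', 'm'] ]
], toEntry)
// result == [[ 'test.txt', 'a\nb\nc\nd\nj\nk\nl\nm', [id: 'test', etc: 'etc'] ]]
```

Joining with a newline is an assumption made for readability. The real operator's separator and ordering options may differ.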

Thank you

bentsherman commented 4 months ago

I think the fundamental problem is that collectFile does too many things without being flexible enough to configure it. I would propose breaking the operator into a few pieces:

Then you could do something like:

ch_records
  | group { /* ... */ }
  | map { key, meta, items ->
    def file = mergeText(items)
    return [ meta, file ]
  }
  | // ...

I've been playing with some of these ideas in a plugin called nf-boost. Your example is an interesting use case that I'd like to try. I'll get back to you with a more concrete implementation.

pditommaso commented 4 months ago

@GallVp, essentially you are proposing that collectFile should return the meta object along with the collected file.

What I find less convincing is this statement:

An exception should be raised because the file name is same but the meta is different

Is this a fair expectation?

GallVp commented 4 months ago

Thank you for the feedback @pditommaso

An exception is not strictly necessary. If two files with the same name but different data and different metadata can be emitted, that's also okay. It might be confusing in some cases, but if the documentation is clear, people can get used to it.

I assumed that the file name would be used as the key for an implicit groupTuple, resulting in [ "file name", [data1, data2, data3, ...], [ meta1, meta2, meta3,... ], [...], ... ]. In that case, emitting multiple files with the same name might be confusing.

I am thinking out loud, and some of these ideas might not make practical sense from an implementation point of view.
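The tuple shape GallVp describes can be reproduced with a plain-Groovy stand-in for groupTuple. `groupByFileName` is a hypothetical helper, not a Nextflow function. It only mimics groupTuple's default behaviour: tuples are grouped on element 0 and the remaining elements are collected into lists.

```groovy
// Hypothetical helper mimicking groupTuple (by: 0) on [ name, data, meta ] tuples:
// the result is one [ name, [data...], [meta...] ] tuple per distinct name.
def groupByFileName(List<List> tuples) {
    // groupBy returns a LinkedHashMap, so names keep their first-seen order
    Map groups = tuples.groupBy { it[0].toString() }
    return groups.collect { name, items -> [name, items.collect { it[1] }, items.collect { it[2] }] }
}

def grouped = groupByFileName([
    ['test.txt', 'a\nb', [id: 'test', etc: 'etc']],
    ['test.txt', 'j\nk', [id: 'test', etc: 'etc2']]
])
// grouped == [[ 'test.txt', ['a\nb', 'j\nk'], [[id: 'test', etc: 'etc'], [id: 'test', etc: 'etc2']] ]]
```

When the meta slot holds two different maps, a single emitted file would carry conflicting metadata. That is the ambiguity raised in this comment.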

bentsherman commented 4 months ago

@GallVp I have expanded nf-boost to support something like collectFile which is more flexible. See this example: https://github.com/bentsherman/nf-boost/blob/main/examples/merge-text.nf

The plugin adds a mergeText function that provides the core functionality of collectFile, namely concatenating a list of files. Because it's a function, you can use it in an operator or a process, which gives you much more flexibility.

You can recover the rest of collectFile (grouping and sorting) using groupTuple and standard Groovy list sorting methods.

You should be able to customize it further to meet your needs. Basically, after the groupTuple, you can inspect the meta map of each item in the group to make sure they match and throw an error if they don't.
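The workaround described above can be sketched in plain Groovy: groupTuple, then sort, then check the meta maps, then concatenate. `mergeGroup` is a hypothetical helper standing in for these steps. It is not nf-boost's mergeText, whose exact signature is not reproduced here. The sketch assumes groupTuple has produced `[ key, [meta...], [file...] ]` and that files should be concatenated in file-name order.

```groovy
import java.nio.file.Files
import java.nio.file.Path

// Hypothetical helper (not part of Nextflow or nf-boost). It takes one
// groupTuple output [ key, [meta...], [file...] ], checks that all meta maps
// are equal, concatenates the files sorted by name into <outDir>/<key>.txt,
// and returns [ meta, mergedFile ].
def mergeGroup(List tuple, Path outDir) {
    def (key, metas, files) = tuple
    // Fail fast if the group mixes different meta maps
    if (!metas.every { it == metas[0] }) {
        throw new IllegalStateException("Group '${key}' has differing meta maps: ${metas}")
    }
    // Sort by file name without mutating the input list
    def sorted = files.sort(false) { Path p -> p.fileName.toString() }
    def merged = sorted.collect { Path p -> new String(Files.readAllBytes(p), 'UTF-8') }.join('')
    Path out = outDir.resolve(key.toString() + '.txt')
    Files.write(out, merged.getBytes('UTF-8'))
    return [metas[0], out]
}
```

In a pipeline, a function like this would be called from a `map` after `groupTuple`. Choosing the output directory, and deciding whether the merge belongs in a process instead, is left to the pipeline.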

My hope is that mergeText will be added to core Nextflow, but until then you can use it by including the nf-boost plugin in your pipeline. Let me know how it goes!

GallVp commented 3 months ago

Hi @bentsherman

Thank you for the detailed examples; mergeText provides very useful functionality. Instead of creating more flavours of collectFile, publishing the underlying generic function so that it can be used in both operators and processes is a good idea.

Until mergeText makes its way into Nextflow, this solution won't work for nf-core modules and sub-workflows.