Open ewels opened 6 months ago
I have been using collectFile for this
Just to add more details rather than only sending a link to Sarek: this should do the trick in an easier/nicer way:
```nextflow
GATK_HAPLOTYPECALLER.out.collectFile { id, vcf, idx ->
    ["vcf.csv", "${id},${vcf},${idx}\n"]
}
```
Having a delimiter field would be nice to handle both CSV and TSV at once, but to be honest I'm happy enough with putting `,` or `\t` in directly.
The trickiest part with `collectFile()` for me is dealing properly with a header and a sorted list; for that I use a mix of `keepHeader: true, skip: 1, sort: true` as options for the `collectFile()`.
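A minimal sketch of that pattern (channel and column names are hypothetical): each emitted chunk carries its own header line, so `skip: 1` drops the duplicates while `keepHeader: true` preserves the first copy, and `sort: true` keeps the line order deterministic across runs.

```nextflow
// Hypothetical (id, vcf, idx) tuples from the calling process.
// Each chunk includes a header; keepHeader: true keeps the first
// copy, skip: 1 drops it from every chunk, sort: true orders lines.
GATK_HAPLOTYPECALLER.out
    .collectFile(name: 'vcf.csv', keepHeader: true, skip: 1, sort: true) { id, vcf, idx ->
        "id,vcf,idx\n${id},${vcf},${idx}\n"
    }
```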
It would be interesting to know more about the use case. For example, in https://github.com/nextflow-io/nextflow/issues/4670 we discussed having implicit creation of a CSV-based index file for outputs.
For Sarek, the use case is simple. When we started working on it, using it to remap the data for the Swedish Genome Project, we had some hardware issues, so resumability was often not working properly: around 10% of the samples run were failing and couldn't resume. Being able to create CSVs was a way to have multiple re-entry points without having to rely on Nextflow's caching. For me these CSV files are endpoints/entry points, and Sarek is kind of a workflow of workflows.
In my opinion, the way we do this in fetchngs is the same idea: we create such files to be entry points for other pipelines.
But my guess is that one could also want to create a file for reporting, or even to gather results to be used by a downstream tool.
The problem with creating files in an operator is that there is no task hash associated with an operator execution, which hinders caching and provenance tracking. An `exec` process is essentially a `map` operator that can be more easily tracked. IMO the best pattern is to provide a regular function which the user wraps in an `exec` process.
Having said that, I understand that an `exec` process is more verbose than an operator. One way that we might get the best of both worlds is to provide some kind of "inline exec process", like an anonymous function. This "exec" operator would produce a hash for every execution and emit lifecycle events like a regular task, but wouldn't require an explicit `process` definition. An equivalent solution would be to add this behavior to operators like `collectFile` and the proposed CSV writer.
See also:
I have seen this pattern happen again and again where a framework implements some special way to execute code (promises in JS, observables, etc) that requires every new function under the sun to be re-implemented in this special way. I don't want us to repeat this cycle with operators.
A common example I see: a user wants to use the `splitFastq` operator to split a FASTQ file into chunks, using a chunk size that is based on dataflow logic. But an operator cannot use a dataflow result to set a parameter like chunk size. Fortunately, `splitFastq` is also available as a regular function (linked above), so you can just do this:

```nextflow
ch_fastq | flatMap { splitFastq(it) }
```
But then what's the point of the `splitFastq` operator? It seems like a very thin and inflexible wrapper over a map operation.
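For instance, the dataflow-driven chunk size that the operator cannot express becomes straightforward with the function form. A sketch; the exact function signature and the `ch_chunk_size` / `ch_fastq` channels are assumptions here:

```nextflow
// Sketch: chunk size computed upstream as a dataflow value, then
// used inside flatMap via the (assumed) splitFastq function form.
ch_chunk_size
    .combine(ch_fastq)
    .flatMap { size, fastq -> splitFastq(fastq, by: size) }
```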
This is basically how I feel about `collectFile` and the proposed CSV operator as well. I think they should just be regular functions, and we should document how to use them with a map / flatMap operator or an exec process. That would save us a lot of headache trying to deal with edge cases where they don't work due to the limitations of operators.
Operators should be used when they provide some kind of dataflow functionality. The `groupTuple` operator is useful because it can do a "streaming" reduction, collecting items into groups as they arrive rather than waiting for all items and collecting them at once; it uses less time and memory. The `sum` operator does a similar thing, though I don't see much value in using a streaming reduction just to add some numbers.
Regarding the `collectFile` operator, here are some of its options:

- `cache`: Controls the caching ability of the `collectFile` operator when using the resume feature. It follows the same semantics as the cache directive (default: `true`).
- `storeDir`: Folder where the resulting file(s) are stored.
- `tempDir`: Folder where temporary files, used by the collecting process, are stored.
It's interesting to me how `collectFile` needs things like (1) a working directory to store temp files, (2) a directory to store (i.e. "publish") the result files, and (3) a way to cache the result on subsequent runs. If only there were some native way to do these things in Nextflow...
Simple sketch for an inline `exec` process:

```nextflow
ch_records
    | collect
    | exec(name: 'RECORDS_TO_CSV') { records -> mergeCsv(records) }
```
The `exec` operator is just a `map` operator that executes each item as a native task, just like an `exec` process, but without having to define the process separately. The operator would show up in the console output as `<workflow>:...:RECORDS_TO_CSV`. Each execution would be an actual task, with its own hash, working directory, and cache entry for resuming.
It is more verbose than just having a `mergeCsv` operator, but it is much more flexible. Whenever a user wants to do a similar but slightly different thing, like writing to a different format, we don't have to make a whole new operator, just a new function, which they can easily define in their pipeline. The same treatment can be applied to `collectFile` to remove all of the options (listed above) that have been grafted on to make it behave like a process.
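Under that same hypothetical inline `exec` operator, the `collectFile`-style use might look something like the following. This is pure speculation on the API; `task.workDir` as the task's working directory is an assumption:

```nextflow
// Speculative sketch: write the collected records as a real task,
// so the output file gets a task hash, work dir, and cache entry.
ch_records
    | collect
    | exec(name: 'WRITE_CSV') { records ->
        def out = task.workDir.resolve('records.csv')
        out.text = records.collect { it.join(',') }.join('\n') + '\n'
        return out
    }
```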
@bentsherman regarding the use of a Nextflow process for this, especially `exec`, I had some notes about the pitfalls here: https://github.com/stevekm/nextflow-demos/tree/master/metaMap-write-JSON

Some of the issues with using a Nextflow process for this included having to launch a new container instance, potentially on a new cloud instance, at least when using a `script` block. And as per my notes there, using `exec` had some severe issues related to staging and writing to files, especially with `-resume`. Using `collectFile` ended up being the better method, because for the most part it just worked better in a diverse range of situations (e.g. with/without `-resume`, with/without cloud storage such as S3 for the work dir, etc.).
On the other hand, it's worth noting that CSV has an actual spec, https://datatracker.ietf.org/doc/html/rfc4180, which includes things like carriage returns. You'll also need to handle quoting, and potentially escape characters, I think. CSV is such a mess of a format in practice, with real-life files, that it would be great if its usage could be discouraged; not sure that will ever happen though. It would be nice if we could convince people to stop using CSV so much and just switch to something like JSON, which coincidentally makes your pipeline code easier as well, since it supports typing and nested data structures, so you don't need ridiculous things like multiple samplesheets or samplesheets with grouping fields duplicated per row. If a tabular text-delimited file must be written, my experience has been that TSV is generally the better option.
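To illustrate the quoting point, here is a minimal RFC 4180-style field-quoting helper in plain Groovy (a sketch, not a full implementation of the spec):

```groovy
// RFC 4180: a field containing the separator, a double quote, or a
// line break must be wrapped in double quotes, and any embedded
// double quotes must be doubled.
def quoteCsvField(String field, String sep = ',') {
    if (field.contains(sep) || field.contains('"') ||
        field.contains('\n') || field.contains('\r')) {
        return '"' + field.replace('"', '""') + '"'
    }
    return field
}
```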
Agree that a `script` process would be overkill for such a small task. If there are functional gaps in the `exec` process around file I/O and resumability, then they should be addressed, since the whole point of an `exec` process is to provide all of the typical process functionality to what is otherwise a regular Groovy function.
Hey, maybe stuff like this would be a good candidate for inclusion in something like an nf-core community utility Nextflow plugin :)
This thread derailed a bit from the original request. @vdauwera, what's your use case for having a `mergeCsv` operator?
The specific use case was a section of a GATK pipeline that proceeds as follows:

1. Call variants per sample (takes a BAM file per sample and produces a GVCF file and its index).
2. Consolidate all GVCF files produced into a single GenomicsDB 'workspace' (mini variant store) and run joint genotyping to produce a single VCF output for the group of samples.

The second step requires a sample map, defined as a TSV file (that's tab-separated, not a typo) listing the sample ID (as used in the original data's RG:SM tag, and subsequently in the VCF) and the file paths to the GVCF and its index for each sample, one sample per line. This is a requirement of the GATK tools involved.
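For that shape of input, `collectFile` can build the sample map directly. A sketch, assuming a per-sample channel of (sample_id, gvcf, index) tuples; the channel name is hypothetical:

```nextflow
// Build the TSV sample map: one sample per line, tab-separated,
// with sorted output so the file is deterministic across runs.
ch_gvcfs
    .collectFile(name: 'sample_map.tsv', sort: true) { id, gvcf, idx ->
        "${id}\t${gvcf}\t${idx}\n"
    }
```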
Sorry for the digression, I will save my notes elsewhere. We can consider the inline exec idea separately. My point is that operators that write files are pesky and should be avoided in favor of regular functions where possible.
@vdauwera this is possible to do with some custom Groovy code, e.g. the `SRA_TO_SAMPLESHEET` process in fetchngs, if you need a quick solution. But I would support a `mergeCsv` standard library function as a counterpart to `splitCsv`; it is easy to implement. TSV can be supported with a separator option.
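Usage of such a function might look like this. The API is hypothetical, mirroring `splitCsv` with a `sep` option for TSV; nothing here exists yet:

```nextflow
// Speculative sketch of a mergeCsv standard-library function,
// invoked via map over a collected channel (or in an exec process).
ch_records
    | collect
    | map { records -> mergeCsv(records, path: 'samples.tsv', sep: '\t') }
```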
@stevekm we could also have a `mergeJson` function to write JSON files from records, which seems like what you were trying to do in your linked example.
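In the meantime, plain Groovy can already do this, since `groovy.json` ships with the standard Groovy distribution; a `mergeJson` function could be little more than:

```groovy
import groovy.json.JsonOutput

// Serialize a list of records (maps) to pretty-printed JSON.
// JSON handles nesting and typing, avoiding the duplicated
// grouping fields that flat samplesheets need.
def writeJson(records, path) {
    path.text = JsonOutput.prettyPrint(JsonOutput.toJson(records))
}
```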
> My point is that operators that write files are pesky

I don't believe so. For example, `collectFile` is an essential operator in many workflows. I believe Geraldine has a good point and it would be relatively easy to add this.
IMO the operator is justified for these kinds of small operations where you need to generate a small intermediate file that's needed for technical reasons but is not a proper data output of the pipeline.
I agree that `collectFile` and `mergeCsv` are important, but I'm saying they should be regular functions used in an `exec` process, not operators. Files created by operators are difficult to track for provenance and caching purposes.
> Files created by operators are difficult to track for provenance and caching purposes

Also automatic cleanup, which relies on the lifecycle of tasks and publishing, and has no visibility of files created by operators like `collectFile`.
@vdauwera I have implemented a `mergeCsv` operator in my nf-boost plugin: https://github.com/bentsherman/nf-boost

I know you're writing training content and a plugin might be a weird thing to introduce, but this is a nice way for you to play with an experimental version before we add it as a core feature. There is an example pipeline which shows you how to use it.
Feel free to try it out and let me know if there's anything more you'd like to see from it.
Hey, sorry for the lag. Will definitely try this out. Probably don't want to introduce a plugin in the most basic training module but this might actually slot nicely into a follow-up. As a way to introduce plugins with a basic use case that builds on an earlier training.
Agreed. Do let me know if it at least satisfies your use case so that we can work towards adding it as a core feature.
Hello, could someone help me with this? WARN: Input directory 'Control_R1' was found, but sample sheet '/mnt/c/Users/juanj/Desktop/maestria ric/Sample_sheet.csv' has no such entry.

My sample sheet is:

```
barcode,alias,imput,condition
barcode01,Control_R1,Control_R1,control
```
New feature

We have had the `splitCsv` operator for a long time and it's commonly used. @vdauwera was asking me how to do the reverse, write to a CSV file, and I found it surprisingly difficult to find a clean syntax.

Usage scenario

The example that @vdauwera had was a process whose outputs go into another process, which collects those outputs and needs a CSV sample sheet file.
The closest I could find was this CSV-writing implementation in nf-core/fetchngs:

Only four lines of code, but it has to happen within a dedicated process and it's kind of ugly 👀 It'd be nicer to be able to do this with a channel operator on the way into a process, like with `collectFile`.

Suggested implementation
Ideally we could extend `collectFile` to be able to handle tuples like this, with control of separators etc. Alternatively, a new core Nextflow function to do this (`toCSV`?), or if "there's an easy way to do this in Groovy", then a new pattern to point people to in the Nextflow Patterns docs could also work.