Could you include an example of ch_sample and the resulting ch_subsample for either snippet?
In our code (https://github.com/cellgeni/rnaseq-noqc/blob/scratchfiles/main.nf), an example of ch_sample is the cram_files channel produced by this process:
process irods {
    tag "${sample}"
    maxForks 29

    input:
    val sample from sample_list.flatMap{ it.readLines() }

    output:
    set val(sample), file('*.cram') optional true into cram_files

    script:
    """
    irods.sh ${sample}
    """
}
This process creates a variable number of cram files coming from an IRODS server. These are merged in the next process, crams_to_fastq, using samtools sort and samtools merge. Rather than merging, I would want to (sort or collate first and then) align the individual cram files independently, and then merge the resulting bam files. I include a self-contained toy example below and list the correspondences here. The genesis process exists only to provide some input files. The sample_split process is the equivalent of the irods.sh shell command above (it generates a variable number of files). The sample_subparallel process is the equivalent of running an aligner on a subsample (cram) file. Then, using groupTuple(), the channel ch_subsample_result is re-grouped into the channel ch_subsample_join; the consumer of that channel could e.g. merge aligned bam files.
// multiple samples.
process genesis {
    output:
    file '*.txt' into ch_genesis

    script:
    '''
    echo amazingly few discotheques > sample1.txt
    echo a quart jar of oil > sample2.txt
    echo about sixty codfish eggs > sample3.txt
    '''
}
// each sample generates multiple files
process sample_split {
    input:
    file x from ch_genesis.flatten()

    output:
    // mimics one sample encoded in multiple cram files
    set val(samplename), file('*.[ABCDEF].txt') into ch_sample

    script:
    samplename = x.toString() - ~/.txt$/
    """
    letters=( _ A B C D E F )
    myrandval=\$(seq 2 1 5 | shuf | head -n 1)   # create a variable number of files
    for i in \$(seq 1 1 \$myrandval); do
      (echo \$i; cat $x) > ${samplename}.\${letters[\$i]}.txt
    done
    """
}
// track each sub-sample file with the sample ID, example provided by Luke Goodsell.
ch_sample
    .flatMap { item ->
        samplename = item[0]
        files = item[1]
        files.collect { onefile -> return [ samplename, files.size(), onefile ] }
    }
    .set { ch_subsample }
// Process the sub-sample file.
process sample_subparallel {
    tag "${samplename}-${x}"

    input:
    set val(samplename), val(nfiles), file(x) from ch_subsample

    output:
    set val(samplename), file('*.pal') into ch_subsample_result

    script:
    """
    (echo "Done and dusted"; cat $x) > out.${samplename}.${x}.pal
    """
}
ch_subsample_result
    .map { mytuple ->
        def key = mytuple[0]
        def myfiles = mytuple[1]
        return tuple(key.toString(), myfiles)
    }
    .groupTuple()
    .set { ch_subsample_join }
// Merge the results back to the sample level
process sample_reconstitute {
    publishDir "results", mode: 'copy'

    input:
    set val(samplename), file(x) from ch_subsample_join

    output:
    file('*.concat')

    script:
    // Use ls as we need different batches sorted in the same lexicographic way.
    """
    cat \$(ls $x) > out.${samplename}.concat
    """
}
I meant an example of the data structure. What is the structure of an element emitted by ch_sample?
This feature request is to remove the groupTuple() wait in cases where a process would benefit from sub-parallelisation and the size of an emitted tuple is already known at emit time. The bottom part of the diagram (NEW) is currently possible, but it requires the align stage to finish entirely before the merge stage can begin.
I see, but could you provide an example of the data structure? :D
ch_sample emits a tuple. The first item is used as the key to group over. The rest is up to the consumer of the channel to use. Does that answer the question!?
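For concreteness, in the toy example above one element could look like this (a hypothetical run in which sample1 happened to be split into three files):

// one element emitted by ch_sample
[ sample1, [sample1.A.txt, sample1.B.txt, sample1.C.txt] ]
// the corresponding elements of ch_subsample after the flatMap
[ sample1, 3, sample1.A.txt ]
[ sample1, 3, sample1.B.txt ]
[ sample1, 3, sample1.C.txt ]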
Would you provide an example, like the one here, of both the source channel and what you expect the proposed operator to produce? Does it make sense?
It makes sense. Perhaps what I want is contrary to the channel philosophy. This is what I want:
Channel
    .from([ 'A', ['foo', 'bar', 'zut']], ['B', ['tim', 'flu', 'obo', 'axolotl']])
    .splitTuple()
    .subscribe { println it }
will result in
['A', 'foo']
['A', 'bar']
['A', 'zut']
['B', 'tim']
...
But the important part is that global state is tracked; something needs to be aware globally that there are now three tuples indexed 'A'. The contract would be that no other process or process instance will generate more tuples indexed 'A'. There would then be a global dictionary tracking 'A' => 3 (the number of tuples indexed 'A'). Later, a call to joinTuple() (which could instead be a new parameter to groupTuple()) enables NF to start outputting the tuple indexed 'A' once it has all three of them.
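To sketch the whole round trip (splitTuple() and joinTuple() are hypothetical; neither exists in NF):

Channel
    .from([ 'A', ['foo', 'bar', 'zut']], ['B', ['tim', 'flu', 'obo', 'axolotl']])
    .splitTuple()     // hypothetical: records that 'A' has 3 items and 'B' has 4,
                      //   then emits one [ key, item ] pair per item
    .joinTuple()      // hypothetical: re-emits [ 'A', ['foo', 'bar', 'zut'] ] as soon as
                      //   all 3 'A' items have arrived, without waiting for 'B'
    .subscribe { println it }

In a real pipeline a process (e.g. the aligner) would sit between the split and the join.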
What you want is transpose, e.g.
Channel
    .from([ 'A', ['foo', 'bar', 'zut']], ['B', ['tim', 'flu', 'obo', 'axolotl']])
    .transpose()
    .println()
But the important part is that global state is tracked, etc.
Still confused here. You need to think in terms of transformations. What's the ultimate channel structure you need?
The thing that I want has nothing to do with the data structure; it is that NF can start downstream processes without waiting for all the tuples to be ready, so as to get rid of the pink synchronisation line in the diagram. I understand (from memory; not sure whether from discussion or documentation) that if you use groupTuple() (say after the align process in the diagram above), then NF has to wait for all align processes to finish before it knows what the tuples will be (so there is a global synchronisation step at the pink line). If three 'A' tuples have become available (three aligned subsamples), I'd like them to be pushed into the merge stage without having to wait for all the B tuples et cetera. In our case we often have thousands of samples, and the pipeline would stall if it had to wait for all align processes to finish before the pink line. Unless I misunderstand something completely.
To approach this from the other side: is the pink line in the diagram above correct? If I use groupTuple() for that merge, does NF wait for all the align processes to finish first?
I have very limited brain power; I need to see a practical example of how you are using, or want to use, groupTuple.
I've understood the waiting problem, but you need to provide an example clarifying how that information can be inferred in your proposal.
The problem may well be that I am proposing something completely against the NF philosophy; but I don't know. There is a chance it's not. I really hope I'm not wasting your time -- thanks a lot for persisting with me.
How that information can be inferred: in my proposal that would happen at the splitTuple() step above. It sees something like ['A', ['foo', 'bar', 'zut']], and at that stage it knows (by contract) that these are the only three A subsamples it will ever see. My assumption is that there is a single central controlling task processing that splitTuple() invocation; I would assume that to be the NF main task. In our case A and B are sample names, so the contract is naturally satisfied and this situation would come up a lot. Later, joinTuple() would again go through the same controlling task, which knows how many A tuples there are, so once the A tuple (or any other tuple) is filled it can be released.
If this does not make sense, then at least one of my assumptions must be wrong; for example, my assumption about the single controlling task. Or I'm making an assumption that is not explicit in the above. My completely naive idea is that NF would keep a dictionary of key => { N, tuple() } to track this. splitTuple() would set N (tied to the key). joinTuple() would append to tuple() (tied to the key). Once tuple().size reaches N, the tuple can be released. There is a question of whether there would be only one such dictionary or several; perhaps that is a crucial point.
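A minimal Groovy sketch of that naive bookkeeping (purely illustrative; the names and the single shared dictionary are my assumptions, not NF internals):

// Hypothetical bookkeeping, not Nextflow code: one shared dictionary mapping
// each key to its expected count and the items collected so far.
def groups = [:]

// what splitTuple() would do on seeing [ 'A', ['foo', 'bar', 'zut'] ]
def registerGroup = { key, items ->
    groups[key] = [ expected: items.size(), collected: [] ]
}

// what joinTuple() would do for each incoming [ key, item ] result;
// the completed group is released as soon as its last item arrives
def collectItem = { key, item ->
    def entry = groups[key]
    entry.collected << item
    entry.collected.size() == entry.expected ? [ key, entry.collected ] : null
}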
This problem is quite tricky. However we are working with @emi80 to provide a way to handle it.
That's awesome. Imagine mountains with beerfalls.
I've just uploaded a possible solution. The idea is to use a built-in function named groupKey to define the group key and include in the key the expected group size. For example:
Channel
    .from([ 'A', ['foo', 'bar', 'zut']], ['B', ['tim', 'flu', 'obo', 'axolotl']])
    .map { key, files -> tuple( groupKey(key, files.size()), files ) }
    .set { ch_sample }
Then you will be able to use ch_sample
as before, however when the groupTuple
is going to be applied on the result channel, it will be able to infer the size info associated with the key, therefore without waiting the channel completion.
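Putting the pieces together, a minimal end-to-end sketch (in a real pipeline a process would normally sit between the transpose and the groupTuple):

Channel
    .from([ 'A', ['foo', 'bar', 'zut']], ['B', ['tim', 'flu', 'obo', 'axolotl']])
    .map { key, files -> tuple( groupKey(key, files.size()), files ) }
    .transpose()          // one [ key, item ] element per item; the key still carries the size
    .groupTuple()         // each key's group is emitted as soon as its expected size is reached,
    .println()            //   without waiting for the whole upstream channel to complete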
You can give it a try using the following command:
NXF_VER=0.32.0-SNAPSHOT nextflow run .. etc
Hello, I have used this in production now. We have a workflow roughly described as samplename -> multiple cram files -> merge. Previously, the (unpredictable number of) cram files per sample were obtained in a single process from IRODS storage. I have now split this up using the groupKey() feature (see below). The pipeline seems to run a lot faster on LSF with a lustre file system. I do not completely understand this, except that smaller and shorter jobs are generally preferable. There is a lot of IO involved (20 samples, 50G each). The speed-up may be partly IRODS-specific or even August-specific, but the feature works and our pipeline is faster, so I really hope this will make it into NF. I have a second idea in the same pipeline where I will aim to have two nested levels of groupKey(). The first level is a grouping of samples (20 samples per group) whose cram files will be tarred up as a group; the second level is as above. Is this compatible with groupKey()? Our code is here: https://github.com/cellgeni/guitar/releases/tag/state-of-the-onion (not pretty code, in terms of conditional definitions of processes). This is the excerpt where I use groupKey():
process from_sample_lines {
    tag "${sample}"
    publishDir "${params.outdir}/cramlists"

    input:
    val sample from ch_samplelines

    output:
    set val(sample), file('*.igetlist.txt') optional true into ch_iget_file

    script:
    """
    irods.sh -s ${sample} -t ${params.studyid} -l "${params.librarytype}" -q ${params.manualqc} -D > ${sample}.igetlist.txt
    """
}
ch_iget_file
    .map { tag, file -> [ tag, file.readLines() ] }
    .map { tag, lines -> tuple( groupKey(tag, lines.size()), lines ) }
    .transpose()
    .set { ch_iget_item }
process from_iget_item {
    tag "${samplename}-${igetitem}"
    maxForks 30

    input:
    set val(samplename), val(igetitem) from ch_iget_item

    output:
    set val(samplename), file('*.cram') optional true into ch_iget_merge

    script:
    """
    iget -K ${igetitem}
    """
}
ch_iget_merge
    .groupTuple()
    .set { ch_cram_set }
process merge_from_cram_set {
    tag "${sample}"

    input:
    set val(sample), file(crams) from ch_cram_set

    output:
    file "${sample}.cram" into ch_sample_cram_file

    script:
    cram0 = crams[0]
    """
    num=\$(for f in ${crams}; do echo \$f; done | grep -c ".cram\$")
    if [[ \$num == 1 ]]; then
      ln -s "${cram0}" ${sample}.cram
    else
      if ! samtools cat -o ${sample}.cram ${crams}; then
        samtools merge -@ ${task.cpus} -f ${sample}.cram ${crams}
      fi
    fi
    """
}
I had to kill a pipeline instance, and got these Java errors upon resumption in .nextflow.log. I can give more information if needed.
Aug-14 15:06:26.687 [Task submitter] DEBUG nextflow.executor.GridTaskHandler - [LSF] submitted process from_iget_item (QC1Hip-11697-/seq/19888/19888_5#9_yhuman.cram) >
jobId: 5967292; workDir: /lustre/scratch117/cellgen/cellgeni/tic-74/work/e8/269131118b1b5b0fe209607c49bf0f
Aug-14 15:06:26.688 [Task submitter] INFO nextflow.Session - [e8/269131] Submitted process > from_iget_item (QC1Hip-11697-/seq/19888/19888_5#9_yhuman.cram)
Aug-14 15:06:31.303 [Task monitor] DEBUG n.processor.TaskPollingMonitor - Task completed > TaskHandler[jobId: 5967230; id: 48; name: from_iget_item (QC1Hip-11275-/seq/20375/20375_2#10_yhuman.cram); status: COMPLETED; exit: 0; error: -; workDir: /lustre/scratch117/cellgen/cellgeni/tic-74/work/a1/a6f475722ad11d0a0b7ff0a27dd113 started: 1534255556287; exited: 2018-08-14T14:06:29Z; ]
Aug-14 15:06:31.309 [Actor Thread 39] DEBUG n.extension.GroupTupleOp$KeyWrap - Dyanmic GroupTuple key=QC1Hip-11275 size=38
Aug-14 15:06:31.310 [Actor Thread 35] DEBUG nextflow.util.CacheHelper - [WARN] Unknown hashing type: class nextflow.util.GroupKey -- QC1Hip-11697
Aug-14 15:06:31.314 [Actor Thread 35] WARN nextflow.processor.TaskProcessor - [from_iget_item (QC1Hip-11697-/seq/19888/19888_6#9.cram)] Unable to resume cached task -- See log file for details
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): nextflow.util.GroupKey
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at nextflow.util.InstantiationStrategy.newInstantiatorOf(SerializationHelper.groovy:233)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.Kryo$readClassAndObject$36.call(Unknown Source)
at nextflow.util.KryoHelper.deserialize(SerializationHelper.groovy:185)
at nextflow.util.KryoHelper.deserialize(SerializationHelper.groovy)
at nextflow.util.KryoHelper$deserialize$9.call(Unknown Source)
at nextflow.processor.TaskContext.deserialize(TaskContext.groovy:191)
at nextflow.CacheDB.getTaskEntry(CacheDB.groovy:166)
at nextflow.CacheDB$getTaskEntry$5.call(Unknown Source)
at nextflow.processor.TaskProcessor.checkCachedOutput(TaskProcessor.groovy:948)
at nextflow.processor.TaskProcessor$checkCachedOutput$32.callCurrent(Unknown Source)
at nextflow.processor.TaskProcessor.checkCachedOrLaunchTask(TaskProcessor.groovy:833)
at sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite$PogoCachedMethodSiteNoUnwrapNoCoerce.invoke(PogoMetaMethodSite.java:210)
at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite.callCurrent(PogoMetaMethodSite.java:59)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:185)
at nextflow.processor.TaskProcessor.invokeTask(TaskProcessor.groovy:674)
at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite$PogoCachedMethodSite.invoke(PogoMetaMethodSite.java:169)
at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite.call(PogoMetaMethodSite.java:71)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:128)
at nextflow.processor.TaskProcessor$InvokeTaskAdapter.call(TaskProcessor.groovy:225)
at groovyx.gpars.dataflow.operator.DataflowOperatorActor.startTask(DataflowOperatorActor.java:120)
at groovyx.gpars.dataflow.operator.ForkingDataflowOperatorActor.access$001(ForkingDataflowOperatorActor.java:35)
at groovyx.gpars.dataflow.operator.ForkingDataflowOperatorActor$1.run(ForkingDataflowOperatorActor.java:58)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I see, this stack trace is useful. Thanks for reporting it.
Solved the error reported above, cleaned up and merged into the master branch. I'm planning to include it in version 0.32.0. If you want to try it, refresh your snapshot with the following command:
NXF_VER=0.32.0-SNAPSHOT CAPSULE_RESET=1 nextflow info
it should print:
N E X T F L O W
version 0.32.0-SNAPSHOT build 4888
Then use NF as shown below:
NXF_VER=0.32.0-SNAPSHOT nextflow run .. etc
Reminder to myself: update the documentation with an example.
Hello, I am facing a similar problem. I am importing a variable number of fastq sequences from multiple samples (the number of sequences can vary from 1 to 6). I perform alignment separately for each sequence and want to merge the resulting bam files for each sample (I am aware of the possibility of merging the fastq files first, but don't want to lose the QC data for each alignment). However, the groupTuple() operator waits for all the alignments to finish before it starts emitting the resulting groups of bam files, which is unfortunate. I tried to apply the groupKey(str, int) method on the process output, which resulted in the following error:
ERROR ~ Unexpected error [ClosedByInterruptException]
-- Check '.nextflow.log' file for details
ERROR ~ No signature of method: static nextflow.Nextflow.groupKey() is applicable for argument types: (org.codehaus.groovy.runtime.GStringImpl, org.codehaus.groovy.runtime.GStringImpl) values: [<org.codehaus.groovy.runtime.GStringImpl@????>, 1]
Possible solutions: groupKey(java.lang.Object, int)
I guess there is a problem with the type of the values emitted from the bwa_align process. Is there any way to apply the groupKey() method directly in the output: declaration of the process? I am using Nextflow ver 19.04.1 on an HPC via slurm. Here is my code:
process bwa_align {
    cpus 10
    memory '62 GB'
    errorStrategy 'retry'
    maxErrors 5
    echo true
    scratch true
    stageInMode 'copy'
    stageOutMode 'copy'

    input:
    set val(id), val(group), file(r1), file(r2), val(len) from fastq // len is the number of fastq sequences for the sample

    output:
    set groupKey("${id.toString()}", "${len.toInteger()}"), group, file("${r1}_part.bam"), file("${r1}_part.bam.bai") into part_bam

    when:
    !params.shardbwa

    script:
    """
    pwd
    echo "Performing alignment on sample $id reads $r1 and $r2, length $len"
    RG_ID="\$(echo ${r1.target} | cut -d '/' -f 9)_\$(echo ${r1.target} | cut -d '/' -f 10 | cut -d '_' -f 3,4)_$id"
    echo "RG_ID: \$RG_ID"
    sentieon bwa mem \\
        -M \\
        -R "@RG\\tID:\$RG_ID\\tSM:${id}\\tPL:ILLUMINA" \\
        -t 14 \\
        $genome_file $r1 $r2 2>"${OUTDIR}/log/bwa/\$RG_ID.log" | \\
    sentieon util sort \\
        -r $genome_file \\
        -o ${r1}_part.bam \\
        -t ${task.cpus} \\
        --sam2bam \\
        -i - \\
        2>"${OUTDIR}/log/bwa/\$RG_ID.sort.log"
    """
}
part_bam.groupTuple().into{ grouped_bam; grouped_bam_qc}
process merge_bam {
    cpus 20
    memory '120 GB'
    errorStrategy 'retry'
    maxErrors 5
    scratch true
    echo true

    input:
    set id, group, group_bam, group_bai from grouped_bam

    output:
    set id, file("${id}_merged.bam"), file("${id}_merged.bam.bai") into bam1, bam2

    script:
    """
    echo "Merging sample $id, group ${group[0]} bam files: ${group_bam.join(", ")}"
    echo "Corresponding bai files: ${group_bai.join(", ")}"
    sentieon driver \\
        -t 20 \\
        -r $genome_file \\
        -i ${group_bam.join(" -i ")} \\
        --algo ReadWriter ${id}_merged.bam
    """
}
I tried output: groupKey(id, len), ..... and output: groupKey(id.toString(), len.toInteger()) and got ERROR ~ No such variable: id. I guess I will declare the output as output: id, len, group, bam, bai and use .map {} to apply groupKey() before the later groupTuple(), but I was wondering if there is a viable way to use groupKey() directly in the process output to avoid this extra step. Thank you.
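A sketch of that workaround (assuming the process now emits id and len as plain values; this follows the map + groupKey pattern shown earlier in the thread):

// emit plain values from bwa_align, then wrap the grouping key afterwards
part_bam
    .map { id, len, group, bam, bai ->
        tuple( groupKey(id.toString(), len.toInteger()), group, bam, bai ) }
    .groupTuple()
    .into { grouped_bam; grouped_bam_qc }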
Suggestion:
A way to split, parallelise and combine from samples to sub-samples and back, with NF being aware of the sub-sample set sizes, to prevent waiting during the combine stage (which would be groupTuple-like).
Use case:
Our sample data often comes in multiple cram files. Merging these cram files and sorting them with samtools (in case we need the paired end reads in FASTQ format) takes a large amount of memory and time, and results in large files (up to 50G seen so far).
Both samtools and the downstream STAR alignment of these files require huge resources (RAM and CPUs) and often need too much time on our regular queue. Their outlier status hurts our pipeline's predictability, and the resource/retry regime is very difficult to parameterise.
Ideally we want to process these multiple cram files (call them sub-samples) independently. This is currently possible using flatMap() and then, later, groupTuple(). However, NF will wait at the groupTuple channel until all incoming processes have finished. We cannot use the groupTuple size: parameter, as the number of sub-samples is variable. The current step from sample to subsamples may look like this:
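Something along these lines, using the flatMap approach shown above (a sketch):

ch_sample
    .flatMap { item ->
        def samplename = item[0]
        def files = item[1]
        files.collect { onefile -> [ samplename, onefile ] }
    }
    .set { ch_subsample }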
I propose something like the sketch below, where the idea is that emitFlatTuples keeps track of the tuples it emits, by default using the first element as key. In this case, samplename would be tracked so that NF knows how many tuples [ samplename, onefile ] exist for each samplename. Perhaps groupTuple could have an argument such as by_emit_key. All this requires global state.
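A sketch of the proposed usage (emitFlatTuples and by_emit_key are hypothetical and do not exist in NF):

ch_sample
    .emitFlatTuples()                  // hypothetical: emits [ samplename, onefile ] pairs and
    .set { ch_subsample }              //   records how many pairs were emitted per samplename

// ... a per-file process consumes ch_subsample and produces ch_subsample_result ...

ch_subsample_result
    .groupTuple(by_emit_key: true)     // hypothetical: releases each samplename's group as soon
    .set { ch_subsample_join }         //   as the recorded number of items has arrived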