SHuang-Broad opened 5 years ago
Tree structure of the designated working directory, and how each directory is created.
```
$ tree -d workdir/
workdir/
├── canu-logs
├── canu-scripts
└── haplotype
    └── 0-kmers
        ├── reads-Father
        └── reads-Mother

6 directories
```
`workdir` is the directory designated by the user; it is created by `canu.pl` very early in the pipeline (before configuration).

`canu-logs` and `canu-scripts` are set up in `canu.pl` as well, but only after all configuration has happened and right before the trio-binning stage, i.e. much later than `workdir`. The user has no control over this (and it shouldn't matter much).
`haplotype` is created together with its sub-directory `0-kmers` in one go by `canu::HaplotypeReads::haplotypeCountConfigure`. Both names are hard-coded there.
`reads-Father` and `reads-Mother` under `0-kmers` are generated by `canu::HaplotypeReads::haplotypeSplitReads` for storing the re-partitioned parental reads. The "Father" and "Mother" strings are provided by the user via `-haplotypeFather` and `-haplotypeMother`.

An empty `reads-$hap.success` file is created once the repartition finishes successfully; this is the file the pipeline checks to decide whether this step still needs to run before the next stages can execute.
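As a sketch of this marker-file idiom in Python (the helper name and directory layout here are illustrative, not canu's actual code):

```python
import os

def repartition_done(kmers_dir, haplotypes=("Father", "Mother")):
    """Return True if every reads-$hap.success marker exists,
    i.e. the repartition step can be skipped."""
    return all(
        os.path.exists(os.path.join(kmers_dir, f"reads-{hap}.success"))
        for hap in haplotypes
    )

# Hypothetical demo: create the markers, then check.
os.makedirs("demo/0-kmers", exist_ok=True)
for hap in ("Father", "Mother"):
    open(f"demo/0-kmers/reads-{hap}.success", "w").close()
print(repartition_done("demo/0-kmers"))  # True
```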
`meryl-count.sh`, `meryl-merge.sh`, and `meryl-subtract.sh` reside at the same level as `reads-Father` and `reads-Mother`. There is also a `meryl-count.memory` file that stores the memory allowance for all three shell scripts. Each of the three generates a corresponding empty ".success" file to indicate that it executed successfully.

Note that each shell script also generates a `meryl-$task.$jobid.out` file under `haplotype/0-kmers` holding stdout and stderr. For `count`, the jobid is the batch id; for `merge` and `subtract`, there should be exactly two, 1 and 2, for father and mother.
`canu::Defaults::addSequenceFile`

Since in practice the reads will not be stored in a single (FASTQ) file, this subroutine takes a directory and a file name, and returns the pasted "dir/file" string. The calling code takes responsibility for iterating over the FASTQs in the directory. This is what happens in `canu.pl` in the `while (scalar(@ARGV))` block, in two places: the returned path is stored in the `$haplotypeReads{$hapname}` dictionary/hash, and the calling code also turns the returned file-name string into a command line option via `addCommandLineOption()`.
These options are then parsed via `canu::Defaults::setParametersFromCommandLine` -> `canu::Defaults::setGlobal` -> `canu::Defaults::setGlobalSpecialization`, which actually set the appropriate parameters (note that currently none of them seem to affect the read-haplotyping stage).
`canu::Defaults::setDefaults`

This module holds a "global" hash/dictionary named `%global`. This function sets many parameters (keys of the hash) of `%global` to their "default" values. Several of these parameters are important to understand:

- `canuIteration`: [0] multiple attempts are allowed for each stage; this records which attempt is currently being made, and is reset after each stage finishes successfully.
- `canuIterationMax`: [2] maximum number of attempts allowed.
- `shell`: which shell to use for the shell scripts that actually submit jobs (related to the subroutines defined in `canu::Execution`).
- `saveReadHaplotypes`: [0] see this ticket.
- `saveMerCounts`: [0] see here.
- `onSuccess` & `onFailure`: [`undef`] execute a user-specified command on success/failure; documented in the canu doc.

It also sets the following:
```perl
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%#
#####  Mers
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%#
setDefault("merylMemory",      undef, "Amount of memory, in gigabytes, to use for mer counting");
setDefault("merylThreads",     undef, "Number of threads to use for mer counting");
setDefault("merylConcurrency", undef, "Unused, there is only one process");

#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%#
#####  Haplotyping
#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%#
setDefault("hapUnknownFraction", 0.05,  "Fraction of allowed unknown bases before they are included in the assembly, between 0-1; default 0.05");
setDefault("hapMemory",          undef, "Amount of memory, in gigabytes, to use for haplotype assignment");
setDefault("hapThreads",         undef, "Number of threads to use for haplotype assignment");
setDefault("hapConcurrency",     undef, "Unused, there is only one process");
```
Resources for meryl and the actual child-read assignment are set via the following (which effectively sets their resources to `undef`):

```perl
setExecDefaults("meryl", "mer counting");
setExecDefaults("hap", "haplotype assignment");
```
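A minimal Python sketch of what these two helpers do to the global hash (the names mirror the Perl, but this is an illustration, not canu's code; the help strings are omitted):

```python
# %global in Defaults.pm maps parameter name -> value.
global_params = {}

def set_default(key, value, _description=None):
    global_params[key] = value

def set_exec_defaults(tag, _description):
    # Sets all four resource knobs of a stage to undef (None here).
    for suffix in ("Memory", "Threads", "StageSpace", "Concurrency"):
        set_default(f"{tag}{suffix}", None)

set_default("hapUnknownFraction", 0.05)
set_exec_defaults("meryl", "mer counting")
set_exec_defaults("hap", "haplotype assignment")
print(global_params["merylMemory"])  # None
```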
`canu::Defaults::checkParameters`

The part most relevant to us is the `if-else` block `if (defined(getGlobal("stopAfter"))) {}`, which checks that the specified "stopping" stage is an accepted value. Acceptable values are listed here.
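The check is essentially membership in a whitelist; a hedged Python sketch (the stage names below are the ones mentioned in these notes, not canu's exact accepted list):

```python
# Hypothetical subset of accepted stopAfter values, for illustration only.
VALID_STOP_STAGES = {"meryl-configure", "meryl-count", "meryl-merge",
                     "haplotype-configure", "haplotype"}

def check_stop_after(stage):
    """Accept None (option unset) or a whitelisted stage name."""
    if stage is None:
        return
    if stage not in VALID_STOP_STAGES:
        raise ValueError(f"invalid stopAfter value '{stage}'")

check_stop_after("meryl-count")   # accepted, returns silently
```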
The assumed flow of work in this module is that tasks (shell scripts) are first "configured" by a configure subroutine, then "checked", i.e. the check verifies whether the tasks already finished successfully, and runs them otherwise.

All `$path` below is hard-coded as `haplotype/0-kmers`.
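The configure-then-check contract can be sketched like this (all names hypothetical; canu's real subroutines live in `HaplotypeReads.pm`):

```python
import os

PATH = "haplotype/0-kmers"  # hard-coded, as in the module

def configure(script):
    # Meta-generate the shell script if it is not already there.
    full = os.path.join(PATH, script)
    if not os.path.exists(full):
        os.makedirs(PATH, exist_ok=True)
        with open(full, "w") as f:
            f.write("#!/bin/sh\necho running\n")

def check(script, run_job):
    # If the success marker exists, the task is done; otherwise (re)run it.
    marker = os.path.join(PATH, script.replace(".sh", ".success"))
    if os.path.exists(marker):
        return True
    run_job(os.path.join(PATH, script))
    open(marker, "w").close()  # in the real flow, written only on success
    return True

configure("meryl-count.sh")
check("meryl-count.sh", run_job=lambda p: None)
```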
`canu::HaplotypeReads::haplotypeCountConfigure`

This is the configuration step, whose outputs are `meryl-count.sh`, `meryl-merge.sh`, and `meryl-subtract.sh`. Step by step, it:

- globs `$path/reads-$hap/reads-$hap-[0-9]{3,3}.fasta.gz` (where `$hap` practically takes the value "father" or "mother") in numerical order, and stores the file paths under each `$hap` of a hash, i.e. a hash with `$hap` as key and the parental reads as values (why is `erate` set to 0.001? How is it related to sequencing tech and heterozygosity?);
- writes the `meryl.memory` file, a one-line file holding the memory allocated to all meryl steps, in GB;
- meta-generates `$path/meryl-count.sh` (which, when executed locally, takes one argument, the 1-based batch index), then `$path/meryl-merge.sh` and `$path/meryl-subtract.sh`;
- finally calls `stopAfter("meryl-configure")`.

There are several points worth keeping in mind:
- `%global{merylThreads}` and `%global{merylMemory}` have no effect here; in fact the computed memory later overwrites `%global{merylMemory}`;
- the scripts are run in the `haplotype*Check` steps via a call to `Execution::submitOrRunParallelJob`, which essentially runs the script like `$path/meryl-count.sh $batch_idx > $path/meryl-count.$batch_idx.out 2>&1`.
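Locally, that invocation is just a subprocess with stdout and stderr redirected to the per-batch ".out" file; a Python sketch under that assumption (helper name hypothetical):

```python
import subprocess, os

def run_batch(path, task, batch_idx):
    """Run $path/meryl-$task.sh $batch_idx, capturing all output in
    $path/meryl-$task.$batch_idx.out (stdout and stderr merged)."""
    out = os.path.join(path, f"meryl-{task}.{batch_idx}.out")
    with open(out, "w") as fh:
        return subprocess.run(
            ["sh", os.path.join(path, f"meryl-{task}.sh"), str(batch_idx)],
            stdout=fh, stderr=subprocess.STDOUT,
        ).returncode

# Demo with a stand-in script.
os.makedirs("0-kmers", exist_ok=True)
with open("0-kmers/meryl-count.sh", "w") as f:
    f.write('echo "counting batch $1"\n')
print(run_batch("0-kmers", "count", 1))  # 0
```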
`canu::HaplotypeReads::haplotypeCountCheck`

This subroutine assumes that the two files `$path/meryl-count.sh` and `$path/meryl-count.memory` exist. It then scans the shell script to determine how many jobs are supposed to run in total, and how many have succeeded/failed. For a batch to count as successful, `$path/reads-$hap-$batch_idx.meryl(.tar.gz)?` must exist. It then retries the failed batches (and the ones yet to be run) via `Execution::submitOrRunParallelJob`.

This way of "checking by executing" works because the check subroutines (including the next three) are invoked up to `1 + $maxAttempt` times in the main Perl script `canu.pl`, ensuring enough attempts are made.

If all goes well, it:

- generates `$path/meryl-count.success`;
- calls `resetIteration("haplotype-merylCountCheck")`;
- calls `stopAfter("meryl-count")`.
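A Python sketch of this scan-and-retry logic (the file layout is taken from the text above; the function names and the submit callback are hypothetical):

```python
import os

def finished_batches(path, hap, n_batches):
    # A batch succeeded if reads-$hap-$idx.meryl (or its tarball) exists.
    done = set()
    for idx in range(1, n_batches + 1):
        base = os.path.join(path, f"reads-{hap}-{idx}.meryl")
        if os.path.exists(base) or os.path.exists(base + ".tar.gz"):
            done.add(idx)
    return done

def count_check(path, hap, n_batches, submit, max_attempts=2):
    # Invoked up to 1 + max_attempts times, mirroring canuIterationMax.
    for _attempt in range(1 + max_attempts):
        todo = set(range(1, n_batches + 1)) - finished_batches(path, hap, n_batches)
        if not todo:
            open(os.path.join(path, "meryl-count.success"), "w").close()
            return True
        submit(sorted(todo))  # retry failed / not-yet-run batches
    return False

# Demo: a fake submitter that "succeeds" by creating the output dirs.
os.makedirs("kmers", exist_ok=True)
ok = count_check(
    "kmers", "father", 3,
    submit=lambda ids: [os.makedirs(f"kmers/reads-father-{i}.meryl", exist_ok=True)
                        for i in ids])
print(ok)  # True
```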
`canu::HaplotypeReads::haplotypeMergeCheck`

This check is structured similarly to CountCheck. The differences in the shell scripts are that the merge script was already meta-generated by `haplotypeCountConfigure`, and that its inputs are the `$path/reads-$hap-$batch_idx.meryl(.tar.gz)?` files that are the actual output of the count scripts. The merged results are `$path/reads-$hap.meryl(.tar.gz)?` (whose presence, apparently, is used to check which haplotype has been merged successfully) and `$path/reads-$hap.statistics`.

The module then:

- generates `$path/meryl-merge.success`;
- calls `resetIteration("haplotype-merylMergeCheck")`;
- calls `stopAfter("meryl-merge")`.
`canu::HaplotypeReads::haplotypeSubtractCheck`

Structured exactly the same as the merge check. The outputs are the two directories `$path/haplotype-$hap.meryl/`. It also cleans up (i.e. removes) the output of the merge step (`$path/reads-$hap.meryl(.tar.gz)?`) unless the user specifically turns that off by setting `saveMerCounts`.
`canu::HaplotypeReads::haplotypeReadsConfigure`

The assumption is that if `haplotype/splitHaplotype.sh` exists, the job has already been done; otherwise the shell script is generated. Note that a key difference from the configure subroutine above is how threads and memory are set: they are read from `"hapThreads"` and `"hapMemory"`, i.e. unless they are specifically overridden by the user, they take the "optimized" value computed in `canu::Configure::getAllowedResources`.

It finally calls:

- `resetIteration("haplotypeReadsConfigure")`;
- `stopAfter("haplotype-configure")`.
`canu::HaplotypeReads::haplotypeReadsCheck`

First checks for `haplotype/haplotyping.success`, left behind by a previous successful run, to avoid wasted effort (hence one can imagine this "check" is responsible for generating that success file).

Note that the splitting script uses `haplotype/haplotype.log` for logging, instead of the ".out" files used by the meryl steps, although the script is still handed to `Execution::submitOrRunParallelJob`, so an ".out" file is effectively generated as well. The catch is that the shell script redirects its stdout to `haplotype/haplotype.log`, so the ".out" file probably only captures non-essential stdout, plus all of stderr.

It finally calls:

- `resetIteration("haplotype-reads")`;
- `stopAfter("haplotype")`.
`canu::Configure::configureAssembler`

This actually configures the whole pipeline, though later customization by the stages themselves is allowed. Based on the genome size estimate provided by the user, it adjusts some parameters and the threads/memory for all stages. Two statements are of interest to us:

```perl
# for human
setGlobalIfUndef("merylMemory", "24-64"); setGlobalIfUndef("merylThreads", "1-8");
setGlobalIfUndef("hapMemory", "8-16"); setGlobalIfUndef("hapThreads", "16-64");
```

It then gets the allowed resources for each algorithm of each stage via:

```perl
($err, $all) = getAllowedResources("", "meryl", $err, $all, 0);
($err, $all) = getAllowedResources("", "hap", $err, $all, 0);
```

One particular point to note is that this subroutine could implement a check on `stopAfter` that stops the resource-configuration process at the user-specified stage. The solution might not be pretty, so we would need to issue some type of warning. This is raised in this ticket.
`canu::Configure::getAllowedResources`

It takes 5 + optional parameters (the optional ones are for debugging). It first consults the `%global` hash in `canu::Defaults`. It then moves on to its main task: picking an "optimal" thread/memory configuration. If an optimal solution cannot be found, the program errors out and exits; otherwise the stage- and algorithm-specific threads/memory are reset to the picked optimal solution. If the workflow is to be executed in local mode, it also sets the concurrency to the minimum of (threads/taskThreads, memory/taskMemory). Finally, it reports the result in a somewhat prettified format.
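The local-mode concurrency rule from that last step reduces to a one-liner; a sketch with hypothetical numbers:

```python
def local_concurrency(total_threads, total_memory_gb, task_threads, task_memory_gb):
    """Concurrency = min(threads / taskThreads, memory / taskMemory),
    floored, and clamped to at least 1 so a single job can still run."""
    by_cpu = total_threads // task_threads
    by_mem = total_memory_gb // task_memory_gb
    return max(1, min(by_cpu, by_mem))

# E.g. a 32-thread, 64 GB machine running 8-thread / 24 GB meryl jobs:
print(local_concurrency(32, 64, 8, 24))  # 2  (memory-bound: 64 // 24 == 2)
```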
Questions remain:

`canu::Configure::expandRange`: no idea how it works exactly, but it seems not that important at the moment.
The following three are usually called together, in order, by clients (e.g. the "Configure" subroutines in `HaplotypeReads.pm`) to meta-generate shell scripts:

- `canu::Execution::getBinDirectoryShellCode`: emits a block of shell code that locates the binaries during shell-script execution.
- `canu::Execution::setWorkDirectoryShellCode`: given a path, injects some more shell code, with platform-specific or objectStore-specific content. In other words, if the computation is not using objectStore, a grid, or DNAnexus, this code currently has no effect. This is usually called immediately after `getBinDirectoryShellCode`.
- `canu::Execution::getJobIDShellCode`: injects yet more shell code, for the sole purpose of parallelization: the generated code computes which of the parallelized batches to run, depending on whether the jobs run on a grid (controlled by the grid manager) or locally (batch id provided via the command line, though also automatically generated).
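The job-id shell code boils down to "use the grid's task id if present, else the first command line argument"; a Python rendering of that decision (the environment variable is one grid's convention, SGE, shown only as an example):

```python
import os

def resolve_job_id(argv):
    # On a grid, the scheduler exports a task id (e.g. SGE_TASK_ID on SGE,
    # which is the string "undefined" for non-array jobs); locally, the
    # batch id arrives as the first command line argument.
    grid_id = os.environ.get("SGE_TASK_ID")
    if grid_id and grid_id != "undefined":
        return int(grid_id)
    if len(argv) > 1:
        return int(argv[1])
    raise SystemExit("no job id: not on a grid and no argument given")

os.environ.pop("SGE_TASK_ID", None)
print(resolve_job_id(["meryl-count.sh", "3"]))  # 3
```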
`canu::Execution::submitOrRunParallelJob`

This subroutine takes 4 arguments (plus optional ones). In practice, the optional arguments provided are always the IDs of the jobs (not PIDs) to be executed, including previously failed ones.

It first checks the available resources to see whether everything can be handled within executiveMemory/executiveThreads (not exactly sure which scenario triggers this). Then it makes sure the current attempt does not exceed the user-set tolerance, and advances that index. Depending on whether the workflow runs on a grid or locally, it submits the jobs to the available hosts/machines.

In particular, for locally executed workflows, it uses the provided job list to build up the queue of jobs via looped calls to `schedulerSubmit` (given a command, i.e. a shell script plus command line arguments, push it onto the process queue waiting to be cleared). Second to last, a final calculation of the concurrency limit is passed to `schedulerSetNumberOfProcesses`, which simply sets the number of concurrently-running tasks to the provided value; once set, it stays put. `schedulerFinish` is finally called for the actual execution.

What `schedulerFinish` does is iterate through the queue of tasks built up by `schedulerSubmit`, calling `schedulerRun` to move tasks from this large queue to a smaller queue whose size is bounded by the concurrency limit set in `schedulerSetNumberOfProcesses`. The implementation of this function is kind of strange (please see this ticket).
- `schedulerRun`: **only** moves tasks from the large queue to the small queue and launches the jobs in the small queue, but does **not** wait for them.
- `schedulerForkProcess`: vanilla forking.
- `schedulerReapProcess`: simply reaps processes that finished successfully (note that this could be absorbed into `schedulerRun`, and we might end up doing just that).
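Putting the scheduler pieces together, a minimal Python analogue of the two-queue design (process-handle based rather than fork/reap based, purely to illustrate the flow; all names mirror the Perl but this is not canu's code):

```python
import subprocess

pending = []     # large queue, filled by schedulerSubmit
running = []     # small queue, bounded by the concurrency limit
max_procs = 1    # set once by schedulerSetNumberOfProcesses

def scheduler_submit(cmd):
    pending.append(cmd)

def scheduler_set_number_of_processes(n):
    global max_procs
    max_procs = n

def scheduler_run():
    # Move tasks into the small queue and launch them; do NOT wait.
    while pending and len(running) < max_procs:
        running.append(subprocess.Popen(pending.pop(0), shell=True))

def scheduler_reap():
    # Drop finished processes from the small queue.
    running[:] = [p for p in running if p.poll() is None]

def scheduler_finish():
    # Drain both queues to completion.
    while pending or running:
        scheduler_reap()
        scheduler_run()
        if running:
            running[0].wait()

for i in range(4):
    scheduler_submit(f"echo job {i} > /dev/null")
scheduler_set_number_of_processes(2)
scheduler_finish()
print(len(pending), len(running))  # 0 0
```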
The following subroutines and blocks should be understood before meaningful changes to the code can happen.

`canu.pl`, relevant blocks:

- `while (scalar(@ARGV))`: parses command line options; see the block on `canu::Defaults::addSequenceFile` above for relevant reading.
- `if ((scalar(@haplotypes) > 0) && (setOptions($mode, "haplotype") eq "haplotype"))`: the short block that actually calls out to `canu/HaplotypeReads` for trio binning.

`canu/Defaults.pm`:

- `addSequenceFile`: parses the paths of parental and child reads; it DOES NOT actually load the reads.
- `getPhysicalMemorySize` & `getNumberOfCPUs`: literally detect the number of CPUs and the memory available on the machine; they are only non-trivially used in `Configure::getAllowedResources()`.
- `setDefaults`: see the notes on `Defaults.pm` above.
- `setExecDefaults($$)`: sets the Memory, Threads, StageSpace, and Concurrency of 1) a specific stage with 2) a description, all to `undef` (yes, all four).
- `checkParameters`: makes sure the options and parameters make sense.

`canu/Configure.pm`:

- `configureAssembler`: see the notes on `Configure.pm` above.
- `getAllowedResources`: see the notes on `Configure.pm` above.
- `findGridMaxMemoryAndThreads` and `expandRange`: see the notes on `Configure.pm` above.

`canu/Execution.pm`:
- `submitScript`: returns without doing anything if no grid is detected in `canu::Defaults`, which is our intended use case.
- `getBinDirectoryShellCode`, `setWorkDirectoryShellCode`, & `getJobIDShellCode`: see the notes on `Execution.pm` above.
- `stopAfter`: checks whether the given "stop after" stage was requested by the user, and if so exits the program.
- `submitOrRunParallelJob`: see the notes on `Execution.pm` above.

`canu/HaplotypeReads.pm`:
- `haplotypeReadsExist`
- `haplotypeSplitReads`
- `haplotypeCountConfigure`
- `haplotypeCountCheck`
- `haplotypeMergeCheck`
- `haplotypeSubtractCheck`
- `haplotypeReadsConfigure`
- `haplotypeReadsCheck`
Logical steps in `canu.pl`:

- options parsing, parameter setting, resource detection
- minio IO prep
- reads haplotyping