fulcrumgenomics / dagr

A scala based DSL and framework for writing and executing bioinformatics pipelines as Directed Acyclic GRaphs
MIT License
69 stars 14 forks source link

Build Status Coverage Status Maven Central License Language

Dagr

A task and pipeline execution system for directed acyclic graphs to support scientific, and more specifically, genomic analysis workflows. We are currently in alpha development; please see the Roadmap. The latest API documentation can be found here.

Goals

There are many toolkits available for creating and executing pipelines of dependent jobs; dagr does not aim to be all things to all people but to make certain types of pipelines easier and more pleasurable to write. It is specifically focused on:

It is a tool for working data scientists, programmers and bioinformaticians.

Building

DAGR uses sbt. Installation instructions are available here.

To build an executable jar run: sbt assembly. Tests may be run with: sbt test.

Command line

DAGR is run with: java -jar target/scala-2.13/dagr-1.0.0-SNAPSHOT.jar Running the above with no options will produce the usage documentation.

Include dagr in your project

You can include the three sub-projects that make up dagr using:

libraryDependencies += "com.fulcrumgenomics" %%  "dagr-core" % "1.0.0"
libraryDependencies += "com.fulcrumgenomics" %%  "dagr-tasks" % "1.0.0"
libraryDependencies += "com.fulcrumgenomics" %%  "dagr-pipelines" % "1.0.0"

Or you can depend on the following which will pull in the three dependencies above:

libraryDependencies += "com.fulcrumgenomics" %% "dagr" % "1.0.0",

Authors

License

dagr is open source software released under the MIT License.

Using DAGR

The following sections contain an overview of key features and principles behind DAGR that will be useful for anyone working with DAGR pipelines.

Understanding DAGR's pipeline model

The atom in DAGR's model is called a Task. There are many kinds of tasks, from ones that run a specific command line or execute a small piece of code, up to Pipelines that are tasks which manage the construction of chains of other tasks. The two main sub-types of Task are:

  1. Pipelines which chain together one or more other tasks (including other pipelines)
  2. UnitTasks which perform some task when executed. These can be tasks which run a command line inside a shell, or tasks that run some scala code within the JVM.

Pipelines in DAGR are written in scala, with the aid of an internal DSL for specifying and managing dependencies among other things.

DAGR pipelines are Directed Acyclic GRaphs (hence the name), meaning that a graph is contructed where the tasks are the nodes and the dependencies are the edges between the nodes. While a programming language might control flow with with conditional constructs (e.g. if statements), these don't fit into a graph model. In DAGR we allow some conditionality by defering construction of as much of the graph as possible until the last moment possible.

It's helpful to understand when code in Task and Pipeline classes is run:

Example Pipeline

The following is an example of a simple Example pipeline in dagr, minus import and package statements:

@clp(description="Example FASTQ to BAM pipeline.", group = classOf[Pipelines])
class ExamplePipeline
( @arg(flag="i", doc="Input FASTQ.")        val fastq: PathToFastq,
  @arg(flag="r", doc="Reference FASTA.")    val ref: PathToFasta,
  @arg(flag="t", doc="Target regions.")     val targets: Option[PathToIntervals] = None,
  @arg(flag="o", doc="Output directory.")   val out: DirPath,
  @arg(flag="p", doc="Output file prefix.") val prefix: String
) extends Pipeline(Some(out)) {

  override def build(): Unit = {
    val bam    = out.resolve(prefix + ".bam")
    val tmpBam = out.resolve(prefix + ".tmp.bam")
    val metricsPrefix: Some[DirPath] = Some(out.resolve(prefix))
    Files.createDirectories(out)

    val bwa   = new BwaMem(fastq=fastq, ref=ref)
    val sort  = new SortSam(in=Io.StdIn, out=tmpBam, sortOrder=SortOrder.coordinate)
    val mark  = new MarkDuplicates(in=tmpBam, out=Some(bam), prefix=metricsPrefix)
    val rmtmp = new DeleteBam(tmpBam)

    root ==> (bwa | sort) ==> mark ==> rmtmp
    targets.foreach(path => root ==> new CollectHsMetrics(in=bam, prefix=metricsPrefix, targets=path, ref=ref))
  }
}

The @clp and @arg annotations are required to expose this pipeline for execution via the command line interface. These annotations come from the sopt project which DAGR uses for command line parsing. For pipelines that do not need to be run via the command line (for example if they are only used as building blocks in other pipelines) they can be omitted.

Dependency Language

As can be seen in the example, DAGR uses a number of operators to wire together tasks in a pipeline. The following are all part of the dependency DSL:

It is worth noting that no harm is done by adding a dependency more than once. E.g. it is perfectly ok to write:

root ==> (a :: b :: c) ==> (d :: e)
root ==> (a :: c) ==> (e :: f) ==> g

Using Option

In the dependency DSL, anywhere you can use a Task you can also use an Option[Task]. For example:

...
val trim: Option[Int] = ...
val trimTask = trim.map(len => new TrimBam(in=bam, out=trimmedBam, length=len))
root ==> trimTask ==> nextTask
...

In the above example if trim is a Some(int) then we end up with a chain of root => TrimBam ==> nextTask. On the other hand if trim is None, then the dependency is automatically reduced to root ==> nextTask.

Examples

Example 1

root ==> a ==> (b :: c) ==> d

This example sets up the dependencies so that a will run immediately when the pipeline starts running, with b and c depending on a and finally with d depending on both b and c.

Example 2

val clipTasks = inputBams.map(b => new MarkIlluminaAdapters(in=b, out=Paths.get("trimmed." + b))
val merge = new MergeSamFiles(in=clipTasks.map(_.in), out=Paths.get("merged.bam"))
root ==> clippedTasks.reduce(_ :: _) ==> merge

Special Tasks & Classes

DAGR has a number of "special" tasks that are useful in their own right and also illustrate how to take advantage of the dynamic nature of DAGR's graph building.

Linker

The Linker object creates a mechanism to safely copy state from one task to another when the first task finishes running. This can be useful when one task computes a value that is not written to a file, that is needed by another task. The following is a toy example that demonstrates usage:

val input: PathToBam = ...
val counter = new SimpleInJvmTask {
  var count: Option[Int] = None
  def run(): Unit = { count = Some(SamSource(input).iterator.size) }
}
val downsample = new DownsampleSam(in=input, out=Paths.get("downsampled.bam"), target=1e6.toInt)
Linker(counter, downsample, (c, d) => d.inputReads = c.count)

The example shows one task that counts the reads in a BAM file and exposes the result in a var on the task when it's done running. The count is then needed by the downsample task in order to determine how best to reach the target number of reads. The Linker takes in the two tasks and a function that is to be run when the first task completes successfully, which transfers the count.

EitherTask

The EitherTask is similar to an Either in scala and other functional languages. It can be thought of as a decision node in the pipeline, which goes left or right depending on some value that isn't yet known. For example if a different duplicate marking algorithm should be used depending on whether the data has UMIs or not:

val umiCheck = new SimpleInJvmTask {
  var hasUmis: Boolean = false
  def run(): Unit = { hasUmis = SamSource(input).headOption.exists(_.get("RX").isDefined }
}
val deduper = Either.of(new MarkDuplicates(in=input, out=deduped), new SamBlaster(in=input, out=deduped), umiCheck.hasUmis)
root ==> umiCheck ==> deduper

Shell Piping

DAGR also has a built in DSL and facilities for wiring together tasks that should stream information using shell pipes and redirects. The piping is even typesafe to prevent, for example, accidentally piping SAM data into a process that expects FASTQ. To support piping tasks must implement one of:

For example the FastqToSam task is defined as follows:

class FastqToSam(fastq1: PathToFastq,
                 fastq2: Option[PathToFastq] = None,
                 out: PathToBam,
                 ...)
  extends PicardTask with Pipe[Fastq,SamOrBam]

To use piping you might write something like:

val unmappedBam  = output.resolve("unmapped.bam")
val fqToSam      = new FastqToSam(fastq1=fastq, out=Io.StdOut)
val markAdapters = new MarkIlluminaAdapters(in=Io.StdIn, out=Io.StdOut)
val makeUnmapped = fqToSam | markAdapters > unmappedBam
root ==> makeUnmapped

The following operators are supported for tasks that support piping:

Any types can be used when extending/implementing Pipe. A number of types common to bioinformatics are defined in the tasks package.

Note that because scala can mix-in traits at runtime, if you find a task you want to use piping with (and the tool supports reading from stdin or writing to stdout) but doesn't currently implement Pipe, you can add it on the fly. For exsample if you want to downsample a BAM only to generate a single set of metrics from it:

Seq(0.25, 0.5, 0.75).foreach { frac =>
  val ds = new DownsampleSam(in=input, out=Io.StdOut) with PipeOut[SamOrBam]
  val isize = new InsertSizeMetrics(in=Io.StdIn, out=Paths.get(s"isize.$frac")) with PipeIn[SamOrBam]
  root ==> (ds | isize)
}

Resource Management

DAGR schedules tasks for execution when all their dependencies have been met and when there are sufficient resources. Tasks in DAGR must therefore define the resources they need to execute. The resources managed by DAGR are cpu cores and memory.

Pipelines themselves do not consume resources, but the concrete tasks (UnitTasks) within pipelines do. Tasks manage their resources needs through a pair of functions:

def pickResources(availableResources: ResourceSet): Option[ResourceSet]
def applyResources(resources : ResourceSet): Unit

The first function is called by DAGR to inform the task of the available resources, and ask the task a) whether it can run with the available resources and b) what subset of those resources it would like. The second function is called by DAGR to inform the task of it's allocated resources immediately prior to execution. While these functions can be implemented for each task, they are usually implemented by mixing in one of the following traits:

For example a tool which can be multi-threaded but requires a fixed amount of memory per thread might extend VariableResources and define the following function to pick it's resources:

override def pickResources(resources: ResourceSet): Option[ResourceSet] = {
  val memPerThread = Memory("2G")
  val maxThreads = 48
  val cpus = Range.inclusive(start = 48, end = 2, step= -1)
  cpus.flatMap(c => resources.subset(Cores(c), Memory(c * memPerThread.value).headOption
}

What this does is attempt to allocate 48 cpu cores, then 47 etc. down to 2, with 2GB per core. The first combination that fits within the available resources (ResourceSet.subset() returns Option[ResourceSet], yielding the requested resources if they fit and None if they don't) is then returned. If no combination fits, None is returned and the task isn't scheduled yet.

Retry

DAGR has the ability to retry tasks that fail. The default behaviour is to try a task once and if it fails, it stays failed. This can be changed by mixing in the Retry trait to a task either when the task class is defined, or when the task is instantiated. The trait defines one function:

def retry(resources: SystemResources, taskInfo: TaskExecutionInfo): Boolean = false

If, when invoked, the function returns true then the task will be retried. Note that the task may modify it's internal state (arguments, resource requirements, etc.) when retry() is invoked to increase the chance of success.

A number of helper traits are available:

Scatter / Gather

DAGR includes a concise framework for performing scatter/gather operations, i.e. breaking an input into multiple partitions based on some criteria, processing each partition in parallel and then merging the results. A common example of this in variant calling where the genome is broken down into non-overlapping sets of regions which are called independently and then the resulting VCFs are merged to create a final VCF. DAGR's implementation supports the ability to chain together multiple tasks per partition before gathering.

Building a scatter/gather workflow is relatively simple and has few requirements beyond a normal workflow:

  1. A task that implements the Partitioner trait
  2. A task that can merge the results of the processing each partition

An example pipeline exists in DAGR that uses the GATK to genotype a sample using a chain of HaplotypeCaller, GenotypeGVCFs and FilterVcf on each partition. The full example can be found in the ParallelHaplotypeCaller workflow.

A simplified version follows:

import dagr.tasks.ScatterGather.Scatter
val scatterer = new SplitIntervalsForCallingRegions(ref=ref, intervals=intervals, output=Some(dir), maxBasesPerScatter=5e6.toInt)
val scatter = Scatter(scatterer)
val hc      = scatter.map(il => new HaplotypeCaller(ref=ref, intervals=Some(il), bam=input, vcf=PathUtil.replaceExtension(il, ".g.vcf.gz"))
val gt      = hc.map(h => GenotypeGvcfs(ref=ref, intervals=h.intervals, gvcf=h.vcf, vcf=replace(h.vcf, ".g.vcf.gz", ".vcf.gz")))
val filter  = gt.map(g => new FilterVcf(in=g.vcf, out=replace(g.vcf, ".vcf.gz", ".filtered.vcf.gz")))
val gather  = filter.gather(fs => new GatherVcfs(in=fs.map(_.out), out=output))

root ==> scatter
gather ==> new DeleteFiles(dir)

There's a lot going on in this short code snippet!

  1. SplitIntervalsForCallingRegions is a task that breaks apart an interval list into many smaller interval lists. It also implements Partitioner[PathToIntervals] which allows the scatter/gather framework to use it to initiate a scatter operation.
  2. On line 3 we instantiate a Scatter object which is used to wire together and manage all the tasks in the scatter/gather pipeline
  3. On line 4 we use the .map() method on Scatter to create a HaplotypeCaller task per partition. The .map() method takes as a parameter a function that is invoked once per input partition to create the jobs we need.
  4. On lines 5 and 6 we continue the scattered processing by extending the chain with further calls to .map(). In this case the function we provide to .map() receives as input the prior task in the chain.
  5. On line 7 we use the .gather() function to generate a task that gathers the results of all the scatters. It receives as input the last task from each branch of the scatter, and uses their outputs as the input to GatherVcfs.
  6. Lastly on lines 9-10 we wire in the dependencies. The framework has already hooked up all the dependencies between parts of the scatter/gather workflow so all that's left is to wire scatter to it's dependencies, and add any tasks that come afterwards

It should be noted that all of the objects generated during a scatter/gather workflow (e.g. scatter, hc, gt, filter) can be .map()'d and .gather()'d more than once. I.e. you can build a scatter/gather pipeline with multiple branches and multiple endpoints without repeating any work.