fulcrumgenomics / dagr

A Scala-based DSL and framework for writing and executing bioinformatics pipelines as Directed Acyclic GRaphs
MIT License
69 stars, 15 forks

Various Task Manager Speedups #373

Closed: nh13 closed this pull request 4 years ago

nh13 commented 4 years ago

I am using the pipeline below to do some stress testing:

Test Pipeline (place in `pipelines/src/main/scala/dagr/pipelines/TestingPipeline.scala`):

```scala
/*
 * The MIT License
 *
 * Copyright (c) $year Fulcrum Genomics
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 */

package dagr.pipelines

import dagr.core.cmdline.Pipelines
import dagr.core.tasksystem._
import com.fulcrumgenomics.sopt.{arg, clp}
import dagr.core.execsystem.{Cores, Memory, ResourceSet}
import com.fulcrumgenomics.commons.CommonsDef.forloop

/** Asks for exactly 1 core and 1g of memory, and takes them whenever they are available. */
private trait GreedyResourcePicking extends UnitTask {
  override def pickResources(availableResources: ResourceSet): Option[ResourceSet] = {
    val mem = Memory("1g")
    val cores = Cores(1)
    val desiredResources = ResourceSet(cores, mem)
    availableResources.subset(desiredResources).map { _ => desiredResources }
  }
}

/** Sleeps by running an external `sleep` process. */
private class SleepProcessTask(seconds: Int = 1) extends ProcessTask with GreedyResourcePicking {
  override def args: Seq[Any] = "sleep" :: s"$seconds" :: Nil
}

/** Sleeps inside the JVM that is running dagr. */
private class SleepInJvmTask(seconds: Int = 1) extends SimpleInJvmTask with GreedyResourcePicking {
  def run(): Unit = {
    logger.info(s"Sleeping for $seconds")
    Thread.sleep(seconds * 1000)
    logger.info("I'm awake!")
  }
}

/**
 * Very simple example pipeline that creates random tasks and dependencies
 */
@clp(description="A bunch of sleep tasks.", group = classOf[Pipelines])
class TestingPipeline
( @arg(flag='j', doc="Use JVM tasks") val jvmTask: Boolean = false,
  @arg(flag='n', doc="The number of tasks to create") val numTasks: Int = 100,
  @arg(flag='p', doc="The probability of creating a dependency") val dependencyProbability: Double = 0.1,
  @arg(flag='s', doc="The seed for the random number generator") val seed: Option[Long] = None,
  @arg(flag='S', doc="The time for each task to sleep in seconds") val sleepSeconds: Int = 1,
  @arg(flag='f', doc="The failure rate of tasks") val failureRate: Double = 0.0
) extends Pipeline {

  private val randomNumberGenerator = seed match {
    case Some(s) => new scala.util.Random(s)
    case None    => scala.util.Random
  }

  /** Shell task: a `sleep` process, or (with probability `failureRate`) a command intended to fail. */
  private def toATask: Int => Task = (s) => {
    if (randomNumberGenerator.nextFloat() < failureRate) {
      ShellCommand("exit", "1")
    } else {
      new SleepProcessTask(s)
    }
  }

  /** In-JVM task: a sleep, or (with probability `failureRate`) a task that throws. */
  private def toBTask: Int => Task = (s) => {
    if (randomNumberGenerator.nextFloat() < failureRate) {
      SimpleInJvmTask.apply(name = "Name", f = { if (true) throw new IllegalArgumentException("failed") else () })
    } else {
      new SleepInJvmTask(s)
    }
  }

  private val toTask = if (jvmTask) toBTask else toATask
  private val taskType = if (jvmTask) "JVM" else "Shell"

  override def build(): Unit = {
    // create exactly `numTasks` tasks (`0 to numTasks` created one extra task that was never attached to root)
    val tasks: Seq[Task] = for (i <- 0 until numTasks) yield toTask(sleepSeconds) withName s"task-${taskType}-$i"

    // make them depend on previous tasks; a task that gains a prerequisite is no longer a root
    var rootTasks = Seq.range(start=0, numTasks).toSet
    forloop(from = 0, until = numTasks) { i =>
      forloop(from = 0, until = i) { j =>
        if (randomNumberGenerator.nextFloat < dependencyProbability) {
          require(i != j)
          logger.info(s"Task $i will depend on task $j")
          tasks(j) ==> tasks(i)
          rootTasks = rootTasks - i
        }
      }
    }

    // only tasks without prerequisites hang directly off the pipeline's root
    require(rootTasks.nonEmpty)
    rootTasks.foreach { i => root ==> tasks(i) }
  }
}
```
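To see what the dependency wiring in `build()` produces without running dagr, it can be reproduced in plain Scala. This is a minimal sketch, not part of dagr: `RandomDagSketch`, its `Dag` case class and the `build` signature are hypothetical names, and edges are returned as `(prerequisite, dependent)` index pairs standing in for `tasks(j) ==> tasks(i)`.

```scala
import scala.util.Random

// Hypothetical, dagr-free sketch of the dependency wiring in TestingPipeline.build().
object RandomDagSketch {
  /** Edges as (prerequisite, dependent) index pairs, plus the indices that have no prerequisite. */
  final case class Dag(edges: Seq[(Int, Int)], roots: Set[Int])

  def build(numTasks: Int, dependencyProbability: Double, seed: Long): Dag = {
    val rng   = new Random(seed)
    val edges = Seq.newBuilder[(Int, Int)]
    var roots = (0 until numTasks).toSet // every task starts out as a root
    for (i <- 0 until numTasks; j <- 0 until i) {
      // same test as the pipeline: link j -> i with probability dependencyProbability
      if (rng.nextFloat() < dependencyProbability) {
        edges += (j -> i) // stands in for `tasks(j) ==> tasks(i)`
        roots -= i        // task i now has a prerequisite, so it is no longer a root
      }
    }
    Dag(edges.result(), roots)
  }
}
```

Because every edge points from a lower index to a higher one, the graph is acyclic by construction, and task 0 can never gain a prerequisite, so `require(rootTasks.nonEmpty)` always holds. With `-p 1` every pair is linked, so task 0 is the only root and each task waits on all of the tasks before it.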

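In particular, I ran it with these options, all of which use in-JVM tasks (`-j`) that sleep for zero seconds (`-S 0`) and vary only the number of tasks (`-n`) and the dependency probability (`-p`):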

```bash
dagr TestingPipeline -n 3000 -S 0 -p 0.1 -j
dagr TestingPipeline -n 3000 -S 0 -p 1 -j
dagr TestingPipeline -n 5000 -S 0 -p 0.1 -j
dagr TestingPipeline -n 25000 -S 0 -p 0.1 -j
```
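Since the loops in `build()` visit each pair j < i exactly once and link it with probability `-p`, the expected number of dependencies is p * n * (n - 1) / 2, so the load grows quadratically with `-n`. A small sketch of that arithmetic (`ExpectedEdges` is a hypothetical helper, not part of dagr):

```scala
// Hypothetical helper: expected number of dependency edges created by TestingPipeline.build().
object ExpectedEdges {
  /** Number of (j, i) pairs with j < i that the nested loops visit. */
  def pairs(numTasks: Long): Long = numTasks * (numTasks - 1) / 2

  /** Each pair is linked independently with probability p, so the expectation is pairs * p. */
  def expected(numTasks: Long, p: Double): Double = pairs(numTasks) * p
}
```

For the four runs above this gives roughly 449,850, 4,498,500, 1,249,750 and 31,248,750 expected dependencies respectively; these come from the formula, not from measured counts.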
codecov-io commented 4 years ago

Codecov Report

Merging #373 into master will decrease coverage by 0.41%. The diff coverage is 91.66%.


```diff
@@            Coverage Diff             @@
##           master     #373      +/-   ##
==========================================
- Coverage   91.95%   91.53%   -0.42%     
==========================================
  Files          31       31              
  Lines        1156     1182      +26     
  Branches       65       73       +8     
==========================================
+ Hits         1063     1082      +19     
- Misses         93      100       +7     
```
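The summary line reports a 0.41% decrease while the diff table shows -0.42%. Both are consistent with the same data if coverage is computed as hits / lines and each figure is truncated (rounded toward zero) to two decimal places; that rounding rule is an assumption inferred from the numbers, not something the report states. A quick check (`CoverageCheck` is a hypothetical helper):

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical check of the Codecov figures, assuming coverage = hits / lines * 100
// and that every reported figure is truncated to two decimal places.
object CoverageCheck {
  def coverage(hits: Int, lines: Int): BigDecimal =
    BigDecimal(hits) * BigDecimal(100) / BigDecimal(lines)

  /** Keep two decimals, rounding toward zero. */
  def truncate2(x: BigDecimal): BigDecimal = x.setScale(2, RoundingMode.DOWN)

  val master: BigDecimal = coverage(hits = 1063, lines = 1156) // ~91.955
  val pr: BigDecimal     = coverage(hits = 1082, lines = 1182) // ~91.540

  // Table column: difference of the already-truncated figures.
  val tableDelta: BigDecimal = truncate2(pr) - truncate2(master)
  // Summary sentence: difference of the exact figures, truncated afterwards.
  val summaryDelta: BigDecimal = truncate2(pr - master)
}
```

Under that assumption `tableDelta` is -0.42 and `summaryDelta` is -0.41, matching both figures in the report.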
| Impacted Files | Coverage Δ | |
|---|---|---|
| `.../main/scala/dagr/core/execsystem/TaskManager.scala` | `90.79% <73.91%> (-2.07%)` | :arrow_down: |
| `.../main/scala/dagr/core/execsystem/TaskTracker.scala` | `93.97% <96.29%> (-1.09%)` | :arrow_down: |
| `...rc/main/scala/dagr/core/execsystem/GraphNode.scala` | `94.11% <100.00%> (+0.36%)` | :arrow_up: |
| `...src/main/scala/dagr/core/tasksystem/Pipeline.scala` | `88.00% <100.00%> (+3.00%)` | :arrow_up: |
| `...ore/src/main/scala/dagr/core/tasksystem/Task.scala` | `95.71% <100.00%> (+0.19%)` | :arrow_up: |


Legend: `Δ = absolute <relative> (impact)`, `ø` = not affected, `?` = missing data. Last update ad14fa1...c3dc024.