ropensci / drake

An R-focused pipeline toolkit for reproducibility and high-performance computing
https://docs.ropensci.org/drake
GNU General Public License v3.0

Dynamic branching #685

Closed: wlandau closed this issue 4 years ago

wlandau commented 5 years ago

We want to declare targets and modify the dependency graph while make() is running. Sometimes, we do not know what the targets should be until we see the values of previous targets. The following plan sketches the idea.

library(dplyr)
library(drake)
drake_plan(
  summaries = mtcars %>%
    group_by(cyl) %>%
    summarize(mean_mpg = mean(mpg)),
  individual_summary = target(
    filter(summaries, cyl == cyl_value),
    transform = cross(cyl_value = summaries$cyl)
  )
)

Issues:

  1. How will outdated() work now? Do we have to read the targets back into memory to check if the downstream stuff is up to date?
  2. This is the biggest implementation challenge drake has faced. Hopefully the work will migrate to the workers package.
billdenney commented 5 years ago

One implementation idea that seems as though it could be "simple" to implement (says the guy who hasn't implemented anything in drake yet): Could there be a "plan in a plan" concept?

My thought is:

wlandau commented 5 years ago

A similar idea was proposed in #304. I would actually prefer to avoid nested plans because of the complexity and pre-planning they would require.

billdenney commented 5 years ago

Fair enough. If I think of something else, I'll post it.

brendanf commented 5 years ago

I want to point out the way Snakemake currently handles this, as a possible inspiration:

The second part is quite idiosyncratic to Python, so I wouldn't suggest it be implemented in the same way, but it seems easier to make the user explicitly mark the cases where dynamic branching needs to happen, than to try to detect it from the structure of their dependencies.

Using your example:

library(dplyr)
library(drake)
drake_plan(
  summaries = mtcars %>%
    group_by(cyl) %>%
    summarize(mean_mpg = mean(mpg)),
  individual_summary = target(
    filter(summaries, cyl == cyl_value),
    transform = cross(cyl_value = dynamic(summaries$cyl))
  )
)

One clear difference is that, using an R-based framework rather than a file-based framework, the output of summaries is still only one object. In a file-based framework, it might have been an unknown number of files.

As I wrote that example, I realized that it's actually much more similar to how Snakemake used to do it. In the end, they decided against that way, so maybe it would be good to know why and learn from that. Perhaps it's not relevant in drake's framework though.

wlandau commented 5 years ago

Discussion from #233 carries over to this thread.

wlandau commented 5 years ago

Users really want this flexibility, and often just assume drake already supports it, but I am beginning to question this dream scenario. If we try to implement dynamic branching deeply in drake's internals, we would need to rip half the package apart and double the complexity. Even if we offload scheduling to workers, we would still be in trouble. We would need to update config$graph, config$queue (the priority queue), and config$layout all mid-make(). drake is simply not designed for this.

The more I think about it, the more wisdom I see in @krlmlr's thinking behind #304. Possible compromise: a new split() transformation. @kendonB's #833 certainly seems to address this use case, not to mention #77.

wlandau commented 5 years ago

Update: we now have split() in the dev version: https://ropenscilabs.github.io/drake-manual/plans.html#split. Should cover many use cases that would have otherwise required dynamic branching.
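
Usage looks something like this (get_data() and analyze() are placeholders, not real functions):

library(drake)
drake_plan(
  data = get_data(),
  analysis = target(
    analyze(data),
    transform = split(data, slices = 4)
  )
)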

wlandau commented 5 years ago

A more expedient approach

After talking with @dgkf at SDSS last week, I am no longer as reluctant as in https://github.com/ropensci/drake/issues/685#issuecomment-493838200. We can avoid a mess if we give dynamic branching its own DSL that works in tandem with the existing transformation DSL. This new dynamic DSL is just the transformation DSL invoked at runtime.

Proposal

library(drake)
plan <- drake_plan(
    vector_of_settings = target(
        f(x),
        transform = map(x = c(1, 2))
    ),
    analysis = target(
        g(x, y),
        transform = map(x),
        dynamic = map(y = vector_of_settings)
    )
)

print(plan)
#> # A tibble: 4 x 3
#>   target               command dynamic   
#>   <chr>                <expr>  <list>    
#> 1 vector_of_settings_1 f(1)    <lgl [1]> 
#> 2 vector_of_settings_2 f(2)    <lgl [1]> 
#> 3 analysis_1           g(1, y) <language>
#> 4 analysis_2           g(2, y) <language>

print(plan$dynamic)
#> [[1]]
#> [1] NA
#> 
#> [[2]]
#> [1] NA
#> 
#> [[3]]
#> map(y = vector_of_settings_1)
#> 
#> [[4]]
#> map(y = vector_of_settings_2)

drake_plan_source(plan)
#> drake_plan(
#>   vector_of_settings_1 = f(1),
#>   vector_of_settings_2 = f(2),
#>   analysis_1 = target(
#>     command = g(1, y),
#>     dynamic = map(y = vector_of_settings_1)
#>   ),
#>   analysis_2 = target(
#>     command = g(2, y),
#>     dynamic = map(y = vector_of_settings_2)
#>   )
#> )

Created on 2019-06-03 by the reprex package (v0.3.0)

When we create new targets, we probably do not need to register them in config$layout or the priority queue. Suppose make() is running, has just built/checked vector_of_settings_1, and is about to build analysis_1. Suppose vector_of_settings_1 evaluated to c("a", "b"). Then, we could

  1. Create new "target" names analysis_1_a and analysis_1_b.
  2. Create new commands for each.
  3. Submit those new commands to the scheduler as consecutive jobs, skipping outdated targets.
    • Need to check analysis_1_a and analysis_1_b individually, based on previously-cached metadata.
  4. Store a value and metadata list for each target.
  5. Store a value for target analysis_1 that
    1. Is human-readable, and
    2. Reacts to changes in analysis_1_a and analysis_1_b.
cache <- drake_cache() # successor of get_cache()
cache$get_hash("analysis_1_a")
#> [1] "1d5108bacae437a0"
cache$get_hash("analysis_1_b")
#> [1] "17b1fbe1609400b9"
readd(analysis_1)
#> target             hash
#> 1 analysis_1_a 1d5108bacae437a0
#> 2 analysis_1_b 17b1fbe1609400b9
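
As a sketch, the bookkeeping value in step 5 could be assembled from the sub-target hashes already in the cache (dynamic_value() is a hypothetical helper, not part of drake):

# Hypothetical helper: build the human-readable value of analysis_1
# from the names and hashes of its sub-targets.
dynamic_value <- function(sub_targets, cache) {
  data.frame(
    target = sub_targets,
    hash = vapply(sub_targets, cache$get_hash, character(1)),
    stringsAsFactors = FALSE
  )
}
dynamic_value(c("analysis_1_a", "analysis_1_b"), cache)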

Remarks

Thanks

This idea, along with the original DSL, was inspired by @krlmlr in #233 and #304.

wlandau commented 5 years ago

Hmm... what about targets downstream of analysis_1_a and analysis_1_b?

wlandau commented 5 years ago

Easy, actually: just give a special attribute (maybe an S3 class) to the analysis_1 value (the data frame of hashes). That way, when we load analysis_1 as a dependency of a dynamic transformation (say, dynamic = map(analysis_1)) we will know to map over analysis_1_a and analysis_1_b instead.
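
A minimal sketch of that tagging, with an invented class name:

# Hypothetical: mark the data frame of sub-target hashes so that
# downstream dynamic transforms know to map over the sub-targets.
as_dynamic <- function(hash_df) {
  class(hash_df) <- c("drake_dynamic", class(hash_df))
  hash_df
}
is_dynamic <- function(value) inherits(value, "drake_dynamic")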

wlandau commented 5 years ago

We also need to think about how the new target names and splits are constructed. If vector_of_settings_1 is a list of large objects, we need to make up sensible names. Hashes? Could be slow. We should also make an effort to handle data frames grouped with dplyr::group_by().

wlandau commented 5 years ago

Come to think of it, we probably need a trace (drake_plan(trace = TRUE)) in those special data frames so that combine(.by) still works.

brendanf commented 5 years ago

We also need to think about how the new target names and splits are constructed. If vector_of_settings_1 is a list of large objects, we need to make up sensible names. Hashes? Could be slow. We should also make an effort to handle data frames grouped with dplyr::group_by().

The useful cases seem to be:

If users want the hash behavior, they can use (and perhaps drake could provide?) a function that names a vector/list according to the hashes of its elements, e.g.

name_by_hash <- function(x, ...) {
  # Name each element after the hash of its contents.
  n <- vapply(x, digest::digest, character(1), ...)
  names(x) <- n
  x
}

Alternatively, always default to integer indices, and if the user wants something smarter, they can specify it with .id (and allow tidy evaluation, so that something like .id = !!vapply(vector_of_settings, digest::digest, "") would work).

dpmccabe commented 5 years ago

Hi, I just wanted to describe another use case that would greatly benefit from dynamic branching. In my case, I have a very large data frame somewhat like this:

> d <- tibble(year = rep(2010:2015, each = 5), x = 1:30)
> d
# A tibble: 30 x 2
    year     x
   <int> <int>
 1  2010     1
 2  2010     2
 3  2010     3
 4  2010     4
 5  2010     5
 6  2011     6
 7  2011     7
 8  2011     8
 9  2011     9
10  2011    10
# ... with 20 more rows

I'd like to be able to split d by year, and then create a new column f(x) := x + 100 in those batches (where f is actually an expensive function). Since I'm not often adding new rows to the data frame for very old years, I want to benefit from cached results for the 2010, 2011, ... splits and only recalculate for 2019, which is the only split that has changed/new data.

It would be a game-changer to be able to use drake like this, since more often than not I can think of a splitting scheme that would effectively partition the data into stale and up-to-date splits.
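
The batching itself is easy to sketch outside drake with dplyr (drake's contribution would be to cache each batch as its own target and skip the up-to-date ones):

library(dplyr)
library(tibble)
d <- tibble(year = rep(2010:2015, each = 5), x = 1:30)
f <- function(x) x + 100 # stand-in for the expensive function
# One batch per year; only batches with new or changed rows would rebuild.
batches <- d %>%
  group_by(year) %>%
  group_split()
results <- lapply(batches, function(batch) mutate(batch, fx = f(x)))
combined <- bind_rows(results)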

wlandau commented 5 years ago

@dpmccabe, I see what you mean. I just encountered a very similar situation for a project at work.

I am realizing that https://github.com/ropensci/drake/issues/685#issuecomment-498382301 has serious problems:

  1. It would be extremely difficult to make the current DSL behave exactly the same for dynamic branching, and the inevitable subtle differences would add a lot of confusion.
  2. To work around (1), we would need to make an entire new DSL just for dynamic branching, and that adds a lot of additional complexity for users.
  3. Either way, https://github.com/ropensci/drake/issues/685#issuecomment-498382301 could add a lot of complexity to the code base.

An alternative is @brendanf's suggestion of checkpointing (https://github.com/ropensci/drake/issues/685#issuecomment-466927091). For drake, this essentially means turning a plan into a bunch of subplans. Take this plan as an example:

drake_plan(
  vector_of_settings_1 = f(1),
  vector_of_settings_2 = f(2),
  analysis_1 = target(
    command = g(1, y),
    transform = map(y = vector_of_settings_1)
  ),
  analysis_2 = target(
    command = g(2, y),
    transform = map(y = vector_of_settings_2)
  )
)

It is already natural for users to think about it as two separate plans:

drake_plan(
  vector_of_settings_1 = f(1),
  vector_of_settings_2 = f(2)
)

drake_plan(
  analysis_1 = target(
    command = g(1, y),
    transform = map(y = vector_of_settings_1)
  ),
  analysis_2 = target(
    command = g(2, y),
    transform = map(y = vector_of_settings_2)
  )
)

Maybe make() could do something similar: split a monolithic plan into chunks as appropriate, and then transform/make() those chunks in topological order.
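
For reference, here is roughly what that looks like if a user stages it by hand today (a sketch; it assumes f() returns an atomic vector that the static DSL can splice with !!):

library(drake)
make(drake_plan(
  vector_of_settings_1 = f(1),
  vector_of_settings_2 = f(2)
))
loadd(vector_of_settings_1)
# Now that the upstream values exist, the static DSL can branch over them:
make(drake_plan(
  analysis_1 = target(
    command = g(1, y),
    transform = map(y = !!vector_of_settings_1)
  )
))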

wlandau commented 5 years ago

...but then r_make() might have issues, depending on the implementation.

wlandau commented 5 years ago

But I still think it is worth trying https://github.com/ropensci/drake/issues/685#issuecomment-498382301 in a branch. That approach to dynamic branching might allow drake to scale better with large numbers of targets.

wlandau commented 5 years ago

Changing my mind back

I no longer believe https://github.com/ropensci/drake/issues/685#issuecomment-498382301 needs to complicate the internals or slow us down. With careful refactoring, we can

  1. Drop config$layout.
  2. Set transform = FALSE as default in drake_plan().
  3. Transform commands and analyze them for dependencies as make() is running.

Benefits

Drawbacks

A remark on implementation

Yes, I know this issue has been on the table for a year and a half, and so far I have not implemented a thing. But now that we have a path to simplify drake instead of complicating it, I am more likely to get started. First, though, I should finish #820, which is time-consuming in its own right.

brendanf commented 5 years ago

Are you still talking about

split a monolithic plan into chunks as appropriate, and then transform/make() those chunks in topological order.

or are you now thinking of transforming each rule individually? If there is still a notion of "chunks", where the divisions between the chunks are the "checkpoints", then we could have something like:

drake_plan(
  clusters = checkpoint(
    make_clusters() # returns a list
  ),
  A = target(
    do_A(x),
    transform = map(x = clusters)
  ),
  B = target(
    do_B(A),
    transform = map(A)
  )
)

All A and B targets would obviously have to wait for clusters before being transformed and run. But once the checkpoint is passed, all transformations up to the next checkpoint (in this case there is none) could be applied at the same time. b_2 would be able to start as soon as a_2 is finished, regardless of whether a_1 is still running.

An alternate syntax would be:

drake_plan(
  clusters = target(
    make_clusters(), # returns a list
    unpack = TRUE # tell drake that we want targets clusters_1, clusters_2, clusters_3...
  ),
  A = target(
    do_A(clusters),
    transform = map(clusters)
  ),
  B = target(
    do_B(A),
    transform = map(A)
  )
)
wlandau commented 5 years ago

I was thinking about the non-checkpoint route because we avoid creating extra API functionality.

drake_plan(
  data = make_clusters(),
  clusters = target(
    data,
    transform = split(slices = length(data))
  ),
  A = target(
    do_A(x),
    transform = map(x = clusters)
  ),
  B = target(
    do_B(A),
    transform = map(A)
  )
)

And I think that's what we should go with first. But I do like your checkpoint() API idea because it avoids duplicating the data.

wlandau commented 5 years ago

Also, I keep forgetting about the inevitable differences in behavior between the current DSL and dynamic branching. For the first rollout at least, I think we really do need target(..., dynamic = ...), and we should caution users not to put transform downstream of dynamic.

drake_plan(
  data = make_clusters(),
  clusters = target(
    data,
    dynamic = split(slices = length(data))
  ),
  A = target(
    do_A(x),
    dynamic = map(x = clusters)
  ),
  B = target(
    do_B(A),
    dynamic = map(A)
  )
)
wlandau commented 5 years ago

Back to https://github.com/ropensci/drake/issues/685#issuecomment-511386400: maybe split() should just behave differently for dynamic branching. Maybe it should take no arguments and make a decision based on what the return value looks like. If clusters is a list, the following could save each element as a target. Vectors, and anything with a length(), could behave the same way.

drake_plan(
  clusters = target(
    get_list_of_datasets(),
    dynamic = split()
  ),
  A = target(
    do_A(x),
    dynamic = map(x = clusters)
  ),
  B = target(
    do_B(A),
    dynamic = map(A)
  )
)

In the case of data frames, we could listen to groupings specified with dplyr::group_by().

drake_plan(
  clusters = target(
    get_data_frame() %>%
      group_by(var1, var2),
    dynamic = split()
  ),
  A = target(
    do_A(x),
    dynamic = map(x = clusters)
  ),
  B = target(
    do_B(A),
    dynamic = map(A)
  )
)
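
A sketch of how an argument-less split() might dispatch on the shape of the value (dynamic_slices() is an invented helper):

# Hypothetical: derive the sub-targets from the parent value itself.
dynamic_slices <- function(value) {
  if (inherits(value, "grouped_df")) {
    dplyr::group_split(value)           # one sub-target per group_by() group
  } else if (is.data.frame(value)) {
    split(value, seq_len(nrow(value)))  # one sub-target per row
  } else {
    as.list(value)                      # vectors and lists: one per element
  }
}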
wlandau commented 5 years ago

Glad I took care of #820 before implementation. Should also do #822.

wlandau commented 5 years ago

For the second time, I am questioning the need for a new "dynamic" argument of target(). Currently, we ignore grouping variables we do not already have.

library(drake)
drake_plan(
  datasets = get_datasets(),
  analysis = target(
    analyze_one_dataset(datasets),
    transform = map(datasets)
  )
)
#> Warning: A grouping or splitting variable for target 'analysis' is missing
#> or undefined. Transformation skipped and target deleted.
#> # A tibble: 1 x 2
#>   target   command     
#>   <chr>    <expr>      
#> 1 datasets get_datasets

Created on 2019-08-12 by the reprex package (v0.3.0)

Instead, transform_plan() could append a new "dynamic" column of parsed "transform" objects, which we could finish interpreting during make(). Those transform objects could have grouping variables to evaluate at runtime.

wlandau commented 5 years ago

Otherwise, I like https://github.com/ropensci/drake/issues/685#issuecomment-498382301. A few more remarks:

wlandau commented 5 years ago

Prep work:

krlmlr commented 5 years ago

Do you want to store vctrs::vec_size() instead of the length? That might work better for data frames and POSIXlt objects, for instance.

wlandau commented 5 years ago

Hmm... maybe emulate it so we can avoid depending on vctrs? What are the relevant cases to consider? It might be nice to use dim()[1] for array-like objects and length() for everything else. Are there situations where vctrs::vec_size() is safer? When does it handle POSIXlt objects differently?

x <- as.POSIXlt(Sys.time(), "EST5EDT")  
y <- c(x, x)
library(vctrs)
vec_size(x)
#> [1] 1
length(x)
#> [1] 1
vec_size(y)
#> [1] 2
length(y)
#> [1] 2

Created on 2019-09-14 by the reprex package (v0.3.0)

krlmlr commented 5 years ago

{vctrs} conceptually treats data frames as a vector of rows, which I think would be idiomatic here. It is becoming a "free dependency" just like {rlang}, and it imports only {rlang}, which we already have.

As a stop-gap we can use NROW() which matches the behavior of vec_size() in many cases.
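
The difference is easy to see on data frames, which are lists of columns:

d <- data.frame(a = 1:3, b = 4:6)
length(d) #> 2 (columns)
NROW(d)   #> 3 (rows, matching vctrs::vec_size(d))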

wlandau commented 5 years ago

Indeed, I now see in the help file:

vec_size() is equivalent to NROW() but has a name that is easier to pronounce, and throws an error when passed non-vector inputs.

I like the child-proofing.

library(R6)
library(vctrs)
Accumulator <- R6Class("Accumulator", list(
  sum = 0,
  add = function(x = 1) {
    self$sum <- self$sum + x 
    invisible(self)
  })
)
x <- Accumulator$new()
NROW(x)
#> [1] 4
vec_size(x)
#> `x` must be a vector, not a `Accumulator/R6` object

Created on 2019-09-14 by the reprex package (v0.3.0)

wlandau commented 5 years ago

I am starting to question https://github.com/ropensci/drake/issues/685#issuecomment-498382301 for different reasons:

  1. Step 5 requires us to define an extra target for bookkeeping. If analysis_1_a and analysis_1_b are dynamic targets, the proposal also requires us to define a dummy analysis_1 target, which is a data frame of hashes. That's messy.
  2. For HPC, the proposal makes it hard to do anything other than staged parallelism. If we're going to implement something as difficult as dynamic branching, I would like to do better.

We should declare dynamic targets that can exist autonomously, and we should push them to the priority queue.

wlandau commented 5 years ago

Another challenge is to make a dynamic target depend on part of a dependency, not the whole thing. Example:

library(drake)
plan <- drake_plan(
    y = target(
        f(x),
        transform = map(x = c(1, 2))
    ),
    z = target(
        g(y),
        transform = map(y)
    )
)

z_1 depends on y[[1]], z_2 depends on y[[2]], and neither z_1 nor z_2 depends on all of y. However, I think we need the entirety of y in memory in order to check if z_1 and z_2 are up to date, and I think we need to check all the z_*'s at the same time. Then, we can push the outdated z_*'s to the priority queue and tell drake to just build them without checking them a second time.
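
A sketch of that batched check (sub_target_outdated() is a hypothetical helper):

y <- readd(y) # load the whole dependency once
z_names <- sprintf("z_%d", seq_along(y))
stale <- vapply(seq_along(y), function(i) {
  sub_target_outdated(z_names[i], y[[i]]) # hypothetical up-to-date check
}, logical(1))
z_names[stale] # push these to the priority queue and build without re-checking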

wlandau commented 5 years ago

Re https://github.com/ropensci/drake/issues/685#issuecomment-534540987, maybe we do need a bookkeeper target (like analysis_1) in order to implement combine(). However, we should be able to do map(), split(), and cross() (and eventually group_map()) without this dummy target.

Also, I think we should store the dummy target in a different storr namespace to separate it from the true targets.

wlandau commented 5 years ago

Re #685 (comment), maybe we do need a bookkeeper target

And not just for dynamic targets: we need it for dynamic dependencies as well, the things that get split up to make dynamic targets.

wlandau commented 5 years ago

I reflected on dynamic branching (and #1027 and #1028) and came up with a clearer plan.

Goals of dynamic branching

  1. Define new targets while make() is running.
  2. Enable O(1) processing of large collections of up-to-date targets. I think we can make outdated() nearly instantaneous in many common use cases.
  3. Reduce the number of files in the cache for large workflows.

Sketch

drake_plan(
  w = seq_len(1e6),
  x = target(f(w), transform = map(w)),
  y = target(x, transform = map(x)),
  z = target(y, transform = combine(y))
)

Definitions

Note: a static target can be a dynamic dependency of a dynamic target downstream.

Target status

When we check whether a dynamic target is up to date, sometimes we can just check the target as a whole (fast). Other times, we need to look at its dynamic sub-targets too (slow). In the flowchart below, the yellow/orange steps should be time-savers and the blue ones should be time-consuming.

[flowchart: deciding whether a dynamic target is up to date, with fast whole-target checks in yellow/orange and slow sub-target checks in blue]

outdated() should always assume the worst when it comes to (3). That is what will make it so fast for up-to-date targets.

Building dynamic targets

When it comes time to build x:

  1. Declare the x_i's using the NROW() or vec_size() available in the metadata of the dynamic dependencies.
  2. Insert the x_i's as direct dependencies of x in the graph. Each x_i is a leaf node, with no dependencies of its own. We need them to be in the graph so we can decrease the key of x in the priority queue as the x_i's finish.
  3. Insert the x_i's at the head of the priority queue.
    • Give each x_i an ndeps key of 0.
    • Increase the ndeps key of x from 0 to the number of x_i's.
    • Remember to call $sort() afterwards because we're not using conventional priority queue algorithms. (It's never the bottleneck anyway.)
  4. Continue building targets as usual. Because of the way we updated the priority queue, we should reach all the x_i's before x itself. In the meantime, completely different targets can begin while the x_i's are in progress (if there are workers available).
  5. When we reach an x_i, store the target, but not the metadata.
  6. We should reach x again only after all the x_i's are checked and built. When that happens, the value of x should be a composite of the names and hashes of the x_i's. Store it as a regular target, metadata and all.

(2) and (3) become much simpler if there is no parallel computing. All we need to do is augment the character vector over which we are looping.
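
A sketch of the queue bookkeeping in (2) and (3), with invented method names for the internal priority queue:

# Each x_i becomes buildable immediately; x itself now waits on all of them.
for (x_i in sub_target_names) {
  queue$insert(x_i, ndeps = 0)
}
queue$increase_key(x, by = length(sub_target_names))
queue$sort() # re-sort, since we bypassed conventional heap operations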

API and prep

The API is the same as the current DSL. However, we need to process things differently. Let's go with the dynamic column from https://github.com/ropensci/drake/issues/685#issuecomment-498382301. We should also start saving NROW() or vec_size() for all targets, not just the ones that are likely to be dynamic dependencies.
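
Recording the size could look something like this at metadata-collection time (a sketch; target_size() is not a real drake function):

target_size <- function(value) {
  tryCatch(
    vctrs::vec_size(value),         # errors on non-vectors such as R6 objects
    error = function(e) NROW(value) # the stop-gap discussed above
  )
}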

Aftermath and remarks

wlandau commented 5 years ago

For predict_runtime() and predict_workers(), I think it would be reasonable to take the overall recorded time of x and divide it evenly into predicted runtimes for x_1, ..., x_1000000.
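
A sketch of that even split:

# Divide the recorded build time of x evenly across its sub-targets.
predicted_subtarget_seconds <- function(total_seconds, n_subtargets) {
  rep(total_seconds / n_subtargets, n_subtargets)
}
predicted_subtarget_seconds(120, 4) #> 30 30 30 30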

wlandau commented 5 years ago

Unfortunately, I think we need a separate dynamic argument to target(). Otherwise, the static transforms tend to turn targets into grouping variables when they should really be part of dynamic branching.

wlandau commented 5 years ago

Last week, I wrote enough of the API and documentation to get started. Here is the implementation roadmap.

Prep

Primary action items

Wrap-up

wlandau commented 5 years ago

For splitting, we probably need separate split_list() and split_array() transforms.

wlandau commented 5 years ago

Scratch that.

For splitting, we can probably make use of a .by argument to make it like group_map(). For both split() and combine(), .by should be a single object (import or target) and it alone determines the number of dynamic sub-targets. Simple and sensible.

Also, split() and combine() should work on one non-.by symbol at a time. Only map() and cross() can handle multiple symbols. So the signatures should look like this:

map(...)
cross(...)
split(target, .by)
combine(target, .by)
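
A hypothetical plan using those signatures (the target names and functions here are invented for illustration):

drake_plan(
  data = get_data(),
  keys = unique(data$year),
  slice = target(
    analyze_slice(data),
    dynamic = split(data, .by = keys)
  ),
  summary = target(
    summarize_slices(slice),
    dynamic = combine(slice, .by = keys)
  )
)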
wlandau commented 5 years ago

Utility functions:

wlandau commented 5 years ago

Another simplifying assumption for all transforms: for vector-like and list-like objects, map/reduce over the length. For array-like objects, map/reduce over the first dimension (e.g. rows for data frames).
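
In code, that slicing rule might look like this (an invented helper, covering only the two-dimensional array case):

slice_dynamic <- function(value, i) {
  if (!is.null(dim(value))) {
    value[i, , drop = FALSE] # array-like: slice along the first dimension
  } else {
    value[[i]]               # vector-like or list-like: slice by element
  }
}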

wlandau commented 5 years ago

A couple notes:

  1. For the non-HPC case, let's do the simple thing and spawn dynamic targets inside something like local_build(). We may even be able to leverage that technique for the HPC backends.
  2. Data recovery for dynamic sub-targets is going to require some work. Fortunately, a dynamic target is just a vector of hashes, and we can match those hashes to the sub-target names (see the sketch below).
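
A sketch of that hash matching (match_sub_targets() is hypothetical; cache$get_hash() as above):

# A dynamic target's value is a vector of sub-target hashes, so recovery
# reduces to matching those hashes back to sub-target names.
match_sub_targets <- function(hashes, sub_names, cache) {
  cached <- vapply(sub_names, cache$get_hash, character(1))
  sub_names[match(hashes, cached)]
}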
wlandau commented 5 years ago

As I attempt an implementation, I am finding that, because I am trying to avoid saving metadata lists, I have to reinvent a lot of internal machinery. Maybe it is better to save that metadata for dynamic sub-targets. The internal overhaul may not be as catastrophic, and we still gain efficiency because we do not need to check the metadata as often.

wlandau commented 5 years ago

Yeah, we will need metadata for things like seeds and warnings. But we will still see performance gains in other ways.

wlandau commented 5 years ago

It is coming time to work on split(), and I am rethinking it. It should look like split(..., .by) and probably take multiple variables for ... and .by.

wlandau commented 5 years ago

On second thought, let's hold off on split(). map() might already have everything we need.

wlandau commented 5 years ago

Thoughts on dynamic triggering:

wlandau commented 5 years ago

On second thought, let's leave the condition and change triggers as they are. Let's prevent people from using dynamic grouping variables inside condition and change. E.g. this should not be allowed:

drake_plan(
  x = seq_len(4),
  y = target(x, trigger = trigger(condition = x > 2), dynamic = map(x)),
  z = target(x, trigger = trigger(change = x), dynamic = map(x))
)
wlandau commented 5 years ago

To avoid duplicating code over various HPC backends, let's have backend_loop() use the priority queue. With that direction, I will likely work on HPC before triggers.