ropensci / drake

An R-focused pipeline toolkit for reproducibility and high-performance computing
https://docs.ropensci.org/drake
GNU General Public License v3.0

Suggestions on how I could alter my workflow re: parallel processing to work inside a drake plan? #1099

Closed hlynurhallgrims closed 4 years ago

hlynurhallgrims commented 4 years ago

I have a Windows work computer and I rely rather heavily on parallel processing in my workflow. It usually involves sampling and then applying a function.

library(doParallel)
#> Loading required package: foreach
#> Loading required package: iterators
#> Loading required package: parallel
library(purrr)
#> 
#> Attaching package: 'purrr'
#> The following objects are masked from 'package:foreach':
#> 
#>     accumulate, when
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union

my_n <- 10 # We're gonna be sampling 10 rows each time

cars_list <- mtcars %>% 
  as_tibble() %>% 
  list() %>% 
  rep(16)

cl <- makeCluster(2)
registerDoParallel(cl)
result_list <- foreach (i = 1:20, .packages = c("dplyr", "purrr")) %dopar% {

  map2(.x = cars_list, .y = my_n, 
       .f = ~sample_n(tbl = .x, size = .y, replace = TRUE, weight = wt)) %>% 
    map(~summarize_at(.,
                      .vars = vars(mpg, drat),
                      .funs = median))
}

stopCluster(cl)

Created on 2019-12-09 by the reprex package (v0.3.0)

In reality I'd be doing 15,000 to 100,000 iterations, which can take anywhere from 20 minutes to 3 days on 8 cores, depending on the task at hand and the complexity of the function we're applying (we're doing tax benefit microsimulation, modeling differing policy rules). In my current drake plans I've been doing this outside of the plan, and then using a caching function that takes the dependencies as input and reads the cached result file, basically just so that it shows up in the dependency graph.

I've only been using drake for 5 weeks, but seeing how incredibly useful and powerful it is, I'm basically never going back. So now I'm trying to figure out how to implement the above code in a drake plan.

From reading questions on drake parallelism on Windows it seems that something like parLapply would be the way to go, but I haven't gotten that to work.

my_n = 10 # We're gonna be sampling 10 rows each time

cars_list = mtcars %>% 
  as_tibble() %>% 
  list() %>% 
  rep(16)

my_func <- function(x) {
  print(x)
  map2(.x = cars_list, .y = my_n, 
       .f = ~sample_n(tbl = .x, size = .y, replace = TRUE, weight = wt)) %>% 
    map(~summarize_at(.,
                      .vars = vars(mpg, drat),
                      .funs = median))
}

cl <- makeCluster(2)

result_list_2 = parLapply(cl, 1:20, fun = my_func(1))

I'll spare you the crazy, giant error message, and I'll just assume that the above code is very silly on my part.
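
For reference, here is a minimal sketch of how the parLapply() attempt above could be made to run. Two things appear to go wrong in it: `fun = my_func(1)` calls the function and passes its result instead of the function itself, and the worker sessions started by makeCluster() have neither dplyr and purrr attached nor `cars_list` and `my_n` defined. The version below passes the function itself, hands the data over as extra arguments, and attaches the packages on each worker with clusterEvalQ(). The three-argument signature of `my_func` is an assumption made for this sketch, not something from the original post.

```r
library(parallel)
library(dplyr)
library(purrr)

my_n <- 10 # sample 10 rows each time

cars_list <- mtcars %>%
  as_tibble() %>%
  list() %>%
  rep(16)

# Hypothetical signature: the data and sample size are arguments,
# so nothing has to be looked up in the worker's global environment.
my_func <- function(i, cars_list, my_n) {
  cars_list %>%
    map(~ sample_n(tbl = .x, size = my_n, replace = TRUE, weight = wt)) %>%
    map(~ summarize_at(.x, .vars = vars(mpg, drat), .funs = median))
}

cl <- makeCluster(2)

# Attach the packages on every worker; workers start as fresh R sessions.
invisible(clusterEvalQ(cl, {
  library(dplyr)
  library(purrr)
}))

# Pass the function itself (no parentheses); extra arguments go through `...`.
result_list_2 <- parLapply(cl, 1:20, fun = my_func,
                           cars_list = cars_list, my_n = my_n)

stopCluster(cl)
```

Each of the 20 elements of `result_list_2` is itself a list of 16 one-row summaries, mirroring the shape of the foreach version.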

Ideally I'd love to figure out how I can do this with a future backend and furrr, but I have no idea how I'd go about doing that. Silly non-working example:

library(furrr)
#> Loading required package: future
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union

my_n <- 10 # We're gonna be sampling 10 rows each time

cars_list <- mtcars %>% 
  as_tibble() %>% 
  list() %>% 
  rep(16)

# What I feel like would be a nice approach (in some backwards world where pmap accepted elements of differing lengths)
my_function <- function(x, y, z) {

  set.seed(z)
  sample_n(tbl = x, size = y, replace = TRUE, weight = wt) %>% 
    summarize_at(.vars = vars(mpg, drat),
                 .funs = median)

}

list = future_pmap(.l = list(cars_list, my_n, 1:20), .f = ~my_function(..1, ..2, ..3)) #Essentially, I want 20 iterations of this
#> Error: Element 1 has length 16, not 1 or 20.

Created on 2019-12-09 by the reprex package (v0.3.0)

I realise that this is a rather weird question, as it doesn't actually have to do with drake, but rather how I can do other things in a manner that fits inside a drake plan. If this is outside of the scope of questions here within the drake repo, please feel free to delete.

Thanks in advance and thanks for an incredible package.

wlandau commented 4 years ago

I'm glad you are starting to use furrr. A couple notes:

  1. For parallel computing, I recommend the future.callr backend with furrr.
  2. pmap() and future_pmap() do require all the list elements to be the same length.

See below.

library(furrr)
#> Loading required package: future
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(tidyr)

future::plan(future.callr::callr, workers = 2)

cars_list <- mtcars %>% 
  as_tibble() %>% 
  list() %>% 
  rep(16)

my_function <- function(data, size, seed) {
  set.seed(seed)
  sample_n(tbl = data, size = size, replace = TRUE, weight = wt) %>% 
    summarize_at(
      .vars = vars(mpg, drat),
      .funs = median
    )
}

wrapper_function <- function(index, size, seed) {
  data <- cars_list[[index]]
  my_function(data = data, size = size, seed = seed)
}

grid <- expand_grid(
  index = seq_along(cars_list),
  size = 10,
  seed = seq_len(20)
)

list <- future_pmap(.l = grid, .f = wrapper_function)
#> Warning in saveRDS(list(options$func, options$args), file = tmp):
#> 'package:tidyr' may not be available when loading

#> Warning in saveRDS(list(options$func, options$args), file = tmp):
#> 'package:tidyr' may not be available when loading

list[seq_len(2)]
#> [[1]]
#> # A tibble: 1 x 2
#>     mpg  drat
#>   <dbl> <dbl>
#> 1  20.3  3.46
#> 
#> [[2]]
#> # A tibble: 1 x 2
#>     mpg  drat
#>   <dbl> <dbl>
#> 1  18.4  3.15

Created on 2019-12-09 by the reprex package (v0.3.0)
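
To make the length requirement concrete: expand_grid() builds one row per combination of `index` and `seed`, so every column has the same length, and pmap() or future_pmap() can walk the rows, matching columns to arguments by name. The sketch below uses plain purrr::pmap() and a hypothetical stand-in, `toy_function`, to keep it fast. Regrouping the flat result by seed with split() is one possible way to get back to "20 iterations of 16 results each" and is not part of the answer above.

```r
library(tidyr)
library(purrr)

# One row per (index, seed) combination; the scalar `size` is repeated.
grid <- expand_grid(index = seq_len(16), size = 10, seed = seq_len(20))
nrow(grid) # 16 * 20 = 320

# Hypothetical stand-in for wrapper_function(): arguments are matched
# to the grid's column names.
toy_function <- function(index, size, seed) {
  paste(index, size, seed, sep = "-")
}

flat <- pmap(grid, toy_function) # flat list with one element per grid row

# Regroup the flat list into one sub-list per seed.
by_seed <- split(flat, grid$seed)
length(by_seed)  # 20 seeds
lengths(by_seed) # 16 results per seed
```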

hlynurhallgrims commented 4 years ago

Thanks so much for the help @wlandau. I'll admit that I'm a bit out of my element but I'm trying to wrap my head around this level of abstraction and I'm super appreciative of your help. What I'm running into is that the code you supplied fits the bill and executes outside of a drake plan, but for me it's not working inside a call to drake_plan.

#> target my_list
#> fail my_list
#> Error: Target `my_list` failed. Call `diagnose(my_list)` for details. Error message:
#>   object 'cars_list' not found

Here's a reprex of what I've tried (flailing in the dark, essentially): running it once outside of a drake plan, and once inside.

library(furrr)
#> Loading required package: future
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(tidyr)
library(drake)
#> 
#> Attaching package: 'drake'
#> The following objects are masked from 'package:tidyr':
#> 
#>     expand, gather
#> The following object is masked from 'package:future':
#> 
#>     plan

my_function <- function(data, size, seed) {
  set.seed(seed)
  sample_n(tbl = data, size = size, replace = TRUE, weight = wt) %>% 
    summarize_at(
      .vars = vars(mpg, drat),
      .funs = median
    )
}

wrapper_function <- function(index, size, seed) {
  data <- cars_list[[index]]
  my_function(data = data, size = size, seed = seed)
}

# Outside of a drake plan this works fine ---------------------------------

cars_list = mtcars %>% 
  as_tibble() %>% 
  list() %>% 
  rep(16)

grid = expand_grid(
  index = seq_along(cars_list),
  size = 10,
  seed = seq_len(20)
)

my_list = future_pmap(.l = grid, .f = wrapper_function)

# But within a drake plan it fails ----------------------------------------

my_plan = drake_plan(

cars_list = mtcars %>% 
  as_tibble() %>% 
  list() %>% 
  rep(16),

grid = expand_grid(
  index = seq_along(cars_list),
  size = 10,
  seed = seq_len(20)
),

my_list = future_pmap(.l = grid, .f = wrapper_function)
)

make(my_plan)
#> unload targets from environment:
#>    cars_list
#>   grid
#>   my_list
#> target cars_list
#> target wrapper_function
#> target grid
#> target my_list
#> fail my_list
#> Error: Target `my_list` failed. Call `diagnose(my_list)` for details. Error message:
#>   object 'cars_list' not found

Created on 2019-12-12 by the reprex package (v0.3.0)

wlandau commented 4 years ago

Ah, my mistake. cars_list is a target so it should be an argument to wrapper_function() rather than a global variable dependency. In the latter case, we have an imported function that depends on a target, which is bad. Notice how cars_list points to wrapper_function in the graph. We want to avoid that.

config <- drake_config(my_plan)
vis_drake_graph(config)

[Screenshot, 2019-12-12: dependency graph from vis_drake_graph(), with cars_list pointing to wrapper_function]
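
The hidden dependency can also be seen without drawing the graph. As a rough illustration, the sketch below uses codetools::findGlobals() from the codetools package, not drake's own code analysis, which may differ in detail. The original wrapper refers to `cars_list` as a free variable, while a version that takes it as an argument does not. The names `wrapper_global` and `wrapper_with_arg` are made up for this sketch.

```r
library(codetools)

# Original form: `cars_list` is looked up outside the function.
wrapper_global <- function(index, size, seed) {
  data <- cars_list[[index]]
  my_function(data = data, size = size, seed = seed)
}

# Argument form: `cars_list` is a formal argument, hence local.
wrapper_with_arg <- function(index, size, seed, cars_list) {
  data <- cars_list[[index]]
  my_function(data = data, size = size, seed = seed)
}

"cars_list" %in% findGlobals(wrapper_global)   # TRUE
"cars_list" %in% findGlobals(wrapper_with_arg) # FALSE
```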

This works:

library(furrr)
#> Loading required package: future
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(tidyr)
library(drake)
#> 
#> Attaching package: 'drake'
#> The following objects are masked from 'package:tidyr':
#> 
#>     expand, gather
#> The following object is masked from 'package:future':
#> 
#>     plan

my_function <- function(data, size, seed) {
  set.seed(seed)
  sample_n(tbl = data, size = size, replace = TRUE, weight = wt) %>% 
    summarize_at(
      .vars = vars(mpg, drat),
      .funs = median
    )
}

# add a cars_list argument
wrapper_function <- function(index, size, seed, cars_list) {
  data <- cars_list[[index]]
  my_function(data = data, size = size, seed = seed)
}

my_plan = drake_plan(

  cars_list = mtcars %>% 
    as_tibble() %>% 
    list() %>% 
    rep(16),

  grid = expand_grid(
    index = seq_along(cars_list),
    size = 10,
    seed = seq_len(20)
  ),

  my_list = future_pmap(
    .l = grid,
    .f = wrapper_function,
    cars_list = cars_list # supply the cars_list target as an argument.
  )
)

make(my_plan)
#> target cars_list
#> target grid
#> target my_list

Created on 2019-12-12 by the reprex package (v0.3.0)
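
One thing to watch for: the reprex above never sets a future plan, and future's default plan is sequential, so as written future_pmap() would presumably run in a single process. Attaching drake masks future's plan() (see the startup message), so a sketch of setting the backend calls it with an explicit namespace, before make(). The example below assumes future.callr is installed and uses a toy task, Sys.getpid(), in place of the real workload.

```r
library(furrr)

# Explicit namespace, because drake masks plan() when it is attached.
future::plan(future.callr::callr, workers = 2)

# Toy task: each call reports the process ID of the R session it ran in.
pids <- future_map(seq_len(4), function(i) Sys.getpid())
unique(unlist(pids)) # IDs other than the main session's indicate worker processes

# Restore the default sequential plan when done.
future::plan(future::sequential)
```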

hlynurhallgrims commented 4 years ago

It works! Thanks so much for the speedy reply. You're amazing.