ropensci / drake

An R-focused pipeline toolkit for reproducibility and high-performance computing
https://docs.ropensci.org/drake
GNU General Public License v3.0

Parallel processing efficiency #1360

Closed dleopold closed 3 years ago

dleopold commented 3 years ago

Question

I am trying to use drake to manage a highly parallel data processing pipeline hosted on an Amazon EC2 instance. Everything works well on my local machine, but on the EC2 instance the parallel processing is much less efficient. I am using clustermq, and it appears that the first batch of workers starts in parallel but things slow down after that. I am not sure exactly what is happening, but the jobs do not seem to be handled by persistent workers, as I would expect with clustermq. I can see this by watching the activity of the individual cores: on my local machine the cores are maxed out until the entire call to make() is complete, whereas on the EC2 instance they are maxed out for the first batch of jobs, after which activity drops and targets complete at a much lower rate (faster than in series, but much slower than expected).

Reproducible example

Although this may not be entirely reproducible without a similar EC2 instance to work with, here is a simple example demonstrating the behavior I am seeing.

library(drake)
library(tictoc)
library(foreach)
options(clustermq.scheduler = "multicore")

# a function that waits
waiter <- function() {
  Sys.sleep(1)
  return(rnorm(1))
}

plan <- drake_plan(
  foo = target(
    command = waiter(),
    transform = map(val = !!(1:120)),
    trigger = trigger(condition = TRUE)
  )
)

tic()
make(plan,
     parallelism = "clustermq",
     jobs = 6)
toc()

On my local machine the above call to make() takes ~21 seconds, but on the EC2 instance it takes 30 seconds or more. In contrast, the following code takes ~21 seconds on both my local machine and the EC2 instance:

clustermq::register_dopar_cmq(n_jobs=6) 
tic()
x = foreach(i=1:120) %dopar% Sys.sleep(1)
toc()
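
As a point of reference for the timings above, the ideal wall-clock time can be worked out directly from the example: 120 one-second targets spread over 6 workers. The sketch below does that arithmetic and compares it with the reported timings. The variable names are illustrative, the observed values are the approximate figures quoted in this post, and spreading the extra time evenly across targets is an assumption, not a measurement.

```r
# Workload from the reproducible example.
n_targets <- 120
seconds_per_target <- 1
n_workers <- 6

# Best case: targets run in full batches of n_workers with no scheduling
# or storage overhead.
ideal_seconds <- ceiling(n_targets / n_workers) * seconds_per_target

# Approximate timings reported above (the EC2 figure is a lower bound).
observed_seconds <- c(local = 21, ec2 = 30)

# Time not explained by the computation itself.
overhead_seconds <- observed_seconds - ideal_seconds

# Assumption: the overhead is spread evenly across all targets.
overhead_per_target <- overhead_seconds / n_targets

print(ideal_seconds)
print(overhead_seconds)
print(round(overhead_per_target, 3))
```

Under these assumptions the ideal time is 20 seconds, so the local run and the foreach run are within about a second of the best case, while the EC2 run with drake carries roughly 10 extra seconds, or on the order of 0.08 seconds per target.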

Any idea why parallel processing with drake / clustermq would be less efficient on the EC2 instance? Or how I could get it to behave as it does locally?

Here are the details of the R session on the EC2 instance:

> sessionInfo()
R version 4.0.4 (2021-02-15)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 20.04.2 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.9.0
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.9.0

locale:
 [1] LC_CTYPE=C.UTF-8           LC_NUMERIC=C              
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=C.UTF-8        
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=C.UTF-8       
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] tictoc_1.0    foreach_1.5.1 drake_7.13.1 

loaded via a namespace (and not attached):
 [1] Rcpp_1.0.6         igraph_1.2.6       rstudioapi_0.13    magrittr_2.0.1    
 [5] hms_1.0.0          progress_1.2.2     tidyselect_1.1.0   R6_2.5.0          
 [9] rlang_0.4.10       fansi_0.4.2        storr_1.2.5        tools_4.0.4       
[13] parallel_4.0.4     utf8_1.1.4         cli_2.3.1          iterators_1.0.13  
[17] ellipsis_0.3.1     base64url_1.4      digest_0.6.27      assertthat_0.2.1  
[21] tibble_3.0.6       lifecycle_1.0.0    crayon_1.4.1       txtq_0.2.3        
[25] purrr_0.3.4        vctrs_0.3.6        codetools_0.2-18   glue_1.4.2        
[29] clustermq_0.8.95.1 pillar_1.5.0       compiler_4.0.4     filelock_1.0.2    
[33] backports_1.2.1    prettyunits_1.1.1  pkgconfig_2.0.3   

wlandau commented 3 years ago

Not sure what could be the issue. What do the worker logs say (make(..., parallelism = "clustermq", log_worker = TRUE))? Are any workers quitting in error?

dleopold commented 3 years ago

I think I have tracked down the issue. My EC2 instance works primarily out of a shared EFS (Elastic File System) drive. The extra processing time appears to come from the additional latency of each worker needing to write files to the drake cache on EFS. If I manually create the cache on the low-latency EBS file system dedicated to the EC2 instance, or simply work from the EBS drive entirely, parallel processing performs as expected. In my use case I would like to keep all of the files on the EFS drive, so I may just have to live with the additional latency. I have not tried the targets package yet, but perhaps performance would improve if fewer files are written to the cache.
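
A minimal sketch of the check and the workaround described above follows. The helper `time_small_writes()` is hypothetical (it is not part of drake), and the two directories are placeholders that use `tempdir()` only so the code runs anywhere; in practice they would point at the EFS mount and at a directory on the EBS volume. `new_cache()` and the `cache` argument of `make()` are existing drake features, and the 12-target plan is a shortened stand-in for the example above.

```r
# Hypothetical helper: time n small saveRDS() writes into a directory, as a
# rough proxy for the per-target cost of writing to a cache located there.
time_small_writes <- function(dir, n = 50) {
  dir.create(dir, recursive = TRUE, showWarnings = FALSE)
  files <- file.path(dir, sprintf("probe_%03d.rds", seq_len(n)))
  on.exit(unlink(files), add = TRUE)  # clean up the probe files afterwards
  timing <- system.time(for (f in files) saveRDS(rnorm(1), f))
  timing[["elapsed"]]
}

# Placeholder paths (assumptions): replace with a directory on the EFS mount
# and a directory on the instance's EBS volume.
efs_dir <- file.path(tempdir(), "efs_placeholder")
ebs_dir <- file.path(tempdir(), "ebs_placeholder")

latency <- c(efs = time_small_writes(efs_dir), ebs = time_small_writes(ebs_dir))
print(latency)

# If the EBS directory is clearly faster, create the drake cache there and
# pass it to make() explicitly. Skipped when drake or clustermq is missing.
if (requireNamespace("drake", quietly = TRUE) &&
    requireNamespace("clustermq", quietly = TRUE)) {
  library(drake)
  options(clustermq.scheduler = "multicore")

  plan <- drake_plan(
    foo = target(
      command = {
        Sys.sleep(1)
        rnorm(1)
      },
      transform = map(val = !!(1:12))
    )
  )

  cache <- new_cache(path = file.path(ebs_dir, ".drake"))
  make(plan, parallelism = "clustermq", jobs = 6, cache = cache)
}
```

Keeping only the cache on EBS leaves the rest of the project files on EFS, which may be an acceptable middle ground if the cache does not itself need to be shared.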

wlandau commented 3 years ago

Glad you figured it out. For what it's worth, the main process writes the return values to storage by default. make(caching = "worker") will make the parallel workers write to the cache.
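
To compare the two caching modes on a given file system, something like the following could be used. It is a sketch that assumes drake and clustermq are installed and reuses a plan shaped like the one in the original post. Whether worker caching actually helps on a high-latency file system is not established in this thread.

```r
library(drake)
options(clustermq.scheduler = "multicore")

# Stand-in plan modeled on the reproducible example above.
plan <- drake_plan(
  foo = target(
    command = {
      Sys.sleep(1)
      rnorm(1)
    },
    transform = map(val = !!(1:120))
  )
)

# Default caching: workers send return values back to the main process,
# which writes them to the cache.
time_main <- system.time(
  make(plan, parallelism = "clustermq", jobs = 6)
)

# Remove the targets from the cache so the second run rebuilds everything.
clean()

# Worker caching: each worker writes its own return value to the cache.
time_worker <- system.time(
  make(plan, parallelism = "clustermq", jobs = 6, caching = "worker")
)

print(rbind(main = time_main, worker = time_worker)[, "elapsed"])
```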

And yes, I believe this bottleneck will be less severe in targets, especially with tar_option_set(storage = "worker").
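
For comparison, a rough targets equivalent of the example pipeline might look like the `_targets.R` file below. This is a sketch and not a tested migration: the target names `val` and `foo` and the use of dynamic branching are assumptions made for illustration, and only `tar_option_set(storage = "worker")` comes from the comment above.

```r
# _targets.R (sketch; assumes the targets and clustermq packages are installed)
library(targets)

options(clustermq.scheduler = "multicore")

# Ask each worker to save its own return value instead of sending it back
# to the main process.
tar_option_set(storage = "worker")

# Same stand-in workload as the drake example: wait one second, then return
# a random number. The argument only ties each branch to one element of val.
waiter <- function(val) {
  Sys.sleep(1)
  rnorm(1)
}

# One upstream target holding the indices, and one dynamic branch per index.
list(
  tar_target(val, seq_len(120)),
  tar_target(foo, waiter(val), pattern = map(val))
)
```

With that file in the project root, `targets::tar_make_clustermq(workers = 6)` was the clustermq entry point at the time of this thread. Note that targets keeps its data store in the project directory by default, so the file system hosting the project still matters.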