metrumresearchgroup / mrgsolve

Simulate from ODE-based population PK/PD and QSP models in R
https://mrgsolve.org
129 stars 36 forks source link

parallelizing simulations on Windows #111

Closed JasonW000 closed 7 years ago

JasonW000 commented 8 years ago

I am trying to do trial simulation using R shiny/mrgsolve on my Windows machine and based on existing code that worked well on Metworx (see below for excerpt)

iter<-reactive({ RNGkind("L'Ecuyer-CMRG") set.seed(input$seed) sample(1:6000,input$ntrial) })

simdata<-eventReactive(input$gosim,{ RNGkind("L'Ecuyer-CMRG") set.seed(as.integer(input$seed)) mc.reset.stream() sim <- mclapply(iter(),mc.cores=mc_cores, simdv,model=mod2,nid=input$nid, ndos=input$ndos,intv=input$intv,dose=input$dose,dhr=input$dhr,cmt=as.numeric(input$roa), ka=input$ka, cl=input$cl, v1=input$v1,q=input$q,v2=input$v2,bl=input$bl,kout=input$kout, emax=input$emax,ec50=input$ec50,omegam=omegam(),fe=input$doseratio) sim })

I understand that mclapply doesn't work on Windows but wondering if there is an easy fix to allow this to either run just the same code but not relying on pararellizing or if there is another option (e.g. using parLapply) that you have already worked out for running in a Windows environment?

kylebaron commented 8 years ago

If you need to run parallel, try doParallel (an example here https://github.com/mrgsolve/examples/blob/master/PrTS/pts.pdf see section 7.3). It's a different, more complicated setup so you might need to make a change to the structure of your app.

It's easier to just run the same code sans-parallel:

library(parallel)

if(.Platform$OS.type == "windows") {
  mc.reset.stream <- function() return(invisible(NULL))
  mclapply <- function(...,mc.cores=NULL) { lapply(...) }
}

Important to load parallel first ... and make sure your versions of the mc functions get the last word.

kylebaron commented 8 years ago

@dpastoor might know if the future package (https://github.com/HenrikBengtsson/future) would allow you to write portable, parallelized (in some form) code for this, running on both linux and Windows ... I am not sure.

dpastoor commented 8 years ago

@kylebmetrum Yes I saw this issue and tagged it to respond after some meetings tomorrow.

The short is, the future package is the way you want to go, works great on windows, but the documentation is still a bit spartan so a couple useful patterns will go a long way.

@JasonW000 If I haven't re-responded to this issue by Thursday ping me again and I can provide some examples of using mrgsolve with futures for parallelization

JasonW000 commented 8 years ago

@kylebmetrum Thank you for the simple approach to substitute lapply for mclapply on Windows. This worked. I also appreciate the information for using doParallel or future in case I need to come back to this at a later time. I'm good for now :)

billdenney commented 6 years ago

@dpastoor Ping for the example?

dpastoor commented 6 years ago

basically the simplest example you can get:

library(future)
#> Warning: package 'future' was built under R version 3.4.1
library(mrgsolve)
library(dplyr)
#> Warning: package 'dplyr' was built under R version 3.4.1
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union

simplest_model <- mrgsolve:::house()
reps <- 1:10

plan(multiprocess) # control how the execution flow will occur
results <- future_lapply(reps, function(.rep) {
  simplest_model %>% 
    ev(amt = 100, ii = 12, addl = 2) %>%
    req(DV, RESP) %>%
    mrgsim(end = 48) %>% 
    mutate(REP = .rep)
}) %>% 
  # this is a list of results back so will
  # need to bind back to a df
  bind_rows()

head(results)
#> # A tibble: 6 x 6
#>      ID  time     RESP       DV       CP   REP
#>   <dbl> <dbl>    <dbl>    <dbl>    <dbl> <int>
#> 1     1  0.00 50.00000 0.000000 0.000000     1
#> 2     1  0.00 50.00000 0.000000 0.000000     1
#> 3     1  0.25 48.68223 1.287441 1.287441     1
#> 4     1  0.50 46.18005 2.225208 2.225208     1
#> 5     1  0.75 43.61333 2.904129 2.904129     1
#> 6     1  1.00 41.37943 3.391488 3.391488     1
tail(results)
#> # A tibble: 6 x 6
#>      ID  time     RESP       DV       CP   REP
#>   <dbl> <dbl>    <dbl>    <dbl>    <dbl> <int>
#> 1     1 46.75 37.95487 3.094686 3.094686    10
#> 2     1 47.00 38.06878 3.056243 3.056243    10
#> 3     1 47.25 38.18195 3.018278 3.018278    10
#> 4     1 47.50 38.29437 2.980784 2.980784    10
#> 5     1 47.75 38.40605 2.943756 2.943756    10
#> 6     1 48.00 38.51698 2.907188 2.907188    10

lets break it down:

1) the future package gives a wonderful abstraction on multiple parallel processing packages. More importantly, it also allows you to switch between threaded execution in those separate processes, and a synchronous calculation in the main process. This is amazing for debugging as stuff running in the forks/threads gobbles messages/warnings/errors.

So switching between sync/parallel is as simple as changing the plan()

2) to execute in parallel, the future_lapply() is the easiest imo to use as it is a pattern most know or can pick up quickly.

caveats

This tips extend beyond mrgsolve, but are common things people to run in

1) be very careful in how code is parallelized. mrgsolve is really really fast with multiple individuals because the R --> cpp --> R is minimized, so unlike RxODE or other single individual simulators does not linearly scale in the time per individual simulated. Eg RxODE is 100x longer to simulate 100 inds than 1, mrgsolve may only be 15-20x for 100 individuals.

The pattern I typically use is to fine the coursest chunks to simulate from, then work from there. In fact, I have a function in PKPDmisc chunk_df that will break a dataframe into course chunks to simulate from

df %>% 
  group_by(REP) %>%
  chunk_df %>% # this will evenly split reps up into chunks equal to number of cores on computer
  split(.$chunk__) %>%
  future_lapply(...)

2) mrgsolve seems to have some issues with the pointer to a model in memory when forking. @kylebmetrum I don't think has reported seeing that issue himself, regardless, using futures it crashes sometimes given a model from the global environment in the future() call. Hence, I make liberal use of mread_cache() and just pull in a fresh copy for each thread.

I actually use a small package I created https://github.com/dpastoor/overseer to manage multiple models this way with an easier syntax and transparent caching (I need to work on the documentation badly though so feel free to ask questions)

3) Be careful with how big of data is going into each thread. A subtle error that can cause crashing is when getting sloppy with sending in large global data or generating and keeping around huge simulation outputs. Especially on remote machines with high CPU/ram ratios, I've dealt with RAM constrained simulations if trying to keep raw results around too long before reducing it.

If you do have adequate ram options(future.globals.maxSize = 1000*1024^2) will at least let you pull in larger data (defaults to 500 MB before throwing an error). But generally the crashes come from out of control kicking out too much raw individual timecourse data per rep.

Let me know if you have other questions

billdenney commented 6 years ago

Thanks for the quick and detailed response!

kylebaron commented 6 years ago

@dpastoor Thanks for the nice example. I really need to get back into using future.

Regarding mrgsolve having difficulty finding the model when forking. If this is happening sometimes and other times it doesn't but with the same parallel method, I'm not sure why that might be.

But sometimes (depending on the method) when you fork a process for running the job, you can get the model object (mod) but the shared object isn't loaded. This shouldn't be random but is a "feature" of the parallelization method. If this is the case, use loadso(mod), which will use mrgsolve:::sodll to locate the shared object and load it. If you are pushing jobs out to grid engine or on to other compute nodes, it's best to build the model in a local directory (mread("mymodel", soloc = getwd()) or something like that). Otherwise, the job running on the other node might not be able to reach the tempdir() for the parent R session.

mattfidler commented 6 years ago

@dpastoor Thanks for the tip about future. I do agree that travelling to R has its costs. I also would like to note that the development RxODE does not solve one individual at a time. I do not know the speed differences between the approaches, though.

dpastoor commented 6 years ago

@mattfidler sorry if that is my mistake. As of the last time I took a look at development (admittedly was before you really got going hard on refactoring) it still involved looping over each individual one at a time.

You may want to update the examples if there is a way to do this in one shot, such as:

nobs <- ev$get.nobs()
set.seed(1)
cp.all <- matrix(NA, nobs, nsub)
for (i in 1:nsub)
{
    theta <- theta.all[i,]
    x <- mod1$solve(theta, ev, inits=inits)
    cp.all[, i] <- x[, "C2"]
}

to reflect a way of doing this without needing to individually call each time. This is by far the biggest ding on performance because it forces O(n) hit on the R --> Cpp --> R, which I have found to be ~ 2-8 ms per invocation (with mrgsolve/other simulators at least)

mattfidler commented 6 years ago

I agree, once I smooth out the edges, I will provide example (I have opened up a bug report).

Also for is horribly inefficient. Even if this was a single simulator this should be updated.

I am curious, how does future work? As I understand it, the future would parallelize on the R level, not the C, am I somehow mistaking? That would make mrgsolve only group what simulations you want to be performed and then collate them at the end? This would add the R-level delay...? I know on Windows parallel forking isn't available. In nlmixr sometimes R-based parallel solves in windows were less efficient then a single threaded solution. Is that true of future?

dpastoor commented 6 years ago

You are correct - and thus the recommendation part 1 - minimize the number of 'chunks' you have to break apart the designs/scenarios into and give as much to a single mrgsolve invocation as possible.

Eg in a naive simulation (future or otherwise) calling

lapply(1:nsub, function(id} {
  mod %>% mrgsim()
})

will cause the back and forth to happen for each individual.

For mrgsolve it is much better to generate a dataframe of all ids, then simulate in one go

mod %>% idata_set(individuals) %>% mrgsim()

and you can see the impact of this very clearly when benchmarking

the median time per individual:

image

This is also basically a surrogate for the R/C/R interop of about 5 ms + sim time of ~ 0.25 ms per individual.

Hence, if you simulate with mrgsolve completely naively, you can easily have 10-20x "slower" simulation times than if you chunk the simulation scenarios.

Code attached to generate the plot/do benchmarking

library(tidyverse)
library(mrgsolve)
library(PKPDmisc)
code <- '
$GLOBAL
#define CP (CENT/VC)

$SET delta=0.1

$PARAM TVCL=1, TVVC=20, KA = 1.3

$CMT GUT CENT 

$MAIN
double CL = exp(log(TVCL) + ETA(1));
double VC = exp(log(TVVC) + ETA(2));

$OMEGA 0.09 0.09

$ODE
dxdt_GUT = -KA*GUT;
dxdt_CENT = KA*GUT - (CL/VC)*CENT;

$CAPTURE CP
'
mod <- mread(code = code, "demo")

sim_inds <- function(.nind) {
  mod %>% 
    ev(ID = 1:.nind, amt = 100, addl = 2, ii = 12) %>%
    mrgsim(end = 48) 
}

bench <- microbenchmark::microbenchmark(
  sim_inds(1),
  sim_inds(2),
  sim_inds(5),
  sim_inds(10),
  sim_inds(20),
  sim_inds(50),
  sim_inds(100),
  sim_inds(200),
  sim_inds(400),
  sim_inds(600),
  sim_inds(800),
  sim_inds(1000),
  times = 100L
)

data_frame(expr = bench$expr,
           # time in milliseconds
           time = bench$time/1000000) %>%
  group_by(expr) %>%
  summarize_at(vars(time), funs(median)) %>%
  mutate(nids = as_numeric(stringr::str_extract(expr, "\\d+")),
         ntime = time/nids) %>%
  ggplot(aes(x = nids, y = ntime)) + geom_point(size = 3) +
  geom_line() +
  theme_bw() +
  base_theme(
    axis_text_x = 14, 
    axis_title_x = 16
  ) +
  labs(x = "Number Individuals", y = "Time per Individual (ms)") 

plot demonstrating that it is simulating with variability

sim_inds(10) %>% plot

image

dpastoor commented 6 years ago

And regarding how future works, to a large extent it is an abstraction over the more base parallelization packages, that also takes care of a lot of hand-work for you (shuttling global variables, packages, etc into the forked process). One other very cool feature is that abstraction has also been extended to actual grids as well so you can use it with SGE/torque/SLURM with the same API

So as to penalty, its not a really better/worse than using any of those packages directly from a performance perspective, its just for any forked process, you have to take into account the data transfer time to get everything into the thread, which I've clocked at anywhere from 100 ms-multiple seconds depending on the size of data (eg transferring gigabyte+ objects into each fork).

Thus my rule of thumb is as long as you're at less than ~2000 individuals then you aren't going to see any performance gains, even with optimal forking

dpastoor commented 6 years ago

In particular, in an optimal scenario for parallelization (virtually no data transfer, on a imac with ssd) here is the difference:

image

around 1000 individuals you see parallelization starting to take the lead, but again, in real life with also passing in data or other scenarios or passing large data back I have empirically seen a breakpoint closer to 2000 inds where parallelization makes sense.

dpastoor commented 6 years ago
library(future)
...
sim_inds_parallel <- function(.nind, ncores) {
  plan(multiprocess)
  future_lapply(1:ncores, function(x) {
    sim_inds(.nind/ncores)
  }) # this is still a list really and needs to be combined
}

bench <- microbenchmark::microbenchmark(
  sim_inds_parallel(100, 4),
  sim_inds(100),
  sim_inds_parallel(400, 4),
  sim_inds(400),
  sim_inds_parallel(800, 4),
  sim_inds(800),
  sim_inds_parallel(1000, 4),
  sim_inds(1000),
  sim_inds_parallel(1600, 4),
  sim_inds(1600),
  sim_inds_parallel(2000, 4),
  sim_inds(2000),
  times = 20L
)
kylebaron commented 6 years ago

@dpastoor @mattfidler Good stuff, good discussion. And it looks like @dpastoor is trying to sneak another chapter into his thesis ... just print off this thread and stick it in the back :)

Most large-scale simulation we do involves replicates ... so some number of individuals simulated nrep times. In my typical workflow, I almost take my number-of-individuals and make that a unit simulation (pass everything in that unit to the C++ code, single run) and use the parallelization / forking at the replicate level (split my 1000 replicates over ncores resources).

dpastoor commented 6 years ago

Ha that is ancient history (as of last Thursday) :-)

kylebaron commented 6 years ago

@dpastoor Yeah ... Vijay sent me a picture of the historic event. Congrats!

mattfidler commented 6 years ago

Congratulations @dpastoor , I'm sure it is well deserved!

billdenney commented 6 years ago

Congratulations, @dpastoor!

mattfidler commented 6 years ago

I think @kylebmetrum has a point about discretizing the simulations. I also know that with RxODE, some R code is faster ie ~1 sec for 1800 subjects (single solve without parallization or ~0.5 ms/subject). The same simulation with for takes >10 minutes...