ropensci / drake

An R-focused pipeline toolkit for reproducibility and high-performance computing
https://docs.ropensci.org/drake
GNU General Public License v3.0

Future-based within-target parallelism on a Slurm cluster #777

Closed pat-s closed 5 years ago

pat-s commented 5 years ago

Related to #675. I decided to open a new one for clarity.

So we have already found out that care needs to be taken when using certain future::plan() variants ("multicore", "multiprocess") and mclapply() (which is essentially multicore) for within-target parallelism.

Now there is another issue when one wants to specify within-target parallelism:

  1. If multiple future::plan() calls are used to tailor the within-target parallelism of specific targets, these calls will repeatedly outdate the respective targets.
  2. If a global future::plan() is set in drake.R or _drake.R, this setting will not propagate to the workers on an HPC, because these are started in a fresh R session specified in the respective scheduler template.

To solve this, one can set the env variable R_FUTURE_PLAN in the scheduler template right before the workers are spawned, or specify options(future.plan = <>) in the .Rprofile. This way, within-target parallelism is set only once and applies to all workers. The number of workers needs to be set when invoking R via the scheduler template, using the command-line option --parallel=<ncores>. Note that the number of workers has to be set to the value required by the target that needs the most workers, and that no differentiation between targets can be made.
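
For illustration, a minimal .Rprofile sketch of the second alternative (the plan name here is just an example):

# .Rprofile (sketch): the future package reads this option when it loads
# and uses it as the default plan in every R session, including the workers.
options(future.plan = "multisession")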

(While this works in theory and also locally on my machine, the workers on the HPC will start R but do not start processing when option --parallel is used. I see nothing in the log files.)

So if certain targets require a smaller number of workers due to potential memory issues, the only option to specify an upper limit would be --cpus-per-task in the Slurm template. However, this is not a long-term solution, as it highly limits the use of a cluster if other targets would actually require a lot more cores (think of a benchmark study limited to, say, 5 cores because one target consumes a lot of memory while others could actually use 40+).

To solve this, we could either try to find out why the workers are not spawned when using --parallel, or set target-specific resources as proposed here. Maybe @mschubert knows more here?

Resulting slurm.tmpl:

export R_FUTURE_PLAN="multisession"

CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

@HenrikBengtsson Is there an option in future to set the number of workers directly in the future_*apply() call? Or even just to limit the use of the global value defined via --parallel when starting R? This would help in setting target-specific limits within a session.

pat-s commented 5 years ago

As @wlandau suggested in Slack, another alternative is the prework argument in make().

With the new r_make() approach of drake v7.0, this needs to be set in drake_config(), e.g. drake_config(prework = quote(future::plan(future.callr::callr, workers = 7))).

Of course, other future-based plans can be specified as well.
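
For concreteness, a minimal _drake.R sketch (the R/plan.R path, jobs value, and worker count are placeholders):

# _drake.R (sketch): r_make() runs this script and builds the targets
# from the drake_config() object it returns.
library(drake)
source("R/plan.R")  # hypothetical script defining `plan`
drake_config(
  plan,
  parallelism = "clustermq",
  jobs = 2,
  prework = quote(future::plan(future.callr::callr, workers = 7))
)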

A further enhancement would be the option to set workers for individual targets, either through clustermq or future.

wlandau commented 5 years ago

Thanks for passing that along. Now that I look back at the code base, however, I am remembering that prework is called in a bunch of other places, such as the beginning of a make() session and when imports are processed in parallel on Windows. So future::plan(multisession) may create multiple unwanted parallel socket clusters locally. The prework is meant for quick and cheap prep steps like setting global options and environment variables, and do_prework() loads packages too.

If you use prework this way, I recommend trying future::plan(future.callr::callr) instead, which is safer anyway. Please let me know how that goes.

pat-s commented 5 years ago

Indeed, when using multisession and canceling the job, the workers keep running even if the job is manually canceled via Slurm. This does not happen when using future.callr::callr.

However, I face errors executing the task when using callr while multisession works fine. :roll_eyes:

wlandau commented 5 years ago

What sorts of errors? Maybe I can help.

pat-s commented 5 years ago

Error: Target `nri_indices` failed. Call `diagnose(nri_indices)` for details. Error message:
  callr failed, could not start R, exited with non-zero status, has crashed or was killed

This one only occurs with future.callr, not with multisession. It occurs in the middle of a target, and future.callr works fine for other targets in the plan, so I'm a bit confused about what might be going on in this specific target.

pat-s commented 5 years ago

Hm, now the target passed simply by using fewer workers. I also watched the execution beforehand, and the process did not run out of memory when more workers were used. Strange.

wlandau commented 5 years ago

Yeah, from that error, resource constraints seem a likely explanation. Which kind of workers did you scale down, outer clustermq workers or inner future.callr workers? How many were you using before, and how many are you using now?

wlandau commented 5 years ago

Going back to your original post:

If multiple future::plan() calls are used to tailor the within-target parallelism of specific targets, these calls will repeatedly outdate the respective targets.

I am not sure I know what you mean. Is this because you are supplying a specific number of workers to future::plan()?

wlandau commented 5 years ago

So if certain targets require a smaller number of workers due to potential memory issues, the only option to specify an upper limit would be --cpus-per-task in the Slurm template.

Wait, does that mean you are using transient workers at the top level, i.e. make(parallelism = "future")? If so, you can use a custom resources column in the plan. But if you are using make(parallelism = "clustermq"), it may not be worth fine-tuning the resource requirements for individual targets. clustermq workers are homogeneous, each one can run multiple targets, and there is currently no way to manually require certain targets to go to certain clustermq workers (worker affinities are a long-term goal). In other words, you are going to get basically the same pool of available resources for each target anyway.
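
For reference, a rough sketch of such a resources column (the target commands and resource field names are placeholders and have to match whatever fields the batchtools template actually reads):

# Per-target resources for make(parallelism = "future") via a list column.
plan <- drake_plan(
  small = target(
    fit_small_model(),  # hypothetical command
    resources = list(ncpus = 1, memory = 4000)
  ),
  large = target(
    fit_large_model(),  # hypothetical command
    resources = list(ncpus = 16, memory = 64000)
  )
)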

So I guess I am unclear about exactly what you are trying to achieve.

pat-s commented 5 years ago

Which kind of workers did you scale down, outer clustermq workers or inner future.callr workers? How many were you using before, and how many are you using now?

I was scaling down the inner ones from 7 to 3.

I am not sure I know what you mean. Is this because you are supplying a specific number of workers to future::plan()?

I was previously using multiple future::plan() calls with a different number of workers for specific targets. This is bad practice.

Wait, does that mean you are using transient workers at the top level, i.e. make(parallelism = "future")?

No, I am using `clustermq`. The resources column for non-transient workers is on my wishlist :)

In other words, you are going to get basically the same pool of available resources for each target anyway.

I know. That is the reason why I have to limit my inner-parallelization workers to match the demand of the task that needs the most memory.

wlandau commented 5 years ago

@pat-s, is your workflow up and running with future.callr-based parallelism within targets? Anything else I can do to help?

I was scaling down the inner ones from 7 to 3.

Hmm... 3 seems quite small. In future.callr::callr(), the default for workers is availableCores(), which seems sensible enough for HPC. It is easy to overdo multi-process/multicore parallelism. Even with hyperthreading, the speed gains are not usually too impressive beyond the number of cores.

Were you setting a non-default value of workers when you encountered those callr errors? How many cores per node did you request on SLURM at the time?

I know. That is the reason why I have to limit my inner-parallelization workers to match the demand of the task that needs the most memory.

You bring up a good point. Some targets may need more memory per inner worker, reducing the number of inner workers you can use with the given resources. However, I do not think future plans are designed to handle this. In https://github.com/HenrikBengtsson/future/issues/263#issuecomment-445047269, @HenrikBengtsson advised me not to set the plan locally. However, in furrr::future_options() and future.apply::future_lapply() there are arguments called scheduling and future.scheduling, respectively, which allow you to set the average number of chunks per worker. In other words, you do not have to use all the workers declared in the future plan.
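
A minimal sketch of that idea with future.apply (the element count and chunk size here are arbitrary):

# Even though the plan allows availableCores() workers, future.chunk.size
# caps the number of futures: 12 elements / chunk size 4 = 3 chunks,
# so at most 3 callr processes run at the same time.
library(future.apply)
future::plan(future.callr::callr)
pids <- future_lapply(1:12, function(i) Sys.getpid(), future.chunk.size = 4)
length(unique(unlist(pids)))  # at most 3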

HenrikBengtsson commented 5 years ago

Hi, without having read all of this thread: are you aware that the workers argument can also be a function that is evaluated when the plan is "deployed"? That is, if you use a nested plan, you can have the second layer use a function for workers, making it agile to whatever you want.

An example from https://github.com/HenrikBengtsson/future/issues/177#issuecomment-354224278:

plan(list(
  tweak(cluster, workers = cl), 
  tweak(multisession, workers = function() { max(1, round(0.7 * availableCores())) })
))

See also https://github.com/HenrikBengtsson/future/issues/235#issuecomment-398243029

HenrikBengtsson commented 5 years ago

... and, yes, if you can achieve what you need by controlling the amount of "chunking" (scheduling / chunk.size), then that is also a nice solution that is invariant to the number of workers.

wlandau commented 5 years ago

I do not have access to a SLURM cluster, but here are some things that work on SGE. I think this is enough for #777, and I will add it to the HPC chapter of the manual.

Locally

Use future.callr normally.

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:4, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:4, function(x) Sys.getpid())
)

# Tell the drake targets to launch up to 4 callr processes.
future::plan(future.callr::callr, workers = 4L)

# Build the targets.
make(plan)

# Shows 4 unique process IDs.
readd(x)

Persistent workers

Each persistent worker needs its own future::plan(), so we supply custom code to the prework argument of make().

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:4, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:4, function(x) Sys.getpid())
)

# Write a template file for clustermq.
writeLines(
  c(
    "#!/bin/bash",
    "#$ -N {{ job_name }}               # job name",
    "#$ -t 1-{{ n_jobs }}               # submit jobs as array",
    "#$ -j y                            # combine stdout/error in one file",
    "#$ -o {{ log_file | /dev/null }}   # output file",
    "#$ -cwd                            # use pwd as work dir",
    "#$ -V                              # use environment variables",
    "#$ -pe smp 4                       # request 4 cores per job",
    "module load R-qualified/3.5.2      # if loading R from an environment module",
    "ulimit -v $(( 1024 * {{ memory | 4096 }} ))",
    "CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker(\"{{ master }}\")'"
  ),
  "sge_clustermq.tmpl"
)

# Register the scheduler and template file with clustermq.
options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)

# Build the targets.
make(
  plan,
  parallelism = "clustermq",
  jobs = 2,
  # Each of the two workers can spawn up to 4 local processes.
  prework = quote(future::plan(future.callr::callr, workers = 4L))
)

# Shows 4 unique process IDs.
readd(x) 

Transient workers

As @HenrikBengtsson explained, we can nest our future::plan() strategies. Each target gets its own remote SGE job, and each worker can spawn up to 4 callr processes.

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:4, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:4, function(x) Sys.getpid())
)

# Write a template file for future.batchtools.
writeLines(
  c(
    "#!/bin/bash",
    "#$ -cwd                # use pwd as work dir",
    "#$ -j y                # combine stdout/error in one file",
    "#$ -o <%= log.file %>  # output file",
    "#$ -V                  # use environment variables",
    "#$ -N <%= job.name %>  # job name",
    "#$ -pe smp 4           # 4 cores per job",
    "module load R          # if loading R from an environment module",
    "Rscript -e 'batchtools::doJobCollection(\"<%= uri %>\")'",
    "exit 0"
  ),
  "sge_batchtools.tmpl"
)

# In our nested plans, each target gets its own remote SGE job,
# and each worker can spawn up to 4 `callr` processes.
future::plan(
  list(
    future::tweak(
      future.batchtools::batchtools_sge,
      template = "sge_batchtools.tmpl"
    ),
    future::tweak(future.callr::callr, workers = 4L)
  )
)

# Build the targets.
make(plan, parallelism = "future", jobs = 2)

# Shows 4 unique process IDs.
readd(x)

Locked binding errors

Some workflows unavoidably use external packages with mclapply() in the backend. In that case, there are 2 options.

  1. Use make(lock_envir = FALSE).
  2. Use the envir argument of make(). That way, drake locks your special custom environment instead of the global environment.
# Load the main example: https://github.com/wlandau/drake-examples
library(drake)
drake_example("main")
setwd("main")

# Define and populate a special custom environment.
envir <- new.env(parent = globalenv())
source("R/packages.R", local = envir)
source("R/functions.R", local = envir)
source("R/plan.R", local = envir)

# Check the contents of your environments.
ls(envir)  # Should have your functions and plan.
ls()       # The global environment should only have what you started with.

# Build the targets using your custom environment
make(envir$plan, envir = envir)

HenrikBengtsson commented 5 years ago

Somewhat off-topic comment: The default number of workers is inferred from future::availableCores(), which has the goal of being agile to various environment variables that HPC schedulers set. In other words, ideally you should be able to replace:

future::plan(list(
  future::tweak(future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl"),
  future::tweak(future.callr::callr, workers = 4L)
))

with:

future::plan(list(
  future::tweak(future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl"),
  future.callr::callr
))

Minimizing the use of hardcoded workers lowers the risk of overusing the system/machines.

Having said this, I don't have access to a Slurm cluster, so I haven't been able to test future::availableCores() there. It is currently only looking at SLURM_CPUS_PER_TASK - are there other env vars it should acknowledge? Please comment on https://github.com/HenrikBengtsson/future/issues/22 if you think this can be improved. Thanks.

pat-s commented 5 years ago

For Slurm, cpus-per-task can be specified by the user. If the value is set higher than the number of cores available on a node, the job won't start (because the requested resources are not available).

So right now, if the user wants to use all available cores for at least one target, one has to set cpus-per-task to the maximum number of CPUs that the node provides.

If one uses future::availableCores() for within-target parallelism, this works fine as long as all targets can deal with the maximum number of workers.
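
For illustration, a sketch of what a clustermq worker would see if the Slurm template requested, say, --cpus-per-task=16 (the number is just an assumption):

# Slurm exports SLURM_CPUS_PER_TASK=16 inside the job.
future::availableCores()           # returns 16, taken from SLURM_CPUS_PER_TASK
future::plan(future.callr::callr)  # workers defaults to availableCores(), i.e. 16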

However, there might be cases in which one loops over a list of length 10 for one target, but the user knows that the node can (for whatever reason, most often memory) only handle a parallelization level of 3. In these cases, using future::availableCores() would overload the node.

The user now has two options:

  1. Limit the number of within-target workers to the lowest value that works for all targets.

  2. Adjust the problematic targets so that they cannot use all cores, e.g. by looping over sub-chunks of the list in question.

Option 1 "slows down" the plan; option 2 is tedious.

Usually this problem applies to targets that either take long to run or use a huge amount of memory. And especially for these targets, it would be very helpful to be able to parallelize with the maximum possible number of workers.

I know this is already possible for "transient" workers with future. Hopefully we can get this going soon for "non-transient" workers somehow in the future.

Sorry if all of this was maybe already crystal-clear to all of you.

Minimizing the use of hardcoded workers lowers the risk of overusing the system/machines.

Doesn't it in fact do the opposite? In cases where the specified number of workers exceeds the number available, the job won't start. But if the workers are not limited, some targets might overload the server. Or am I wrong here?