HenrikBengtsson / future.batchtools

R package future.batchtools: A Future API for Parallel and Distributed Processing using batchtools
https://future.batchtools.futureverse.org

Simple chunking with nested parallelism #72

Closed · rrichmond closed this issue 1 year ago

rrichmond commented 3 years ago

Hello,

I'm trying to get nested parallelism and some sort of chunking to work in a simple way with doFuture and %dopar%. Here's a simple example:

library(doFuture)
library(future.batchtools)

registerDoFuture()

plan(list(tweak(batchtools_slurm,
                template = "./slurm-simple.tmpl",
                resources = list(ncpus = 25,
                                 memory = "48GB",
                                 walltime="2:00:00",
                                 nodes=1)),
          multicore),
     workers=4)

## I want to loop over allvals and have the code run on the 4 workers, each in parallel across the 25 cores. The following works:

allvals <- 1:100

## This is custom chunking, but it doesn't scale well and requires me to customize it every time I change the number of values I want to loop over
out <- foreach(idx=1:4) %dopar% {
    outinner <- foreach(idx2=1:25) %dopar% {
        Sys.sleep(120)
        idx*idx2
    }
}

## What I really want is simple chunking of the entirety of allvals, but for the multicore inner plan to work. Something like the following would be great to be able to do and to have load balancing across the 4x25 cores:

out <- foreach(idx=allvals) %dopar% {
        Sys.sleep(120)
        idx
}

The issue with this latter version is that it only runs on a single core on each of the 4 workers. Is there a simple way to distribute each of the 100 values to the 4 workers (25 to each, order doesn't matter) and ensure that on each worker the jobs run in parallel across the 25 cores? I find myself running into this sort of problem often with my cluster jobs, but haven't found a good solution with doFuture.

Thanks!

HenrikBengtsson commented 1 year ago

Sorry for the two-year radio silence here.

You've got the right idea, but you placed the workers = 4 argument in the wrong location. You can see this if we take your setup:

plan(list(tweak(batchtools_slurm,
                template = "./slurm-simple.tmpl",
                resources = list(ncpus = 25,
                                 memory = "48GB",
                                 walltime="2:00:00",
                                 nodes=1)),
          multicore),
     workers=4)

and then look at what it sets up:

> plan("list")
List of future strategies:
1. batchtools_slurm:
   - args: function (..., template = "./slurm-simple.tmpl", resources = list(ncpus = 25, memory = "48GB", walltime = "2:00:00", nodes = 1), workers = NULL, envir = parent.frame())
   - tweaked: TRUE
   - call: plan(list(tweak(batchtools_slurm, template = "./slurm-simple.tmpl", resources = list(ncpus = 25, memory = "48GB", walltime = "2:00:00", nodes = 1)), multicore), workers = 4)
2. multicore:
   - args: function (..., workers = availableCores(constraints = "multicore"), envir = parent.frame())
   - tweaked: FALSE
   - call: plan(list(tweak(batchtools_slurm, template = "./slurm-simple.tmpl", resources = list(ncpus = 25, memory = "48GB", walltime = "2:00:00", nodes = 1)), multicore), workers = 4)

Note that the first ("outer") layer has workers = NULL, which corresponds to the default workers = 100L, and the second ("inner") layer has workers = 4.

This means that your inner parallelization only uses four (4) workers. You can also see this if you query the process IDs in your two nested foreach loops:

res <- foreach(idx = 1:4, .combine = rbind) %dopar% {
  pid <- Sys.getpid()
  nworkers <- nbrOfWorkers()  ## Number of "inner" workers 
  foreach(idx2 = 1:25, .combine = rbind) %dopar% {
    data.frame(idx = idx, idx2 = idx2, nworkers = nworkers, pid = pid, pid2 = Sys.getpid())
  }
}
> res
    idx idx2 nworkers     pid    pid2
1     1    1        4 1965097 1965106
2     1    2        4 1965097 1965106
3     1    3        4 1965097 1965106
4     1    4        4 1965097 1965106
5     1    5        4 1965097 1965106
6     1    6        4 1965097 1965106
7     1    7        4 1965097 1965108
8     1    8        4 1965097 1965108
...
97    4   22        4 1965109 1965128
98    4   23        4 1965109 1965128
99    4   24        4 1965109 1965128
100   4   25        4 1965109 1965128

If you inspect the output, you'll see that there are only four unique pid2 values for each pid.
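
One quick way to check this from res, for instance with base R's tapply(), is:

## Count the unique inner-worker process IDs used by each outer worker
tapply(res$pid2, res$pid, function(x) length(unique(x)))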

A corrected configuration is:

plan(list(
  tweak(batchtools_slurm, workers = 4,
        template = "./slurm-simple.tmpl",
        resources = list(ncpus = 25,
                         memory = "48GB",
                         walltime = "2:00:00",
                         nodes = 1)),
  multicore
))

With this, the PID example gives:

> res
    idx idx2 nworkers     pid    pid2
1     1    1       25 1946061 1946065
2     1    2       25 1946061 1946067
3     1    3       25 1946061 1946069
4     1    4       25 1946061 1946076
5     1    5       25 1946061 1946081
6     1    6       25 1946061 1946083
...
97    4   22       25 1946064 1946195
98    4   23       25 1946064 1946200
99    4   24       25 1946064 1946203
100   4   25       25 1946064 1946204

As you can see, there are 25 unique pid2 values for each pid.

Now, there should be no need for you to specify workers = 4 at all. As I mentioned above, the default is workers = 100.
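
For completeness, here is one way (just a sketch, building on the corrected plan above) to get the load-balanced chunking over all of allvals that you asked for: split the indices into one chunk per outer worker with parallel::splitIndices(), so nothing needs to be hand-tuned when length(allvals) changes. The sketch keeps workers = 4 so that exactly four 25-core jobs are used:

library(doFuture)
library(future.batchtools)

registerDoFuture()

plan(list(
  tweak(batchtools_slurm, workers = 4,
        template = "./slurm-simple.tmpl",
        resources = list(ncpus = 25,
                         memory = "48GB",
                         walltime = "2:00:00",
                         nodes = 1)),
  multicore
))

allvals <- 1:100

## One chunk of indices per outer (Slurm) worker; scales with length(allvals)
chunks <- parallel::splitIndices(length(allvals), nbrOfWorkers())

out <- foreach(chunk = chunks) %dopar% {
  ## Each chunk becomes one Slurm job; within the job, the inner
  ## multicore plan spreads the iterations over the 25 cores
  foreach(idx = allvals[chunk]) %dopar% {
    Sys.sleep(120)
    idx
  }
}

## Flatten the per-chunk results back into a single list
out <- unlist(out, recursive = FALSE)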