HenrikBengtsson / future.batchtools

:rocket: R package future.batchtools: A Future API for Parallel and Distributed Processing using batchtools
https://future.batchtools.futureverse.org

setting 1 job per worker core after chunking the elements #39

Open yonicd opened 5 years ago

yonicd commented 5 years ago

I am using an SGE template.

Is there a native way in a future plan to allocate the elements of a list to each available core across a preset number of workers (i.e. chunking the list)?

For example, if I have workers with 8 cores each and a vector (x) of length 16, I want future to send each element to its own core on the workers so that they run in parallel (assuming I am sending out jobs from the remote master).

sge <- future::tweak(
  future.batchtools::batchtools_sge,
  template = 'batchtools.sge-mrg.tmpl',
  workers = 2,
  resources = list(slots = 1)
)

future::plan(list(sge))

x <- vector('list', 16)

ret <- furrr::future_map(x, .f = foo)

batchtools.sge-mrg.tmpl:

#!/bin/bash

## The name of the job, can be anything, simply used when displaying the list of running jobs
#$ -N <%= job.name %>

## Combining output/error messages into one file
#$ -j y

## Giving the name of the output log file
#$ -o <%= log.file %>

## One needs to tell the queue system to use the current directory as the working directory
## Or else the script may fail as it will execute in your top level home directory /home/username
#$ -cwd

## Use environment variables
#$ -V

## Use correct queue
##$ -q all.q

#$ -pe smp <%= resources[["slots"]] %>

## Export value of DEBUGME environment var to slave
export DEBUGME=<%= Sys.getenv("DEBUGME") %>

<%= sprintf("export OMP_NUM_THREADS=%i", resources$omp.threads) -%>
<%= sprintf("export OPENBLAS_NUM_THREADS=%i", resources$blas.threads) -%>
<%= sprintf("export MKL_NUM_THREADS=%i", resources$blas.threads) -%>

Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0

cc @wlandau

yonicd commented 5 years ago

is this related to this issue? https://github.com/HenrikBengtsson/future.batchtools/issues/18

HenrikBengtsson commented 5 years ago

Not 100% sure I understand what you're aiming for; it could be interpreted in a few different ways. Forgetting about your job scheduler for a while, assume you have two compute nodes n1 and n2, do you want n1 to process 8 of the elements and n2 the remaining 8? Something like:

workers <- rep(c("n1", "n2"), each = 8L)
plan(cluster, workers = workers)

x <- vector("list", length = 16L)
res <- future.apply::future_lapply(x, FUN = foo, future.chunk.size = 1L)

That will create 16 futures, one per element in x, and evaluate them on 8 workers per machine (two compute nodes).

... or are you looking for nested parallelization?

wlandau commented 5 years ago

Another thing: if you need 8 cores per node on SGE, I recommend resources = list(slots = 8).

yonicd commented 5 years ago

@wlandau that doesn't give me the right allocation. If I set slots to 8, then each element uses 8 cores. I wanted to load balance all elements across all the cores available.

With this setup I get the right allocation using internal parallelization:

sge <- future::tweak(
  future.batchtools::batchtools_sge,
  label = 'test2',
  template = 'batchtools.sge-mrg1.tmpl',
  workers = rep(c("n1", "n2"), each = 8L),
  resources = list(slots = 2)
)

future::plan(list(sge,future::multiprocess))

x <- furrr::future_map(1:16,.f = function(x){
  system.time({furrr::future_map(1:8,.f=function(y) Sys.sleep(10))})
})

qstat -f
queuename                      qtype resv/used/tot. load_avg arch          states
---------------------------------------------------------------------------------
all.q@ip-....us-west- BIP   0/8/8          0.35     lx-amd64      
    737 0.55500 test2      yonis        r     02/27/2019 14:53:44     2        
    739 0.55500 test2      yonis        r     02/27/2019 14:53:44     2        
    741 0.55500 test2      yonis        r     02/27/2019 14:53:44     2        
    743 0.55500 test2      yonis        r     02/27/2019 14:53:44     2        
---------------------------------------------------------------------------------
all.q@ip-....us-west-2. BIP   0/8/8          0.30     lx-amd64      
    736 0.55500 test2      yonis        r     02/27/2019 14:53:44     2        
    738 0.55500 test2      yonis        r     02/27/2019 14:53:44     2        
    740 0.55500 test2      yonis        r     02/27/2019 14:53:44     2        
    742 0.55500 test2      yonis        r     02/27/2019 14:53:44     2        

############################################################################
 - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS - PENDING JOBS
############################################################################
    744 0.55500 test2      yonis        qw    02/27/2019 14:53:40     2        
    745 0.55500 test2      yonis        qw    02/27/2019 14:53:41     2        
    746 0.55500 test2      yonis        qw    02/27/2019 14:53:42     2        
    747 0.55500 test2      yonis        qw    02/27/2019 14:53:43     2        
    748 0.00000 test2      yonis        qw    02/27/2019 14:53:44     2        
    749 0.00000 test2      yonis        qw    02/27/2019 14:53:46     2

> x
[[1]]
   user  system elapsed 
  0.113   0.017  40.185 

[[2]]
   user  system elapsed 
  0.089   0.010  40.149 

[[3]]
   user  system elapsed 
  0.105   0.016  40.179 

[[4]]
   user  system elapsed 
  0.082   0.016  40.157 

[[5]]
   user  system elapsed 
  0.088   0.016  40.160 

[[6]]
   user  system elapsed 
  0.076   0.013  40.155 

[[7]]
   user  system elapsed 
  0.080   0.029  40.165 

[[8]]
   user  system elapsed 
  0.077   0.013  40.156 

[[9]]
   user  system elapsed 
  0.112   0.016  40.184 

[[10]]
   user  system elapsed 
  0.080   0.011  40.148 

[[11]]
   user  system elapsed 
  0.087   0.014  40.158 

[[12]]
   user  system elapsed 
  0.102   0.013  40.186 

[[13]]
   user  system elapsed 
  0.076   0.026  40.158 

[[14]]
   user  system elapsed 
  0.077   0.013  40.154 

[[15]]
   user  system elapsed 
  0.095   0.018  40.169 

[[16]]
   user  system elapsed 
  0.104   0.013  40.179

HenrikBengtsson commented 5 years ago

It's still not clear to me, but have a look at future.apply::future_lapply() and the argument future.scheduling or, alternatively, future.chunk.size, which allows you to control the number of elements per chunk (= elements per future).

yonicd commented 5 years ago

i added that argument and got a similar result. thanks for the help!

HenrikBengtsson commented 5 years ago

"i added that argument ..."

to future_lapply()? ... because future_map() doesn't take "that argument"(?).

Also, https://github.com/cloudyr/googleComputeEngineR/issues/129#issuecomment-467897422 might help clarify what can be done with nested levels of workers.

yonicd commented 5 years ago

x <- furrr::future_map(1:16, .f = function(x) {
  system.time({furrr::future_map(1:8, .f = function(y) Sys.sleep(10))})
}, .options = furrr::future_options(scheduling = 1))

HenrikBengtsson commented 5 years ago

Note that scheduling = 1 is the default, i.e. you haven't changed anything: all load balancing is done at the first layer. Try with future_lapply() and the future.chunk.size option, which might be easier to grasp. Once you have confirmed that you get what you want there, you can translate it one-to-one to future.scheduling (see the help page). Then go back to furrr.
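
A minimal sketch of that translation, assuming the future and future.apply packages are installed. A local multisession backend with two workers stands in for the SGE jobs, and slow_square() is a hypothetical placeholder for the real work; neither is part of the original setup. With 16 elements and 2 workers, future.chunk.size = 8 and future.scheduling = 1 should both give one chunk per worker:

```r
library(future)
library(future.apply)

# Assumption: two local background R sessions stand in for two SGE jobs.
plan(multisession, workers = 2)

x <- 1:16

# Hypothetical stand-in for the real per-element work.
slow_square <- function(i) {
  Sys.sleep(0.1)
  i^2
}

# Explicit chunk size: 16 elements / 8 per chunk = 2 futures.
res_chunk <- future_lapply(x, FUN = slow_square, future.chunk.size = 8L)

# The same split expressed as "one future per worker" (the default).
res_sched <- future_lapply(x, FUN = slow_square, future.scheduling = 1.0)

# One future per element, written both ways.
res_each_chunk <- future_lapply(x, FUN = slow_square, future.chunk.size = 1L)
res_each_sched <- future_lapply(x, FUN = slow_square, future.scheduling = Inf)

plan(sequential)  # shut down the background workers
```

The returned values are the same in every case; only the number of futures, and hence the number of jobs a scheduler backend would submit, changes.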

yonicd commented 5 years ago

is it possible to allocate slots on a heterogeneous cluster?

for example if i have 2 workers one with 4 cores and another with 8, is there a way to set something along the lines of

workers=rep(c('n1','n2'),c(4,8))

resources(slots=c(2,4))

This would allow me to send jobs to workers while controlling what resources are set for nested parallelization

yonicd commented 5 years ago

I tried using future_lapply and the analogous furrr::future_map. I think I now understand better how to control the chunk sizes.

I am confused, though, by the solution you gave in the link above, which nests two future_lapply calls so that jobs are load balanced across multiple workers.


sge <- future::tweak(
  future.batchtools::batchtools_sge,
  label = 'pred',
  template = 'batchtools.sge-mrg.tmpl',
  workers = rep(sprintf('n%s',seq_len(10)), each = 8L)
)

future::plan(list(sge))

future_sim <- future_lapply(1:10000, FUN = function(x) {
  future_lapply(x, FUN = slow_func)
}, future.chunk.size = 1)

If I do this, then SGE fills up all the cores properly and has jobs waiting in the queue.

But if I do

future_sim <- future_lapply(1:10000, FUN = slow_func, future.chunk.size = 1)

the jobs seem to be throttled and are not all submitted to the queue at once.

What is happening in the nested version?

HenrikBengtsson commented 5 years ago

For clarity, how familiar are you with HPC scheduling outside of R? Knowing that might help me address your questions. The reason I'm asking is that you very rarely want to specify which compute nodes your jobs should run on, although it could be that this is the model used on your cluster. It looks like you are attempting to do exactly that with workers = rep(sprintf('n%s',seq_len(10)), each = 8L), but that is not how the workers argument is used. Note that my comment in https://github.com/HenrikBengtsson/future.batchtools/issues/39#issuecomment-467870757 was meant to clarify your objectives, and it explicitly skipped SGE until the problem was understood.

I'd argue that the only reason to specify the argument workers for batchtools_sge() is to control how many jobs run concurrently, i.e. to specify an integer. The default is workers = +Inf, which makes batchtools_sge think there's an infinite number of workers. This will cause nbrOfWorkers() to return +Inf. nbrOfWorkers() is important because it affects how future_lapply(X, ...) and friends chunk up the elements. If nbrOfWorkers() == 1, then all elements in X will be processed by a single job, i.e. lapply(X, ...). If nbrOfWorkers() == 2, then there will be two chunks, effectively doing lapply(X[1:(n/2)], ...) and lapply(X[((n/2)+1):n], ...), and so on. With nbrOfWorkers() == +Inf, there will be n = length(X) chunks, each processing a single element. So, by setting workers = 2 for batchtools_sge as you did in your original comment, you'll get two chunks, i.e. two jobs will be used to process your data.
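
The chunking rule described above can be illustrated with a small base-R sketch. The helper chunk_indices() is hypothetical and only approximates the behavior with parallel::splitIndices(); the actual logic inside future.apply may differ in detail:

```r
# Hypothetical helper (not part of future or future.apply): split the
# indices of X into one chunk per worker, never more chunks than elements.
chunk_indices <- function(n_elements, n_workers) {
  n_chunks <- min(n_elements, n_workers)
  parallel::splitIndices(n_elements, n_chunks)
}

chunks_1   <- chunk_indices(16L, 1)    # one job processes all 16 elements
chunks_2   <- chunk_indices(16L, 2)    # two jobs with 8 elements each
chunks_inf <- chunk_indices(16L, Inf)  # 16 jobs with one element each

# Number of elements handled by each job when workers = 2.
lengths(chunks_2)
```

Each element of the returned list holds the indices of X that one job would process, so the length of the list corresponds to the number of jobs submitted.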

So, I still don't understand what you want to achieve. If you could explain how many jobs you want to use to process your data, that would help. If your objective is to use exactly 8 cores per job, then you want to split up your X into length(X) / 8 jobs. But it's a bit unclear what you want because you keep changing your questions (e.g. https://github.com/HenrikBengtsson/future.batchtools/issues/39#issuecomment-467909711) without following up on my request to clarify, which makes it hard to help you. It could be that we're talking past each other. Please try to spell out, with a single example, how and where you want your X to be processed, and I'll try to explain how to do that, if it is possible.

yonicd commented 5 years ago

sorry for the confusion.

I am not well versed in HPC terminology. I am working with an elastic cluster (from my understanding, workers spin up as needed and scale back down when not needed).

My use of "workers" came from patching together answers from different issues while trying to understand how to tweak a template. Your explanation above cleared up a lot of my confusion about how jobs are allocated.

Thank you for your patience and the thorough explanations!

HenrikBengtsson commented 5 years ago

"I am not well versed in HPC terminology. I am working with an elastic cluster (from my understanding workers spin up as needed and scale back down when not)."

Oh... I've never worked with "elastic clusters" - maybe they and/or your needs require additional features in the future framework and/or the batchtools package. If you learn something else or manage to narrow down exactly what you need, please follow up.