HenrikBengtsson / parallelly

R package: parallelly - Enhancing the 'parallel' Package
https://parallelly.futureverse.org

HELP WANTED: Agility of availableCores() #17

Open HenrikBengtsson opened 9 years ago

HenrikBengtsson commented 9 years ago

parallelly::availableCores() returns the number of cores available for multicore processing. R itself provides parallel::detectCores() for this. There is also the mc.cores option (defaulting to environment variable MC_CORES), which is set when parallel is loaded. Beyond this, various systems/setups set specific environment variables to reflect the number of available/allocated cores. For instance, the resource manager PBS sets environment variable PBS_NUM_PPN on the compute node, specifying the number of allotted cores.

Currently, availableCores() defaults to returning the first valid value of, in order:

  1. PBS_NUM_PPN
  2. mc.cores (and MC_CORES)
  3. parallel::detectCores()
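
For illustration, here is a minimal sketch of that lookup order. This is not the actual implementation, and the helper name is made up:

```r
## Minimal sketch of the lookup order above; 'availableCoresSketch'
## is a hypothetical name, not part of the package
availableCoresSketch <- function() {
  ## 1. PBS: per-node core allotment set by the resource manager
  n <- Sys.getenv("PBS_NUM_PPN")
  if (nzchar(n)) return(as.integer(n))

  ## 2. R option 'mc.cores' (initialized from env var MC_CORES
  ##    when the 'parallel' package is loaded)
  n <- getOption("mc.cores")
  if (!is.null(n)) return(as.integer(n))

  ## 3. Fall back to what the hardware reports
  parallel::detectCores()
}
```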

I would like to add support for more resource/workload managers and other distributed processing environments, e.g. Slurm and Sun Grid Engine (SGE).

What else am I missing?

I'd appreciate any feedback on what environment variables or commands are available on a compute node for querying the number of allotted cores, if any. Please try to provide links to documentation if you can.

References

  * Slurm, "Rosetta Stone of Workload Managers": http://slurm.schedmd.com/rosetta.pdf

HenrikBengtsson commented 8 years ago

For Slurm, I believe we should use SLURM_CPUS_PER_TASK. This is from running a few sample jobs:

```sh
sbatch --cpus-per-task=1 hello.sh
sbatch --cpus-per-task=2 hello.sh
```
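
A minimal sketch of honoring it, assuming SLURM_CPUS_PER_TASK holds a plain integer when set:

```r
## Sketch: honor Slurm's per-task CPU allotment; NA when not on Slurm
n <- Sys.getenv("SLURM_CPUS_PER_TASK")
n <- if (nzchar(n)) as.integer(n) else NA_integer_
```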

HenrikBengtsson commented 8 years ago

I just found http://slurm.schedmd.com/rosetta.pdf (added reference to top post).

HenrikBengtsson commented 8 years ago

@veseshan, you've mentioned that you work with a Sun Grid Engine (SGE) cluster. Do you know which environment variable SGE sets in the compute-node process to indicate how many tasks/cores/processes it allotted to the job? availableCores() tries to be agile to these types of cluster settings, but I haven't figured this out for SGE. For full details, see this issue on GitHub.

HenrikBengtsson commented 7 years ago

On TORQUE / PBS, there's an alternative to

```sh
qsub -l nodes=1:ppn=32 foo.pbs
```

which sets PBS_NUM_NODES=1 and PBS_NUM_PPN=32 (and PBS_NP=32), for requesting a machine for 32 parallel tasks. One can also do:

```sh
qsub -l procs=32 foo.pbs
```

which sets PBS_NP=32 (and PBS_NUM_NODES=1 and PBS_NUM_PPN=1).

In other words, availableCores() should be updated to (also) look at PBS_NP.

HenrikBengtsson commented 7 years ago

Actually, it could be that my TORQUE / Moab test system is faulty when it comes to interpreting -l procs=<x>. I think it should always be that PBS_NP = PBS_NUM_NODES * PBS_NUM_PPN, which is true when using -l nodes=<y>:ppn=<z>. However, when using -l procs=<x>, only PBS_NP is set, while PBS_NUM_NODES and PBS_NUM_PPN are both 1 (one). Looking at the allocated set of nodes (e.g. via qstat), it looks like we should only trust PBS_NUM_NODES and PBS_NUM_PPN.

My own tests give:

$ echo "export | grep PBS_" | qsub -l nodes=1:ppn=4 -
=> 700801: PBS_NP=4, PBS_NUM_NODES=1, PBS_NUM_PPN=4

$ echo "export | grep PBS_" | qsub -l nodes=2:ppn=4 -
=> 700802: PBS_NP=8, PBS_NUM_NODES=2, PBS_NUM_PPN=4

$ echo "export | grep PBS_" | qsub -l procs=8 -
=> 700803: PBS_NP=8, PBS_NUM_NODES=1, PBS_NUM_PPN=1

$ echo "export | grep PBS_" | qsub -l nodes=2 -l procs=3 -
=> 700804: PBS_NP=3, PBS_NUM_NODES=2, PBS_NUM_PPN=1

$ echo "export | grep PBS_" | qsub -l procs=3 -l nodes=2 -
=> 700806: PBS_NP=3, PBS_NUM_NODES=2, PBS_NUM_PPN=1

BTW, qstat -n -1 shows this too, i.e. NDS = PBS_NUM_NODES and TSK = NDS * PBS_NUM_PPN:

```sh
$ qstat -n -1 -u $USER

Job ID  Job name         PID     NDS    TSK    RAM      Time S     Since   Nodes/cores
------- ---------------- ------ ----- ------ ------ --------- - ---------   -----------
700801  STDIN               --      1      4    --   99:23:59 Q       --     --
700802  STDIN               --      2      8    --   99:23:59 Q       --     --
700803  STDIN               --      1      1    --   99:23:59 Q       --     --
700804  STDIN               --      2      2    --   99:23:59 Q       --     --
700806  STDIN               --      2      2    --   99:23:59 Q       --     --
```

See also torqueusers thread 'only one processor is used when using qsub -l procs flag', Jan 2012, http://www.supercluster.org/pipermail/torqueusers/2012-January/013959.html. In that thread it's suggested that it could be a configuration issue in Maui (open-source replacement for Moab) or a bug in Maui.

From this I conclude that it's best to ignore PBS_NP and always rely on PBS_NUM_NODES and PBS_NUM_PPN. For availableCores() we're only interested in the latter.
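
In sketch form, under the conclusion above:

```r
## Sketch: trust PBS_NUM_PPN and ignore PBS_NP, since '-l procs=<x>'
## setups may report PBS_NP inconsistently (see tests above)
ppn <- Sys.getenv("PBS_NUM_PPN")
n <- if (nzchar(ppn)) as.integer(ppn) else NA_integer_
```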

Phhere commented 5 years ago

I think new versions of PBSPro are not supported correctly. It now uses OMP_NUM_THREADS. You could also fall back to nproc on Unix to respect cgroups.

HenrikBengtsson commented 5 years ago

Thanks @Phhere. I don't have access to PBSPro myself. Do you have access to a PBSPro environment? If so, would you mind submitting a simple job that calls env and then reporting back here with what look like PBS-related environment variables? Don't cut'n'paste secrets! I'd expect there to be some PBS_* env vars, and I'd be surprised if OMP_NUM_THREADS is the only thing set to control the number of parallel tasks.

Phhere commented 5 years ago

Hello, no problem.

```sh
NCPUS=10
OMP_NUM_THREADS=10
PBS_ACCOUNT=BenchMarking
PBS_ENVIRONMENT=PBS_BATCH
PBS_JOBID=3947145.**
PBS_JOBNAME=multi_core.sh
PBS_NODEFILE=/var/spool/pbs/aux/3947145.*****
PBS_NODENUM=0
PBS_O_LANG=de_DE.UTF-8
PBS_O_QUEUE=default
PBS_O_SHELL=/bin/bash
PBS_O_SYSTEM=Linux
PBS_O_WORKDIR=/home/phreh100/test
PBS_QUEUE=short
PBS_TASKNUM=1
```

This was a job with one chunk of 10 cores, so you could use $NCPUS or $OMP_NUM_THREADS.

Do you support submitting jobs to multiple servers? PBSPro allows requesting multiple servers / chunks within one job, and then you can use ssh / pdsh or something else to distribute your job.

HenrikBengtsson commented 5 years ago

Thanks.

Yes, availableWorkers() should parse the PBS_NODEFILE file, and using plan(cluster) will then default to that set of workers.
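
For the curious, a minimal sketch of that parsing; the helper name is made up, and the real logic lives in availableWorkers():

```r
## Hypothetical sketch: PBS_NODEFILE points to a text file listing
## one hostname per allotted core, i.e. one entry per worker
readPBSNodefileSketch <- function() {
  pathname <- Sys.getenv("PBS_NODEFILE")
  if (!nzchar(pathname) || !utils::file_test("-f", pathname)) {
    return(character(0L))
  }
  readLines(pathname, warn = FALSE)
}

## Example usage: per-host core counts
## table(readPBSNodefileSketch())
```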

HenrikBengtsson commented 5 years ago

@Phhere, I've updated the develop branch to have availableCores() recognize NCPUS as well. Please give it a try:

```r
remotes::install_github("HenrikBengtsson/future@develop")
```

and see if future::availableCores() returns what you expect given various job submission setups.

Also, if you could play with various multinode requests (e.g. -l nodes=3:ppn=2) and see what NCPUS is set to on the master process, that would be great. On my Torque/PBS system, I get:

echo 'echo "NCPUS=$NCPUS"' | qsub -l nodes=1:ppn=2  ## => NCPUS=2
echo 'echo "NCPUS=$NCPUS"' | qsub -l nodes=2:ppn=2  ## => NCPUS=2
echo 'echo "NCPUS=$NCPUS"' | qsub -l nodes=3:ppn=1  ## => NCPUS=1
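
Assuming this generalizes, i.e. that NCPUS reflects the per-node core count just like PBS_NUM_PPN, the added lookup could be sketched as:

```r
## Sketch (assumption): NCPUS, like PBS_NUM_PPN, gives the number of
## cores allotted on the current node, not the total across nodes
n <- Sys.getenv("NCPUS")
n <- if (nzchar(n)) as.integer(n) else NA_integer_
```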

leitec commented 5 years ago

On Linux, you may want to consult a process's CPU affinity mask. This would cover cases where a process can only use a subset of the system's available cores, with e.g. taskset or cgroup's cpusets, like Docker's --cpuset-cpus option.

The mask is available from /proc. I attempted a quick-and-dirty R implementation:

```r
count_allowed_cpus <- function() {
        ## Read the affinity mask for the current process
        self_stat <- readLines("/proc/self/status")
        mask_line <- grep("^Cpus_allowed:", self_stat, value=TRUE)
        mask_raw <- gsub("^Cpus_allowed:[[:space:]]+([0-9a-f,]+)$", "\\1", mask_line)

        ## The mask may be split with commas
        mask_clean <- gsub(',', '', mask_raw)

        ## The mask contains a binary 1 for each CPU we're
        ## allowed to use, so we can get a total CPU count by
        ## summing the binary digits
        mask_int <- strtoi(mask_clean, base=16)
        sum(as.integer(intToBits(mask_int)))
}
```

On my four-core system, this gets the correct count:

```sh
$ Rscript count_cpus.R
[1] 4
$ taskset -c 0 Rscript count_cpus.R
[1] 1
$ taskset -c 0,2 Rscript count_cpus.R
[1] 2
$ taskset -c 1,2,3 Rscript count_cpus.R
[1] 3
```

On the systems I've been able to test, an unbound process's mask has only as many ones as there are cores on the machine. It will return nonsense on other systems where the default mask is a wordful of ones, e.g. ffffffff. That would overflow the integer type, at the least.

HenrikBengtsson commented 5 years ago

@leitec, thanks again for these pointers. I'll see if I can incorporate them. I might start by adding this as an internal function for people to try out until everything has been figured out. Do you have any official reference/manual where the Cpus_allowed field is described? It would be nice to be able to decide when its value can be used and when it cannot (e.g. ffffffff).

Having said this, this seems like something that parallel::detectCores() should/could support, either by default or via an additional argument, e.g. affinityMask=TRUE. May I recommend that you propose this on the R-devel mailing list? (There may be some old thread on this already, I don't know, but I wouldn't be surprised if someone proposed it and it was then forgotten.)

EDIT: Here's a tweaked version:

```r
#' @importFrom utils file_test
countAllowedCPUs <- function() {
  pathname <- "/proc/self/status"
  if (!file_test("-f", pathname)) return(NA_integer_)

  ## Read affinity mask for the current process
  self_stat <- readLines(pathname, warn = FALSE)

  ## Identify the affinity-mask entry
  pattern <- "^Cpus_allowed:[[:space:]]+([0-9a-f,]+)$"
  mask_line <- grep(pattern, self_stat, value = TRUE)
  if (length(mask_line) == 0L) return(NA_integer_)
  if (length(mask_line) > 1L) {
    warning("Detected more than one 'Cpus_allowed' entry in ", sQuote(pathname), ", but will only use the first one: ", paste(sQuote(mask_line), collapse = "; "))
    mask_line <- mask_line[1L]
  }

  ## Extract the affinity-mask value
  mask_raw <- gsub(pattern, "\\1", mask_line)

  ## The mask may be separated by commas
  mask_clean <- gsub(",", "", mask_raw, fixed = TRUE)

  ## Each available CPU corresponds to a binary '1' in the mask
  mask_int <- strtoi(mask_clean, base = 16L)
  mask_bits <- intToBits(mask_int)

  sum(mask_bits == as.raw(1L))
}
```
leitec commented 5 years ago

Thanks for the tweaked version. My R is quite poor.

I can't find an exact reference on the mask. Perhaps this is a sign that it's not a good idea to use it. I did eventually find a system I have access to that shows all f's in the mask, and it's not clear why that would be. I could of course look at the Linux kernel source, but that seems like overkill.

I'm looking at another approach that would be appropriate either for inclusion in the parallel package, or perhaps as a tiny package of its own. I'll follow up when I have something ready.

HenrikBengtsson commented 5 years ago

Your R code looked just fine to me - it's just me adding a few, sometimes overly conservative, tweaks. It also helps me to work through someone else's code.

Just a wild guess, but it could be that a mask with all f's, i.e. all 1s, is just saying "all cores may be used". Maybe the proper approach would be to do something like:

  1. Expand all cores into bits, e.g. 4 cores => 1111
  2. Apply bitwise-AND of this to the mask, e.g. 1111 & 1010 = 1010
  3. Cores available: 1010 => 2 cores

If the mask is all 1s, that bitwise-AND will do nothing.

R code example with a four (4) core machine:

```r
n <- 4L
mask_all <- rep(1L, times = n)
int_mask_all <- sum(2^(seq_along(mask_all) - 1) * mask_all)
print(int_mask_all)
## [1] 15
```

With a mask where the second core is excluded:

```r
mask <- c(1, 0, 1, 1)
int_mask <- sum(2^(seq_along(mask) - 1) * mask)
print(int_mask)
## [1] 13
int_mask_avail <- bitwAnd(int_mask_all, int_mask)
print(int_mask_avail)
## [1] 13
mask_avail <- intToBits(int_mask_avail)
sum(mask_avail == as.raw(1L))
## [1] 3
```

and with all f's (here a 16-bit mask, i.e. 0xffff), we get:

```r
mask <- rep(1, times = 16)
int_mask <- sum(2^(seq_along(mask) - 1) * mask)
print(int_mask)
## [1] 65535
int_mask_avail <- bitwAnd(int_mask_all, int_mask)
print(int_mask_avail)
## [1] 15
mask_avail <- intToBits(int_mask_avail)
sum(mask_avail == as.raw(1L))
## [1] 4
```

Again, just a wild guess.

leitec commented 5 years ago

Yeah, that's probably the way to do it. I believe your interpretation that "all f's" means any core may be used is correct. On that same system, if I restricted the available cores using taskset or a cpuset, the mask was set as expected. It was only when no affinity was set that it printed a long string of f's.

However, while looking into this, I came across the nproc utility from GNU coreutils. It's powered by the nproc module from Gnulib, https://github.com/coreutils/gnulib/blob/master/lib/nproc.c, which has the nice property of being cross-platform.

I made a trivial R wrapper around the nproc module and it works fine. I just need to clean it up, once I figure out how to do Gnulib's autoconf/automake stuff properly. I'll follow your advice and solicit discussion on R-devel. A cross-platform solution might be more palatable for the developers. If not, I can turn this into a generic package that just does this one thing, and perhaps that could then include the various job schedulers and other systems that don't necessarily use cpusets or set affinity for their processes.
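
For reference, a simpler shell-out sketch (not the Gnulib-based wrapper mentioned above), assuming the coreutils nproc command is on the PATH:

```r
## Sketch: shell out to coreutils' nproc, which honors affinity masks
## and cpusets; returns NA_integer_ if the command is unavailable
nprocSketch <- function() {
  out <- tryCatch(
    suppressWarnings(system2("nproc", stdout = TRUE, stderr = FALSE)),
    error = function(e) character(0L)
  )
  if (length(out) == 0L) return(NA_integer_)
  as.integer(out[1L])
}
```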