mschubert / clustermq

R package to send function calls as jobs on LSF, SGE, Slurm, PBS/Torque, or each via SSH
https://mschubert.github.io/clustermq/
Apache License 2.0

Trouble using clustermq as the backend for BiocParallel #302

Closed: bhayete-empress closed this issue 7 months ago

bhayete-empress commented 1 year ago

To begin, I've read the description here: https://mschubert.github.io/clustermq/articles/userguide.html#troubleshooting and I've cross-posted the issue here: https://support.bioconductor.org/p/9152460/. The crux of the matter is that the example in the clustermq troubleshooting guide works for foreach, but bplapply generates errors along the lines of

(Error #1) could not find function ".bpworker_EXEC"

For reference, here's the example that I tried to reproduce:

clustermq::register_dopar_cmq(n_jobs=2, memory=1024) # this accepts the same arguments as Q
x = foreach(i=1:3) %dopar% sqrt(i) # this will be executed as jobs

> Running sequentially ('LOCAL') ...

As BiocParallel supports foreach too, this means we can run all packages that use BiocParallel on the cluster as well via DoparParam.

library(BiocParallel)
register(DoparParam()) # after register_dopar_cmq(...)
bplapply(1:3, sqrt)

bhayete-empress commented 1 year ago

I thought I'd add a small, complete, reproducible example, along with the template file:


library(foreach)
library(clustermq)
library(doParallel)
library(BiocParallel)

TIMEOUT = 10000
NJOBS = 100
options(
  clustermq.scheduler = "slurm",
  clustermq.template = 'slurmMq.tmpl',
  clustermq.data.warning=5000 #megabytes
)
register_dopar_cmq(n_jobs=NJOBS,
                   fail_on_error=FALSE,
                   verbose=TRUE,
                   log_worker=TRUE,
                   timeout = TIMEOUT, #how long to wait on MQ side
                   pkgs=c('BiocParallel'), 
                   template=list(
                     timeout=TIMEOUT, #how long to wait on SLURM side
                     memory=5000,
                     cores=1, # how many cores to use (to throttle down memory usage)
                     partition = 'compute',
                     r_path = file.path(R.home("bin"), "R")
                   )  
)

print(paste(getDoParWorkers(), "workers", sep = '_'))
p <- DoparParam()
register(p, default=TRUE)
x = foreach(i=1:300) %dopar% sqrt(i)
opts = bpoptions(log=TRUE, packages='BiocParallel', workers = NJOBS, tasks = NJOBS,
                 exportglobals=TRUE, exportvariables=TRUE)
# bpoptions() returns a list of options; pass it via BPOPTIONS rather than calling it for side effects
x2 = bplapply(1:300, sqrt, BPPARAM=p, BPOPTIONS=opts)
slurmMq.tmpl:

#!/bin/bash

#SBATCH --job-name={{ job_name }}
#SBATCH --output={{ log_file | /dev/null }}
#SBATCH --error={{ log_file | /dev/null }}
#SBATCH --mem-per-cpu={{ memory | 7000 }}
# intentionally no default - be cognizant of where you are running!
#SBATCH --partition={{ partition }}
#SBATCH --array=1-{{ n_jobs }}
#SBATCH --cpus-per-task={{ cores | 1 }}
#SBATCH --time={{ timeout }}
##SBATCH --log_file="/path/to.file.%a"

CMQ_AUTH={{ auth }} {{ r_path }} --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

mschubert commented 1 year ago

Yes, that's a bug - I can confirm it on my machine with the CRAN version.

bhayete-empress commented 1 year ago

Is this fixed in the GitHub version, or a problem across the code base?

mschubert commented 11 months ago

The underlying issue is that the following expression is to be evaluated on the workers:

{
    task <- .task_remake(task, const.value)
    if (task$type == "EXEC")
        value <- .bpworker_EXEC(task)
    else value <- NULL
    list(value = value)
}

but foreach::getexports(expr, e=export_env, env=envir) does not correctly identify .task_remake and .bpworker_EXEC as required on the workers (both are in namespace:BiocParallel, the latter unexported).
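To illustrate (a minimal sketch, not from the thread; globalenv() stands in for the actual foreach evaluation environment):

library(foreach)

payload <- quote({
    task <- .task_remake(task, const.value)
    if (task$type == "EXEC")
        value <- .bpworker_EXEC(task)
    else value <- NULL
    list(value = value)
})

export_env <- new.env(parent = emptyenv())
# getexports() copies each symbol it can both spot and resolve from `env`
# into `e`; the BiocParallel internals are not visible from here, so they
# are silently dropped rather than marked for export
foreach::getexports(payload, e = export_env, env = globalenv())
ls(export_env)  # the two BiocParallel internals are absent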

By contrast, future:::getGlobalsAndPackages(expr, envir=envir) does identify both .bpworker_EXEC and the BiocParallel package. Using the globals package, the call should probably be:

globals::globalsOf(expr, envir=envir)

but for now I'm getting:

Error: Identified global objects via static code inspection ({; task <- .task_remake(task, const.value); if (task$type== "EXEC"); value <- .bpworker_EXEC(task); else value <- NULL; list(value = value); }). Failed to locate global object in the relevant environments: ‘task’
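(As an aside, a hedged sketch of the same call: the 'task' error comes from globalsOf()'s default mustExist=TRUE, since task is read on the right-hand side before it is assigned; mustExist=FALSE drops unresolvable symbols instead. Resolving against the BiocParallel namespace below is purely for illustration:)

library(globals)

payload <- quote({
    task <- .task_remake(task, const.value)
    if (task$type == "EXEC")
        value <- .bpworker_EXEC(task)
    else value <- NULL
    list(value = value)
})

# mustExist=FALSE skips globals that cannot be located instead of erroring;
# using the package namespace as envir makes the unexported internals visible
g <- globals::globalsOf(payload, envir = asNamespace("BiocParallel"),
                        mustExist = FALSE)
names(g)                # includes .task_remake and .bpworker_EXEC
globals::packagesOf(g)  # "BiocParallel"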

A possible workaround is:

register(DoparParam())
register_dopar_cmq(n_jobs=2, memory=1024, pkgs="BiocParallel", export=list(
    .bpworker_EXEC=BiocParallel:::.bpworker_EXEC,
    .log_buffer_get=BiocParallel:::.log_buffer_get,
    .log_data=BiocParallel:::.log_data,
    .log_buffer_init=BiocParallel:::.log_buffer_init,
    .VALUE=BiocParallel:::.VALUE
))
bplapply(1:3, sqrt)
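(The export list above presumably mirrors the internals that .bpworker_EXEC and its logging path touch at run time, so it may need adjusting for other BiocParallel versions.)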

Pinging @HenrikBengtsson, what would your current recommendation be to get DoparParam() working with the missing BiocParallel exports? (ideally minimizing the number of dependencies outside of foreach)

bhayete-empress commented 11 months ago

Thank you, this works for the test example; will now try in real code to double-check, but it's looking great!

mschubert commented 7 months ago

This is now fixed on the current develop branch and will be included in the next CRAN release.

Thanks for reporting!