mschubert / clustermq

R package to send function calls as jobs on LSF, SGE, Slurm, PBS/Torque, or each via SSH
https://mschubert.github.io/clustermq/
Apache License 2.0
146 stars 27 forks source link

Work hangs: mclapply() parallelism within clustermq jobs #103

Closed wlandau closed 6 years ago

wlandau commented 6 years ago

I suspect this is related to #99, but it is an important use case, so I thought I should post something for the record. Feel free to close if you think R-devel already fixed it.

The following little drake pipeline sends jobs to an SGE cluster, and each job uses mclapply() to parallelize its own work. It hangs when mc.cores is greater than 1, and it completes normally (and very quickly) when mc.cores equals 1. I am using https://github.com/ropensci/drake/commit/c6395ee129112d1bdc71b45d1362d4eb5d13ca86, and https://github.com/mschubert/clustermq/commit/ecfdb9da9870b6434f6bf689da6a3cbb94f38a2f. Other session info is here.

library(drake)
library(magrittr)
options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)
plan <- drake_plan(
  x = parallel::mclapply(1:4 + i__, sqrt, mc.cores = 4),
  y = mean(unlist(x_i__))
) %>%
  evaluate_plan(wildcard = "i__", values = 1:8)
clean(destroy = TRUE)
make(plan, verbose = 4, parallelism = "clustermq", jobs = 8)

The template file makes sure each job gets 4 cores.

# From https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}               # job name
#$ -t 1-{{ n_jobs }}               # submit jobs as array
#$ -j y                            # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }}   # output file
#$ -cwd                            # use pwd as work dir
#$ -V                              # use environment variable
#$ -pe smp 4                       # request 1 core per job
module load R
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

We get pretty far along in the workflow, but it hangs before starting x_8.

cache /lrlhps/users/c240390/reprex-cmq/.drake
analyze environment
analyze 6 imports: ld, wd, td, spell_check_ignore, plan, dl
analyze 16 targets: x_1, x_2, x_3, x_4, x_5, x_6, x_7, x_8, y_1, y_2, y_3, y_...
construct graph edges
construct vertex attributes
construct graph
import parallel::mclapply
import sqrt
import mean
import unlist
Submitting 8 worker jobs (ID: 7227) ...
target x_1
target x_2
target x_3
target x_4
target x_5
target x_6
target x_7
target x_8
target y_1
load 1 item: x_1
target y_2
unload 1 item: x_1
load 1 item: x_2
target y_4
unload 1 item: x_2
load 1 item: x_4
target y_3
unload 1 item: x_4
load 1 item: x_3
target y_6
unload 1 item: x_3
load 1 item: x_6
target y_7
unload 1 item: x_6
load 1 item: x_7

qstat shows that some, but not all, of the workers are still running.

job-ID     prior   name       user         state submit/start at     queue                          jclass                         slots ja-task-ID
------------------------------------------------------------------------------------------------------------------------------------------------
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:27:52 CENSORED                                         4 3
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:27:54 CENSORED                                         4 4
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:27:54 CENSORED                                         4 5
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:28:03 CENSORED                                         4 6
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:28:06 CENSORED                                         4 7
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:28:10 CENSORED                                         4 8
mschubert commented 6 years ago

Can you try this with clustermq only and post the worker log file?

Q(..., template=list(log_file="..."))
wlandau commented 6 years ago
options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)
library(clustermq)
f <- function(i){
  parallel::mclapply(1:4 + i, sqrt, mc.cores = 4)
}
Q(f, 1:8, n_jobs = 8, template = list(log_file = "log.txt"))
#> Submitting 8 worker jobs (ID: 6424) ...
#> Running 8 calculations (1 calls/chunk) ...
#> [===============================>--------------------]  62% (4/4 wrk) eta:  6s

At this point, the work hung, so I sent SIGINT with CTRL-C.

^CError in rz mq::poll.socket(list(private$socket), list("read"), timeout = msec) :
  The operation was interrupted by delivery of a signal before any events were available.
Calls: Q ... master -> <Anonymous> -> <Anonymous> -> <Anonymous>
^CExecution halted

Log file:

> clustermq:::worker("tcp://CLUSTER-LOGIN-NODE:6424")
Master: tcp://CLUSTER-LOGIN-NODE:6424
WORKER_UP to: tcp://CLUSTER-LOGIN-NODE:6424
> DO_SETUP (0.000s wait)
token from msg: ubust
> WORKER_STOP (0.000s wait)
shutting down worker

Total: 0 in 0.00s [user], 0.00s [system], 0.01s [elapsed]
mschubert commented 6 years ago

Thank you, I could reproduce this now: it was caused by poll.socket() returning NULL on non-critical interrupt, which was not handled properly by the worker (from mschubert:rzmq/signal)

wlandau commented 6 years ago

Fixed on my end. Thanks very much.