mschubert / clustermq

R package to send function calls as jobs on LSF, SGE, Slurm, PBS/Torque, or each via SSH
https://mschubert.github.io/clustermq/
Apache License 2.0

ISSUE: foreach with doParallel and clustermq backends returns different results #126

Closed: Zhuk66 closed this issue 5 years ago

Zhuk66 commented 5 years ago

I am trying to implement nested loops using clustermq and doParallel, and I noticed inconsistent behavior. For example:

setwd("~/zmq")
library(clustermq)
library(itertools)
library(foreach)
library(doParallel)
library(purrr)

myFunc <- function(x, ...) {
  list(return=1:x, error=rep(NA, x))
}

testCMQ <- function(x, parallel = FALSE, cmq = FALSE, ...) {

  args <- list(...)

  exports <- list(isplitVector=isplitVector, `%dopar%`=`%dopar%`, parallel=parallel, 
                  foreach=foreach, registerDoParallel=registerDoParallel, 
                  myFunc=myFunc, safely=safely, args=args)

  if(parallel) {
    if(cmq) {
      register_dopar_cmq(n_jobs=8, memory=4096, export=exports)
    } else {
      registerDoParallel(cores=4)
    }
  } else {
    registerDoSEQ()
  }

  # Outer loop: split 1:x into chunks of 4, run each chunk on the registered backend
  rv <- foreach(chunkIt = isplitVector(1:x, chunkSize=4),
                .combine  = append,
                .inorder  = TRUE,
                .verbose  = FALSE) %dopar% {

    # Inner loop: one element at a time, always on doParallel
    registerDoParallel(cores=4)
    foreach(v = isplitVector(chunkIt, chunkSize=1),
            .inorder  = TRUE,
            .verbose  = FALSE) %dopar% {

      args <- c(list(x=v), args)
      do.call(safely(myFunc), args)
    }
  } %>% transpose
  return(rv)
}
  1. Parallel execution with clustermq:
> res <- testCMQ(10, parallel=T, cmq=T)
Submitting 3 worker jobs (ID: 7288) ...
Running 3 calculations (1 calls/chunk) ...
Master: [1.5s 2.6% CPU]; Worker: [avg 98.8% CPU, max 242.5 Mb]
Warning messages:
1: Element 2 has length 3 not 4 
2: Element 3 has length 3 not 4 
> length(res)
[1] 4 
  2. Parallel execution with doParallel:
> res <- testCMQ(10, parallel=T, cmq=F)
> length(res)
[1] 2
  3. Sequential execution:
> res <- testCMQ(10, parallel=F)
> length(res)
[1] 2
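The length difference can be reproduced with purrr alone, without any foreach backend. This sketch assumes (the thread does not show it directly) that the clustermq backend handed back the three per-chunk lists without applying .combine = append; the names f, chunks, combined and uncombined are illustrative only. It mirrors the length of 4 from the clustermq run, though not the exact warnings, since the shape of the actual returned chunks isn't shown.

```r
library(purrr)

# Hypothetical stand-in for the worker results: one safely() result per
# element, grouped in chunks of 4, 4 and 2 like isplitVector(1:10, chunkSize=4)
f <- safely(function(x) 1:x)
chunks <- lapply(list(1:4, 5:8, 9:10), function(idx) lapply(idx, f))

# What .combine = append should produce: one flat list of 10 results,
# so transpose() yields list(result = ..., error = ...)
combined <- Reduce(append, chunks)
length(combined)             # 10
length(transpose(combined))  # 2

# If the chunks come back uncombined, transpose() takes the first chunk
# (4 elements) as its template and warns about the shorter third chunk
uncombined <- chunks
length(suppressWarnings(transpose(uncombined)))  # 4
```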
mschubert commented 5 years ago

Can you simplify your example a bit?

For instance, as far as I understand, the inner foreach should not make a difference here.

Zhuk66 commented 5 years ago

The inner loop can be run with either registerDoParallel or registerDoSEQ; that does not change the result. However, the result does differ depending on whether registerDoParallel (or registerDoSEQ) or register_dopar_cmq is registered before the outer foreach.

Zhuk66 commented 5 years ago

Here is a hack that solved this issue for me. In master.r I added a function:

flatten <- function(data, rettype) {
  # Collect the elements of every per-call result into a single list
  res <- list()
  for (i in seq_along(data)) {
    for (j in seq_along(data[[i]])) {
      res <- c(res, data[[i]][j])
    }
  }
  # Coerce to the requested return type, e.g. as.list() or as.numeric()
  type <- paste0('as.', rettype)
  res <- do.call(type, list(res))
  return(res)
}

... and I call it right after job_result has been built (line 87 of master.r):

        } else # or else shut it down
            qsys$send_shutdown_worker()
    }

    job_result <- flatten(job_result, rettype)

    if (qsys$reusable || qsys$cleanup())
        on.exit(NULL)

    summarize_result(job_result, n_errors, n_warnings, cond_msgs,
                     min(submit_index)-1, fail_on_error)
}
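The nested loops in flatten() remove one level of nesting from job_result. A more compact equivalent, sketched under the assumption that job_result is a list of per-call lists or vectors, uses base R's unlist() with recursive = FALSE; flatten_one_level is a hypothetical name, not part of clustermq.

```r
# Hypothetical helper: drop one level of nesting, then coerce to the
# requested type (e.g. "list", "numeric"), as the flatten() hack does
flatten_one_level <- function(data, rettype) {
  res <- unlist(data, recursive = FALSE)
  do.call(paste0("as.", rettype), list(res))
}

flatten_one_level(list(c(1, 2), 3), "numeric")            # c(1, 2, 3)
flatten_one_level(list(list(a = 1), list(b = 2)), "list") # list(a = 1, b = 2)
```

Because either version flattens every job_result, it would presumably also flatten calls whose results are meant to stay nested lists, so it works as a local workaround rather than a replacement for applying .combine.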

mschubert commented 5 years ago

Minimal example:

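library(foreach)
library(clustermq)

register_dopar_cmq(n_jobs=0)  # n_jobs=0: run the calls in the local R session
res = foreach(i=1:3, .combine=c) %dopar% sqrt(i)  # clustermq backend
cmp = foreach(i=1:3, .combine=c) %do% sqrt(i)     # sequential reference
# res should match cmp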

Fixed in 1f045e2.
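For reference, foreach's .combine argument names a function used to merge the per-iteration results. Below is a simplified, sequential model of that step using base R's Reduce(); it ignores options such as .multicombine and .maxcombine, and it is not how foreach or clustermq implement combining internally.

```r
# Per-iteration results, as a backend might collect them before combining
results <- lapply(1:3, sqrt)

# Applying c() pairwise: the simplified model of .combine = c
combined <- Reduce(c, results)

identical(combined, sqrt(1:3))  # TRUE: a plain numeric vector
is.list(results)                # TRUE: uncombined, still a list of 3
```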