HenrikBengtsson / doFuture

R package: doFuture - Use Foreach to Parallelize via Future Framework
https://doFuture.futureverse.org

Very slow multisession #6

Closed: instantkaffee closed this issue 8 years ago

instantkaffee commented 8 years ago

I do not see any speed improvement when using doFuture. The code below is much slower when using plan(multisession). Am I overlooking something?

library("doFuture")
registerDoFuture()
plan(multisession, workers=4)

mu= 1.0
sigma= 2.0

foo_par= function() {foreach(i = 1:1000) %dopar% { rnorm(i, mean = mu, sd = sigma) }}
foo_normal= function() {lapply(1:1000, function(i) { rnorm(i, mean = mu, sd = sigma) })}

system.time({foo_par()})
system.time({foo_normal()})
> system.time({foo_par()})
   user  system elapsed 
   0.82    0.04   16.11 
> system.time({foo_normal()})
   user  system elapsed 
   0.04    0.00    0.04 
> sessionInfo()
R version 3.3.1 (2016-06-21)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows >= 8 x64 (build 9200)

HenrikBengtsson commented 8 years ago

My reply turned out to be much longer than anticipated, but you're probably not the only one wondering, so I figured it's worth being as explicit as possible.

There are a few things in play here that explain this, but the gist is:

The overhead of setting up an individual future, internal validation, and sending data back and forth between the main R session and the individual background worker is significantly larger than the time it takes to evaluate each R expression.

Put another way, there will always be a threshold below which the overhead of parallelizing is greater than the time saved. Where that threshold lies depends on the type of parallelization and on how complex the computations are. Also, there are cases where one must parallelize by distributing computations to multiple computers due to memory constraints; in such cases the tradeoff in speed can still be worth it. This is not specific to futures; it applies to all types of parallelization.

With 1000 iterations and 4 background R worker sessions, a rough estimate is that it's worth parallelizing in this specific case if each iteration would take > 30ms to evaluate, e.g.

> system.time(for (i in 1:1000) { Sys.sleep(0.03); rnorm(i, mean = mu, sd = sigma) })
   user  system elapsed 
  0.204   0.052  30.499

> library("foreach")
> system.time(foreach(i = 1:1000) %do% { Sys.sleep(0.03); rnorm(i, mean = mu, sd = sigma) })
   user  system elapsed 
  1.520   0.044  31.875 

> library("doFuture")
> registerDoFuture()
> plan(multisession, workers = 4L)
> system.time(foreach(i = 1:1000) %dopar% { Sys.sleep(0.03); rnorm(i, mean = mu, sd = sigma) })
   user  system elapsed 
 12.280   0.532  32.008 
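
As a rough sanity check on the ~30 ms figure, here is a back-of-the-envelope sketch. It assumes a deliberately simple cost model that is not part of the future package: sequential time is n * t, and parallel time is a fixed total overhead plus n * t / workers. The overhead value of 22 seconds is taken from the elapsed time of the plain multisession run with trivial iterations in the detailed timings further down; the variable names are only illustrative.

```r
## Simple break-even model (an assumption, not how future works internally):
##   sequential time: n * t
##   parallel time:   overhead + n * t / workers
## Parallelizing pays off once n * t > overhead + n * t / workers.

n        <- 1000  # number of iterations, as in the example
workers  <- 4     # number of background R sessions
overhead <- 22    # seconds; approx. elapsed time for 1000 trivial multisession futures

## Solve n * t = overhead + n * t / workers for the per-iteration time t
break_even <- overhead / (n * (1 - 1 / workers))

cat(sprintf("Break-even per-iteration time: %.1f ms\n", 1000 * break_even))
## Break-even per-iteration time: 29.3 ms
```

With the 16.11 s elapsed time from the original report instead of 22 s, the same model gives roughly 21 ms, so the threshold should be read as an order-of-magnitude estimate only.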

In detail, here is an outline of what is happening behind the scenes:

library("doFuture")

## Globals
mu <- 1.0
sigma <- 2.0

t <- system.time(y <- foreach(i = 1:1000) %do% { rnorm(i, mean = mu, sd = sigma) })
print(t)
##    user  system elapsed 
##   0.312   0.004   0.315

registerDoSEQ()
t <- system.time(y <- foreach(i = 1:1000) %dopar% { rnorm(i, mean = mu, sd = sigma) })
print(t)
##   user  system elapsed 
##  0.364   0.000   0.364

library("doParallel")
cl <- parallel::makeCluster(4L)
registerDoParallel(cl)
t <- system.time(y <- foreach(i = 1:1000) %dopar% { rnorm(i, mean = mu, sd = sigma) })
print(t)
##   user  system elapsed 
##  0.532   0.028  10.523
parallel::stopCluster(cl)

registerDoFuture()
plan(eager)  ## NB: later versions of the future package call this plan(sequential)
t <- system.time(y <- foreach(i = 1:1000) %dopar% { rnorm(i, mean = mu, sd = sigma) })
print(t)
##   user  system elapsed 
##  2.808   0.012   2.821

registerDoFuture()
plan(multisession)
t <- system.time(y <- foreach(i = 1:1000) %dopar% { rnorm(i, mean = mu, sd = sigma) })
print(t)
##   user  system elapsed 
##  9.492   0.520  21.965

## What about a simple for loop?
t <- system.time({
  y <- list()
  for(i in 1:1000) y[[i]] <- { rnorm(i, mean = mu, sd = sigma) }
})
print(t)
##   user  system elapsed 
##  0.052   0.000   0.051

## What is the overhead if we use plain sequential futures?
library("listenv")  
plan(eager)
t <- system.time({
  y <- listenv()
  for(i in 1:1000) y[[i]] %<-% { rnorm(i, mean = mu, sd = sigma) }
  y <- as.list(y)
})
print(t)
##   user  system elapsed 
##  1.852   0.000   1.853

## What about multisession futures?
plan(multisession, workers = 4L)
t <- system.time({
  y <- listenv()
  for(i in 1:1000) y[[i]] %<-% { rnorm(i, mean = mu, sd = sigma) }
  y <- as.list(y)
})
print(t)
##   user  system elapsed 
##  8.932   0.424  21.872 

In summary, yes, there's a goal to further lower the overhead in both the core future package and the doFuture package. Regardless, it will never be worth parallelizing very simple R expressions that finish quickly. Of course, one can always parallelize in chunks and thereby lower the total parallelization overhead; the foreach package provides some of this via the use of different iterators, cf. help("foreach", package="foreach").
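
To illustrate the chunking idea, here is a minimal sketch that uses only the base parallel package rather than foreach iterators or doFuture, so it is a stand-in for the technique and not what either package does internally. The worker count of 2 and the variable names are arbitrary choices for the example. Each worker receives one chunk of indices and loops over it locally, so only two tasks are sent instead of 1000.

```r
library(parallel)

mu <- 1.0
sigma <- 2.0
n <- 1000L
workers <- 2L  # arbitrary choice for this sketch

## Split the indices 1..n into one contiguous chunk per worker
chunks <- splitIndices(n, workers)

cl <- makeCluster(workers)

## One task per chunk; each worker loops over its own indices locally.
## mu and sigma are passed explicitly because workers are fresh R sessions.
res_chunks <- clusterApply(cl, chunks, function(idx, mu, sigma) {
  lapply(idx, function(i) rnorm(i, mean = mu, sd = sigma))
}, mu = mu, sigma = sigma)

stopCluster(cl)

## Flatten the per-chunk lists back into one list of n results
y <- unlist(res_chunks, recursive = FALSE)
```

The per-task overhead is now paid once per chunk rather than once per iteration, at the cost of coarser load balancing if iterations vary a lot in run time.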

Hope this helps

PS. I think you meant plan(multisession, workers = 4) without (), because the form with () gives an immediate error.

HenrikBengtsson commented 8 years ago

BTW, I just now realized that you took the example from the vignette. I should spend more time on the doFuture vignette, e.g. clarify it's just a proof-of-concept toy example, or write a more useful example where it actually makes sense to parallelize.

instantkaffee commented 8 years ago

Thx, helps a lot to understand.

paullevchuk commented 8 years ago

Henrik, frankly speaking I understand your points but don't understand how doParallel with the same overhead does its job much better. It seems that doFuture does something wrong on Windows while doParallel does it right.

library(parallel)
library(doFuture)
registerDoFuture()
cl <- makeCluster(4)
plan(cluster, workers=cl)

x <- iris[ which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000

ptime <- system.time({
     r <- foreach(icount(trials), .combine=cbind) %dopar% {
         ind <- sample(100, 100, replace=TRUE)
         result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
         coefficients(result1)
         }
     })[3]
ptime
## elapsed 177.12

ptime <- system.time({
     r <- foreach(icount(trials), .combine=cbind) %do% {
         ind <- sample(100, 100, replace=TRUE)
         result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
         coefficients(result1)
     }
 })[3]
ptime
## elapsed 19.19 

library(doParallel)
registerDoParallel(cores = 4)
getDoParWorkers()
## 4

ptime <- system.time({
     r <- foreach(icount(trials), .combine=cbind) %dopar% {
         ind <- sample(100, 100, replace=TRUE)
         result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
         coefficients(result1)
     }
})[3]
ptime
## elapsed 7.31

HenrikBengtsson commented 8 years ago

Thanks for this feedback. The comments from both of you got me to bump up the priority of adding support for automatic chunking of iterations, which I've meant to look into since the start. I've created Issue #7 specifically for this. I think that one explains most of the performance differences reported here.

Anyway, I should clarify that the main priority for creating doFuture in the first place was to bring in support for compute cluster processing via the foreach framework, which AFAIK is currently not possible. There are a lot of foreach developers out there and I thought this was an obvious enhancement. It was also a very simple one: in the first version of doFuture it only took ~35 lines of code to marry foreach with future.BatchJobs. In the most recent version, we're now up to a whopping 60 lines of code ;)

Here's a foreach HPC example adapted from my useR talk. First, without parallel processing, processing all the data takes ~10 days:

## Find our 40 FASTQ files
fastq <- dir(pattern = "[.]fq$")           ## 200 GB each!
## Align them
bam <- lapply(fastq, FUN = DNAseq::align)  ## 6 hours each!

For those with access to HPC resources and who prefer to use foreach, they can now do:

library("doFuture")
registerDoFuture()      ## (a) Tell foreach to use futures
library("future.BatchJobs")
plan(batchjobs_slurm)   ## (b) Resolve via Slurm scheduler

bam <- foreach(i = seq_along(fastq), .export = "fastq") %dopar% {
  DNAseq::align(fastq[i])
}

Since the plyr package utilizes foreach internally (if you ask it to), users who fancy plyr can do:

bam <- plyr::llply(fastq, DNAseq::align, .parallel = TRUE)

In both cases, each iteration creates a unique job that is submitted to the HPC job scheduler and distributed from there. So, in an ideal world, all 40 jobs will run at the same time. For these types of big, long-running jobs, the overhead per job is less of a concern, especially since it may be minutes before the jobs actually start.

So, the above was the main objective when I created doFuture. That explains why I haven't focused on optimizing parallelization of a very large number of short running tasks. Nevertheless, doFuture should be improved to have similar performance stats as doParallel and hopefully Issue #7 will address most of the difference currently observed.

PS. Unless you're on Windows, I think doParallel::registerDoParallel(cores = 4) will use "multicore" processing, i.e. forked processes. To make sure your comparison is valid also on non-Windows systems, one can use:

library("doParallel")
cl <- parallel::makeCluster(4)
registerDoParallel(cl)

so it uses the exact same type of workers as:

library(doFuture)
registerDoFuture()
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)

paullevchuk commented 8 years ago

Henrik, thanks for the quick reply. Actually, I'm working only on Windows. I changed the doParallel initialization code from cores to a cluster. The result is still great. Perhaps the easier way is to use the doParallel backend on Windows (at least for multicore mode)?

library(doParallel)
cl <- makeCluster(4)
registerDoParallel(cl)
getDoParWorkers()
## [1] 4

x <- iris[ which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
ptime <- system.time({
    r <- foreach(icount(trials), .combine=cbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]
ptime
## elapsed 7.61

HenrikBengtsson commented 8 years ago

For your immediate needs, yes, definitely use doParallel.

paullevchuk commented 8 years ago

Anyway. I believe future and doFuture can become unified, simple and elegant solutions for parallelization. So I am waiting for new updates of your packages.

HenrikBengtsson commented 7 years ago

Just a heads up: in the develop branch, doFuture now uses load balancing / chunking by default, cf. Issue #7.

This solves the issue with the huge time difference you reported above (thanks again). The updates in the develop branch will be part of the next release. I'm hoping to have future 1.4.0 on CRAN soon (just submitted), and after that I'm hoping to get time to push out an updated version of doFuture, because I find this issue quite important.

Here are new benchmarking examples:

library("doParallel")

cl <- parallel::makeCluster(4)
registerDoParallel(cl)
print(getDoParWorkers())
## [1] 4

x <- iris[ which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
ptime <- system.time({
    r <- foreach(icount(trials), .combine=cbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]
print(ptime)
## elapsed 
##  10.912

library("doFuture")
registerDoFuture()
cl <- parallel::makeCluster(4)
plan(cluster, workers = cl)
print(getDoParWorkers())
## [1] 4

x <- iris[ which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
ptime <- system.time({
    r <- foreach(icount(trials), .combine=cbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]
print(ptime)
## elapsed 
##  9.455

HenrikBengtsson commented 7 years ago

FYI, doFuture 0.4.0 implementing this is now on CRAN.

instantkaffee commented 7 years ago

Thx for your excellent work.