HenrikBengtsson / doFuture

:rocket: R package: doFuture - Use Foreach to Parallelize via Future Framework
https://doFuture.futureverse.org

future::makeClusterPSOCK behaves differently from parallel::makePSOCKcluster #53

Closed nicolagnecco closed 3 years ago

nicolagnecco commented 3 years ago

Hi,

I just moved from the parallel package to future package. I enjoy the package very much. At the same time, however, when I readapted my code, I observed the following two different behaviours.

For simplicity, let's say I create a three-node cluster (e.g., node 1, node 2 and node 3) and I have 12 tasks to perform (e.g., task 1, ..., task 12).

  1. When I use parallel::makePSOCKcluster the tasks are allocated as follows: node 1: task 1, node 2: task 2, node 3: task 3, node 1: task 4, ... node 3: task 12.

When I use future::makeClusterPSOCK, the tasks are allocated as follows: node 1: task 1, node 2: task 5, node 3: task 9, node 1: task 2, ... node 3: task 12.

Is it possible to achieve the same task allocation in future::makeClusterPSOCK as in parallel::makePSOCKcluster?

  2. When I use parallel::makePSOCKcluster, there is a 'master' R process and several 'worker' R processes. If I kill the master process, then all the worker processes are also killed. However, with future::makeClusterPSOCK, killing the master R process leaves the worker processes running. Is this intended behaviour? Probably, I am missing something simple here.

Thanks in advance for your support. Best wishes, Nicola

HenrikBengtsson commented 3 years ago

What OS and version of R are you using?

Can you please give a reproducible example for the first part? Because future::makeClusterPSOCK() returns a 'SOCKcluster' object with the same type and number of workers as parallel::makePSOCKcluster(). I don't see how setting the same thing up using two different types of workers would affect what is sent where.

For the second part, I cannot reproduce. I'm on Linux (Ubuntu 18.04) with R 4.0.2 and regardless if I create a 'PSOCKcluster' cluster using:

cl <- parallel::makePSOCKcluster(2)

or

cl <- future::makeClusterPSOCK(2)

and then exit R, e.g. quit("no"), any launched worker processes are automatically terminated shortly thereafter; I watch them using watch -n 1 pgrep R.

nicolagnecco commented 3 years ago

Hi,

Thanks a lot for your answer. I am working on macOS Catalina 10.15.7 and R version 4.0.2 (2020-06-22).

  1. Below is a reproducible example:
rm(list = ls())
library(tidyverse)

n_tasks <- 1200
n_workers <- 2

# SNOW
library(doParallel)
#> Loading required package: foreach
#> 
#> Attaching package: 'foreach'
#> The following objects are masked from 'package:purrr':
#> 
#>     accumulate, when
#> Loading required package: iterators
#> Loading required package: parallel
cl <- parallel::makePSOCKcluster(n_workers, outfile = "")
registerDoParallel(cl)

cat(file = "test_snow.txt")
ll <- foreach(i = 1:n_tasks) %dopar% {
  Sys.sleep(0.001)
  cat("Simulation", i, "out of", n_tasks, "\n", 
      file = "test_snow.txt", append = TRUE)
}

# FUTURE
library(doFuture)
#> Loading required package: future
doFuture::registerDoFuture()
future::plan(cluster, workers = n_workers)

cat(file = "test_future.txt")
ll <- foreach(i = 1:n_tasks) %dopar% {
  Sys.sleep(0.001)
  cat("Simulation", i, "out of", n_tasks, "\n", 
      file = "test_future.txt", append = TRUE)
}

Created on 2020-10-11 by the reprex package (v0.3.0)

I use the cat function to redirect the stdout from the workers and keep track of the simulations as they progress. By using the SNOW package, in the test_snow.txt file I see that the tasks are allocated in increasing order between the workers. By using the future package, on the other hand, I see in the test_future.txt file that the tasks are split into two (i.e., worker 1 receives tasks from 1 to 600 and worker 2 from 601 to 1200).

  2. Regarding the second point, I observe the following behaviour. Suppose I save the following R script as reprex_kill_workers.R.
rm(list = ls())
library(tidyverse)

n_tasks <- 1200
n_workers <- 2

# FUTURE
library(doFuture)
#> Loading required package: future
doFuture::registerDoFuture()
future::plan(cluster, workers = n_workers)

ll <- foreach(i = 1:n_tasks) %dopar% {
  Sys.sleep(1)
}

Created on 2020-10-11 by the reprex package (v0.3.0)

Then, I type on the command line the following.

nicus@Nicolas-MBP:~$ nohup Rscript reprex_kill_workers.R &
[1] 4699
nicus@Nicolas-MBP:~$ appending output to nohup.out

nicus@Nicolas-MBP:~$ kill 4699
nicus@Nicolas-MBP:~$ 
[1]  + terminated  nohup Rscript reprex_kill_workers.R

Then, what I see is that the two computing R processes are still running. This does not happen if I use the SNOW package, e.g. if I save the following Rscript as reprex_kill_workers.R.

rm(list = ls())
library(tidyverse)

n_tasks <- 1200
n_workers <- 2

# SNOW
library(doParallel)
#> Loading required package: foreach
#> 
#> Attaching package: 'foreach'
#> The following objects are masked from 'package:purrr':
#> 
#>     accumulate, when
#> Loading required package: iterators
#> Loading required package: parallel
cl <- parallel::makePSOCKcluster(n_workers, outfile = "")
registerDoParallel(cl)

ll <- foreach(i = 1:n_tasks) %dopar% {
  Sys.sleep(1)
}

Created on 2020-10-11 by the reprex package (v0.3.0)

Thanks a lot for your help and time! Best wishes, Nicola

HenrikBengtsson commented 3 years ago

Ok, I suspect you're comparing apples and oranges here and drawing the wrong conclusion about what's causing the observed differences. To compare parallel::makePSOCKcluster() and future::makeClusterPSOCK(), let's just run your first example code snippet, i.e. the one where you set up:

cl <- parallel::makePSOCKcluster(n_workers, outfile = "")
registerDoParallel(cl)

Then compare the results you get from that with what you get when you run the same using:

cl <- future::makeClusterPSOCK(n_workers, outfile = "")
registerDoParallel(cl)

Do you still see a difference?

PS. There's no need to attach tidyverse in these examples; it's not being used. Also, unrelated to what's going on here, see https://www.tidyverse.org/blog/2017/12/workflow-vs-script/ for why rm(list = ls()) is not a good habit.

nicolagnecco commented 3 years ago

Thanks for your answer.

Yes, you are right, when I run the two snippets that you propose, I see no difference, and everything works as expected. Thank you for clarifying this point.

I guess I just misunderstood the documentation of future::cluster that says that when the argument workers is a character vector or a numeric scalar, a cluster object is created using makeClusterPSOCK(workers). Then, I wrongly assumed that calling future::plan(cluster, workers = n_workers) would create the same cluster as future::makeClusterPSOCK(n_workers). But probably I am still missing something.

PS. Many thanks for the useful reference about good practices for R scripts. It helps a lot!

HenrikBengtsson commented 3 years ago

... I wrongly assumed that calling future::plan(cluster, workers = n_workers) would create the same cluster as future::makeClusterPSOCK(n_workers). But probably I am still missing something.

No, this assumption is indeed correct. My previous comment was to start walking you through what's going on. So, let's move on to the second code snippet, which uses doFuture instead of doParallel. First, run that example with the original:

n_workers <- 2
doFuture::registerDoFuture()
future::plan(cluster, workers = n_workers)

Then compare the results to:

n_workers <- 2
doFuture::registerDoFuture()
cl <- future::makeClusterPSOCK(n_workers)
future::plan(cluster, workers = cl)

You should get the same results, because the second is indeed the same as the former. Once you have confirmed that, try with:

n_workers <- 2
doFuture::registerDoFuture()
cl <- parallel::makePSOCKcluster(n_workers)
future::plan(cluster, workers = cl)

You will get the same behavior as in the previous two cases. Confirm that you do get that, then I'll explain what's really going on.

nicolagnecco commented 3 years ago

Thanks for this example! Yes, I just tried the three snippets, and they all show the very same behaviour. I'm really curious to understand what's going on.

HenrikBengtsson commented 3 years ago

Ok, now we've established that the differences you observed are not due to future::makeClusterPSOCK() vs parallel::makePSOCKcluster(). Instead, they have to do with doParallel vs doFuture.

I'll transfer this issue over to doFuture and continue there as soon as I have the time.

HenrikBengtsson commented 3 years ago

The difference you are observing is due to different "chunking" strategies. Specifically,

  1. doFuture::registerDoFuture() chunks by default
  2. doParallel::registerDoParallel(ncores) chunks by default (only available on Linux and macOS)
  3. doParallel::registerDoParallel(cl = cl) does not chunk by default

When processing in chunks ("chunking"), all N iterations are split into W equally sized chunks (W = number of workers) and each worker processes one chunk, which means there will be W parallel jobs in total.

Without chunking, each of the N iterations is sent individually to one of the W workers, which means there will be N parallel jobs in total (but only W running at the same time).

Parallelization without chunking comes with more parallelization overhead, especially if there are lots of iterations to be done. Because of this, chunking is often much more efficient. This is why doFuture, future.apply, and furrr use chunking by default.
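
To make the two strategies concrete, here is a minimal base-R sketch of how 6 iterations could be divided among 3 workers. It is an illustration only, not the code that doFuture or doParallel actually run: parallel::splitIndices() is used here simply as a convenient way to form contiguous chunks, and the round-robin line assumes every iteration takes the same time, so the workers pick up jobs in turn. The variable names are made up for this example.

```r
# Illustration only: dividing 6 iterations among 3 workers.
n_iterations <- 6L
n_workers <- 3L

# With chunking: W contiguous chunks, one parallel job per worker.
chunks <- parallel::splitIndices(n_iterations, n_workers)
str(chunks)

# Without chunking: N separate jobs. Assuming every job takes the same
# time, the workers pick them up in turn, which looks like round-robin.
worker_of_job <- (seq_len(n_iterations) - 1L) %% n_workers + 1L
print(data.frame(i = seq_len(n_iterations), worker = worker_of_job))
```

The first result has the same shape as the "use chunking" outputs in the examples (iterations 1-2, 3-4 and 5-6 grouped together), and the second has the alternating shape of the "don't use chunking" outputs.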

You can control whether or not chunking should take place in all three setups using specific .options.* arguments to foreach():

1. doFuture (regardless of backend)

library(foreach)
library(doFuture)
registerDoFuture()
plan(multisession, workers = 3)
## or equivalently
## cl <- parallel::makePSOCKcluster(3)
## plan(cluster, workers = cl)

# Default (use chunking)
y <- foreach(i=1:6, .combine=rbind) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 19912
# result.2 2 19912
# result.3 3 19911
# result.4 4 19911
# result.5 5 19910
# result.6 6 19910

# Explicitly use chunking
y <- foreach(i=1:6, .combine=rbind, .options.future = list(scheduling = TRUE)) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 19912
# result.2 2 19912
# result.3 3 19911
# result.4 4 19911
# result.5 5 19910
# result.6 6 19910

# Explicitly don't use chunking
y <- foreach(i=1:6, .combine=rbind, .options.future = list(scheduling = FALSE)) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 19912
# result.2 2 19911
# result.3 3 19910
# result.4 4 19912
# result.5 5 19911
# result.6 6 19910

2. doParallel using forked parallelization (doParallel:::doParallelMC())

This only works on Linux and macOS. If tried on MS Windows, it'll fall back to case 3 below.

library(foreach)
doParallel::registerDoParallel(ncores = 3)

# Default (use chunking)
y <- foreach(i=1:6, .combine=rbind) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 21109
# result.2 2 21110
# result.3 3 21111
# result.4 4 21112
# result.5 5 21109
# result.6 6 21110

# Explicitly use chunking
y <- foreach(i=1:6, .combine=rbind, .options.multicore = list(preschedule = TRUE)) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 21175
# result.2 2 21176
# result.3 3 21177
# result.4 4 21178
# result.5 5 21175
# result.6 6 21176

# Explicitly don't use chunking
y <- foreach(i=1:6, .combine=rbind, .options.multicore = list(preschedule = FALSE)) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 21241
# result.2 2 21242
# result.3 3 21243
# result.4 4 21244
# result.5 5 21245
# result.6 6 21246

What the latter PIDs tell you is that when not using chunking, then each

3. doParallel using SNOW cluster parallelization (doParallel:::doParallelSNOW())

library(foreach)
cl <- parallel::makePSOCKcluster(3)
doParallel::registerDoParallel(cl = cl)

# Default (no chunking)
y <- foreach(i=1:6, .combine=rbind) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 19912
# result.2 2 19911
# result.3 3 19910
# result.4 4 19912
# result.5 5 19911
# result.6 6 19910

# Explicitly don't use chunking
y <- foreach(i=1:6, .combine=rbind, .options.snow = list(preschedule = FALSE)) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 19912
# result.2 2 19911
# result.3 3 19910
# result.4 4 19912
# result.5 5 19911
# result.6 6 19910

# Explicitly use chunking
y <- foreach(i=1:6, .combine=rbind, .options.snow = list(preschedule = TRUE)) %dopar% {
  list(i = i, pid = Sys.getpid())
}
print(y)
#          i pid  
# result.1 1 19912
# result.2 2 19912
# result.3 3 19911
# result.4 4 19911
# result.5 5 19910
# result.6 6 19910

You can read more about chunking in ?doParallel::registerDoParallel, ?doFuture::doFuture and ?future.apply::future_lapply as well as in ?parallel::mclapply and ?parallel::parLapply (chunk.size in the latter).

Postscript

It's very unfortunate that the foreach package itself does not define an option to control chunking, e.g. foreach(..., scheduling = TRUE). Instead, we have to use different arguments for different foreach adaptors, e.g. .options.multicore, .options.snow, and .options.future. That makes it hard to write foreach code that works regardless of which foreach adaptor the end-user registers. This is one of a few unfortunate misfeatures that come with the foreach framework. It can be fixed, but it has to be fixed by foreach.
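
One possible workaround, sketched below, is to pass all three adaptor-specific arguments in the same foreach() call. This relies on my understanding that foreach stores each .options.* argument under the adaptor's name and that an adaptor only reads its own entry, so treat it as an assumption to verify rather than documented behaviour. The sequential adaptor is registered here only so that the sketch runs without any parallel setup.

```r
library(foreach)

# Sequential adaptor, so no workers need to be launched for this sketch.
registerDoSEQ()

# Ask for "no chunking" in the dialect of each adaptor discussed above.
y <- foreach(
  i = 1:3,
  .options.future    = list(scheduling = FALSE),   # doFuture
  .options.multicore = list(preschedule = FALSE),  # doParallel, forked
  .options.snow      = list(preschedule = FALSE)   # doParallel, cluster
) %dopar% {
  i^2
}
str(y)
```

Whichever adaptor the end-user registers in place of registerDoSEQ() should then find its own setting and skip the other two.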

HenrikBengtsson commented 3 years ago

For what it's worth, see also Issue #48

nicolagnecco commented 3 years ago

First of all, thanks so much for such a clear and detailed explanation! This is extremely valuable to me since I have been spending quite a bit of time trying to understand parallel computation in R. Now, I can also better see the great advantage of using doFuture compared to doParallel.

As a very last question, I see that in your example with doParallel with forked parallelization and no chunking, there are as many PIDs as tasks. Would this mean that there will be as many R sessions as tasks? I am sorry, but I cannot read your explanation after the example since it looks truncated.

Thanks again!

HenrikBengtsson commented 3 years ago

... in your example with doParallel with forked parallelization and no chunking, there are as many PIDs as tasks. Would this mean that there will be as many R sessions as tasks? I am sorry, but I cannot read your explanation after the example since it looks truncated.

Parallel processing via forks is quite special. There, each future is a fork of the main R session, with the main differences being that it gets its own unique PID and tempdir(). As soon as a forked worker (= one future) is finished, that forked worker process is terminated. The next future gets a new forked worker process, and so on. That is why you see different PIDs for each future (= worker). In contrast, when using multisession (= PSOCK cluster workers), those workers are created only once, so you'll see a fixed number of unique PIDs.

Now, the number of workers that are allowed to be running at the same time is given by future::nbrOfWorkers(), which you specify via, say, plan(multicore, workers = n).
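
The fork behaviour can be seen without any foreach adaptor. The sketch below calls parallel::mclapply() directly, on the assumption that it is a fair stand-in for what happens with forked processing (it is the function ?parallel::mclapply documents, not necessarily the exact code path doParallel or future take). It runs on Linux and macOS only, and the variable names are made up for this example.

```r
# Linux/macOS only: forking is not available on MS Windows.
parent_pid <- Sys.getpid()

# mc.preschedule = FALSE forks a fresh child process for every element.
pids_unchunked <- unlist(parallel::mclapply(
  1:4, function(i) Sys.getpid(),
  mc.cores = 2L, mc.preschedule = FALSE
))

# mc.preschedule = TRUE (the default) forks mc.cores children up front
# and gives each of them a share of the elements.
pids_chunked <- unlist(parallel::mclapply(
  1:4, function(i) Sys.getpid(),
  mc.cores = 2L, mc.preschedule = TRUE
))

print(length(unique(pids_unchunked)))  # number of distinct child PIDs
print(length(unique(pids_chunked)))    # at most mc.cores distinct PIDs
print(parent_pid %in% c(pids_unchunked, pids_chunked))
```

Without prescheduling you should normally see one distinct PID per element even though only two children run at a time, which matches the "as many PIDs as tasks" observation.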

nicolagnecco commented 3 years ago

Thanks once more for your clear answer, and thanks again for all the help and the great packages!

HenrikBengtsson commented 3 years ago

You're welcome