Bioconductor / BiocParallel

Bioconductor facilities for parallel evaluation
https://bioconductor.org/packages/BiocParallel

Nested parallelization question #239

Closed zeehio closed 1 year ago

zeehio commented 1 year ago

Hi,

(On R 4.2.2, BiocParallel 1.33.3)

I am writing a package with two functions that call bplapply(), where one function calls the other (this is nested parallelization). I expect most users to use a Snow+Serial combination, but that is ultimately up to them.

For instance, I expect users on Windows to typically use Snow+Serial, as shown:

BiocParallel::registered()
# $SnowParam
# class: SnowParam
#   bpisup: FALSE; bpnworkers: 2; bptasks: 0; bpjobname: BPJOB
#   bplog: FALSE; bpthreshold: INFO; bpstopOnError: TRUE
#   bpRNGseed: ; bptimeout: NA; bpprogressbar: FALSE
#   bpexportglobals: TRUE; bpexportvariables: TRUE; bpforceGC: FALSE; bpfallback: TRUE
#   bplogdir: NA
#   bpresultdir: NA
#   cluster type: SOCK
# 
# $SerialParam
# class: SerialParam
#   bpisup: FALSE; bpnworkers: 1; bptasks: 0; bpjobname: BPJOB
#   bplog: FALSE; bpthreshold: INFO; bpstopOnError: TRUE
#   bpRNGseed: ; bptimeout: NA; bpprogressbar: FALSE
#   bpexportglobals: FALSE; bpexportvariables: FALSE; bpforceGC: FALSE; bpfallback: FALSE
#   bplogdir: NA
#   bpresultdir: NA
# 

Here is my simplified code:


internal_loop <- function(iterations) {
  x <- as.list(seq_len(iterations))
  BiocParallel::bplapply(x, function(i) {
    Sys.sleep(1)
    i
  })
}

external_loop <- function(iterations_ext, iterations_int) {
  x <- as.list(rep(iterations_int, iterations_ext))
  BiocParallel::bplapply(x, function(i) {
    internal_loop(i)
  })
}

I expected the outer BiocParallel::bplapply() to pop the first backend from registered() (SnowParam in the example above) and use that backend to call internal_loop in parallel. Each internal_loop call would then call bplapply(), which would pop the next backend from registered() (SerialParam in the example above) and use it.

However, this is not what happens, as I get the following error:

external_loop(5, 5)
# Error: BiocParallel errors
#  1 remote errors, element index: 1
#  2 unevaluated and other errors
#  first remote error:
# Error in socketConnection(port = port, server = TRUE, blocking = TRUE, : no se puede abrir la conexión 
# "no se puede abrir la conexión" means "cannot open the connection"

To confirm, I changed the internal loop to see what bpparam() returns, expecting to see a SerialParam there. See the changes here:


internal_loop <- function(iterations) {
  x <- as.list(seq_len(iterations))
  # I skip the bplapply call
  #BiocParallel::bplapply(x, function(i) {
  #  Sys.sleep(1)
  #  i
  #})
  # I just return the bpparam that will be seen by the internal bplapply() call
  BiocParallel::bpparam()
}

external_loop <- function(iterations_ext, iterations_int) {
  x <- as.list(rep(iterations_int, iterations_ext))
  current_bpparam <- BiocParallel::bpparam()
  internal_bpparams <- BiocParallel::bplapply(x, function(i) {
    internal_loop(i)
  })
  list(external = current_bpparam, internal = internal_bpparams[[1]])
}
external_loop(2, 2)

Both the external and the internal loop try to create Snow clusters:

#external_loop(2, 2)
# $external
# class: SnowParam
#   bpisup: FALSE; bpnworkers: 2; bptasks: 0; bpjobname: BPJOB
#   bplog: FALSE; bpthreshold: INFO; bpstopOnError: TRUE
#   bpRNGseed: ; bptimeout: NA; bpprogressbar: FALSE
#   bpexportglobals: TRUE; bpexportvariables: TRUE; bpforceGC: FALSE; bpfallback: TRUE
#   bplogdir: NA
#   bpresultdir: NA
#   cluster type: SOCK
# 
# $internal
# class: SnowParam
#   bpisup: FALSE; bpnworkers: 2; bptasks: 0; bpjobname: BPJOB
#   bplog: FALSE; bpthreshold: INFO; bpstopOnError: TRUE
#   bpRNGseed: ; bptimeout: NA; bpprogressbar: FALSE
#   bpexportglobals: TRUE; bpexportvariables: TRUE; bpforceGC: FALSE; bpfallback: TRUE
#   bplogdir: NA
#   bpresultdir: NA
#   cluster type: SOCK
# 

Should I explicitly pop the bpparam() from the registered() list somehow, or is this a BiocParallel issue?

I feel I am missing something; any help is much appreciated.

Thanks

mtmorgan commented 1 year ago

The idea is to provide a list() of BiocParallelParam objects corresponding to the nested order in which you'd like them to be applied:

internal_loop <- function(iterations, BPPARAM) {
  x <- as.list(seq_len(iterations))
  BiocParallel::bplapply(x, function(i) {
    Sys.sleep(1)
    i
  }, BPPARAM = BPPARAM)
}

external_loop <- function(iterations_ext, iterations_int, BPPARAM) {
  x <- as.list(rep(iterations_int, iterations_ext))
  BiocParallel::bplapply(x, internal_loop, BPPARAM = BPPARAM)
}

res <- external_loop(5, 5, list(SnowParam(), MulticoreParam()))

But really I would strongly discourage this -- you're asking a lot of your users to configure this correctly. BiocParallel (and probably R-level parallelism in general) works best with 'coarse-grained' parallelism, where parallel application of the outer loop alone is likely 'enough' to consume the CPU resources available on your computer. For instance, in your small example work was being done by 5 x 5 = 25 tasks, but I only have 8 cores -- in a real example each of these 25 tasks would be competing for the 8 cores. It would be better to just divide the 25 tasks between the 8 cores (core 1 gets tasks 1:4, core 2 gets tasks 5:8, etc.) so that all the cores are used but are not competing with each other. This is what happens with a single, default bplapply().
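One way to follow this advice in the original example is to flatten the nested iteration into a single task list, so one level of parallelism covers all 25 tasks. A minimal sketch (the function name `external_loop_flat` is hypothetical, and BiocParallel is assumed to be installed):

```r
library(BiocParallel)

external_loop_flat <- function(iterations_ext, iterations_int,
                               BPPARAM = bpparam()) {
  # One task per (outer, inner) pair instead of two nested loops
  grid <- expand.grid(ext = seq_len(iterations_ext),
                      int = seq_len(iterations_int))
  res <- bplapply(seq_len(nrow(grid)), function(k) {
    Sys.sleep(1)
    grid$int[k]
  }, BPPARAM = BPPARAM)
  # Re-split the flat result into the nested shape the original returned
  split(res, grid$ext)
}
```

With a single bplapply() the scheduler divides the 25 tasks among the registered workers, so no worker ever tries to spawn its own cluster.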

zeehio commented 1 year ago

In my package, internal_loop() is going to be an exported function that a user may want to call directly, so it makes sense for it to be parallelizable. external_loop() is also exported, and it makes sense to parallelize it as well.

I expected the BiocParallel default to be: "on the first bplapply() either SnowParam or MulticoreParam is used, then on any nested bplapply() call SerialParam is used". That way neither I nor most of my users would need to worry about registering parallelization options or passing BPPARAM values around.

I believe the default BiocParallel registered() output should be Snow+Serial (or Multicore+Serial) to prevent child workers from spawning more workers, as you say. Snow+Multicore doesn't make much sense to me. An advanced user with access to an HPC cluster might want to register a distributed computing backend first and then a Snow multiprocessing backend afterwards for nested calls. I expected that by registering the backends in that order, no BPPARAM arguments would need to be passed, since everything is set up upfront.

I'm clearly missing something, because I don't understand why, when registered() shows me SnowParam+SerialParam, the default behaviour of my nested bplapply() is Snow+Snow, so that either I or my users must manually pass the Snow+Serial list as you mention.

Why does registered() return a list if I have to explicitly pass the list to bplapply anyway?

The BiocParallel introductory vignette mentions that the registered() list behaves as a stack. I assumed the default BPPARAM argument for bplapply() would be the registered() list, and that any nested bplapply() call would use registered() with the first element removed, falling back to SerialParam once all backends are consumed.

Right now I am concerned about the default behaviour of BiocParallel. Suppose that in my package I use bplapply() and inside that bplapply() I call a function from a third-party package. That's easy to imagine: my users register a snow cluster, they run my function, and everything works fine for everyone. Next month the third-party package adds BiocParallel support to the function I am calling. Suddenly all my users start creating nested snow clusters, with too many workers competing for a smaller number of cores.

I will keep testing how this works, and I am probably missing something, but that doesn't seem like the most sensible default behaviour to me.

mtmorgan commented 1 year ago

If internal_loop() and external_loop() are both exposed, then perhaps it is best to make sure that your external_loop() calls to internal_loop() do not result in over-subscription, most simply by passing SerialParam().
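Applied to the original example, that suggestion might look like the following sketch (assuming BiocParallel is installed): both functions expose BPPARAM, but external_loop() forces the nested level to run serially, so only one level is ever parallel.

```r
library(BiocParallel)

internal_loop <- function(iterations, BPPARAM = bpparam()) {
  bplapply(as.list(seq_len(iterations)), function(i) {
    Sys.sleep(1)
    i
  }, BPPARAM = BPPARAM)
}

external_loop <- function(iterations_ext, iterations_int,
                          BPPARAM = bpparam()) {
  x <- as.list(rep(iterations_int, iterations_ext))
  bplapply(x, function(i) {
    # The nested call always runs serially: no over-subscription
    internal_loop(i, BPPARAM = SerialParam())
  }, BPPARAM = BPPARAM)
}
```

A user calling internal_loop() directly still gets parallelism via the default bpparam(); only the nested invocation is pinned to SerialParam().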

Snow originally meant 'simple network of workstations', and Snow+Multicore corresponded to the idea that you'd spawn a bunch of workers on separate workstations (independent computers) and subsequently have access to the multiple cores on those workers. But in reality there's no way for a developer to know what their users will do, i.e., no way to know whether the snow workers are on the same or different 'workstations'.

registered() was originally meant to provide the (advanced) user with a way to toggle between pre-configured backends -- for this function I want multiple cores, but for that one I want separate processes; in both cases I want to log my results -- without having to manage these different alternatives themselves. It behaves like a stack in the sense that register() puts a new param in the default position, not that the params will be used 'automatically' at subsequent levels of nesting. Also, one can establish persistent clusters with bpstart(registered()[["SnowParam"]]) and not worry about managing that at different stages in the script. In retrospect, registration seems like over-engineering, especially compared to alternative solutions (like batchtools on an HPC cluster).
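The registration workflow described above can be sketched as follows (assuming BiocParallel is installed): register() pushes a param into the default position of registered(), and bpstart()/bpstop() manage a persistent cluster across several bplapply() calls.

```r
library(BiocParallel)

p <- SnowParam(workers = 2)
register(p)                      # p is now the default returned by bpparam()
bpstart(p)                       # start a persistent cluster once...
res1 <- bplapply(1:4, sqrt)      # ...reused here
res2 <- bplapply(1:4, identity)  # ...and here, with no restart overhead
bpstop(p)                        # shut the workers down when done
```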

There's no agreement amongst parallel packages on ways to detect existing parallelism (probably @HenrikBengtsson will correct me on this), so even if BiocParallel were clever enough to detect that it was invoking nested calls, your concern about third-party packages would still apply. Also, in general, a user might intentionally invoke nested parallelism for specific problems, ensuring no over-subscription themselves. And, as another piece of the puzzle, over-subscription is not well-defined: e.g., on my personal laptop I have no problem using all cores, but on an HPC node that would not be appropriate.

The take-home message for most packages is to avoid nested parallelism in their own code.

HenrikBengtsson commented 1 year ago

I'll answer on the following part without having read all the details above:

There's no agreement amongst parallel packages on ways to detect existing parallelism (probably @HenrikBengtsson will correct me on this), so even if BiocParallel were clever enough to detect that it was invoking nested calls, your concern about third party packages would still apply.

Correct, there is no standard way in R to detect active parallelism. There might be bits and pieces that give us some clues, but nothing reliable enough to make use of. There are ways we can lower the risk of ending up with nested parallelism by mistake. For instance, we can set (and unset) all known R options and environment variables that control parallel processing, e.g. options(mc.cores = 1L), Sys.setenv(BIOCPARALLEL_WORKER_NUMBER = "1"), ... This can be done internally by each parallel framework. Of course, it won't protect against the case where the number of parallel workers is hard-coded, including my favorite hate-function, detectCores().
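A minimal base-R sketch of that mitigation: before handing work to a worker, set the knobs that other frameworks consult so that code running inside the worker defaults to a single core, and restore the previous state afterwards. The knobs shown are the two mentioned above; a real framework would cover more.

```r
# Save and override the parallelism knobs
old_opts <- options(mc.cores = 1L)            # consulted by parallel::mclapply()
old_env  <- Sys.getenv("BIOCPARALLEL_WORKER_NUMBER", unset = NA)
Sys.setenv(BIOCPARALLEL_WORKER_NUMBER = "1")  # hint for BiocParallel workers

# ... run the worker code here ...

# Restore the previous state
options(old_opts)
if (is.na(old_env)) {
  Sys.unsetenv("BIOCPARALLEL_WORKER_NUMBER")
} else {
  Sys.setenv(BIOCPARALLEL_WORKER_NUMBER = old_env)
}
```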

zeehio commented 1 year ago

If internal_loop() and external_loop() are both exposed, then perhaps it is best to make sure that your external_loop() calls to internal_loop() do not result in over-subscription, most simply by passing SerialParam().

I will do that.

registered() was originally meant to provide the (advanced) user with a way to toggle between pre-configured backends -- for this function I want multiple cores, but for this one I want separate processes; in both cases I want to log my results -- without having to manage these different alternatives themselves; it behaves like a stack in the sense that register() puts a new param in the default position, not that the params will be used 'automatically' at subsequent levels of nesting. Also, one can establish persistent clusters bpstart(registered()[["SnowParam"]]) and not worry about managing that at different stages in the script. In retrospect registration seems like over-engineering, especially compared to alternative solutions (like batchtools on an HPC cluster.

There's no agreement amongst parallel packages on ways to detect existing parallelism (probably @HenrikBengtsson will correct me on this), so even if BiocParallel were clever enough to detect that it was invoking nested calls, your concern about third party packages would still apply. Also in general as a user one might intentionally invoke nested parallelism, ensuring no oversubscription, for specific problems. And as another piece of the puzzle oversubscription is not well-defined, e.g., on my personal laptop I have no problems using all cores, but on an HPC node that would not be appropriate.

To my understanding, as a user I just needed to register a list of backends. Each nested bplapply() call would pop the first element of the list and pass the remaining registered backends to the corresponding workers, so workers would use that list (given to them) if they had to call bplapply(). This approach already works in the future package, as explained in its vignette: https://cran.r-project.org/web/packages/future/vignettes/future-3-topologies.html. I see now that's not how BiocParallel is supposed to work.
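For comparison, the future package's nested-topology approach looks roughly like this sketch (assuming the future package is installed): plan() accepts a list of strategies, one per nesting level, and each level of futures consumes the next strategy in the list.

```r
library(future)

# Outer level: separate R processes; inner level: sequential
plan(list(multisession, sequential))

f <- future({
  # Any future created here, inside a worker, uses 'sequential'
  value(future(Sys.getpid()))
})
value(f)
```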

I started using BiocParallel because it's the standard Bioconductor package for parallelization (https://www.bioconductor.org/developers/package-guidelines/, section 9.10). I feel the future package covers the nested parallelization scenario better than BiocParallel. I understand it is not a relevant issue for most people and it is probably a lot of work to fix, so avoiding nested parallelization is the solution I will follow for now.

Feel free to close the issue if you like, and thanks to both of you for your detailed explanations.

Jiefei-Wang commented 1 year ago

I think you should avoid using parallelization by default unless the user explicitly asks for it. A simple argument in your function can handle that. No one would end up with nested SNOW clusters if all package developers followed this simple principle.
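That opt-in pattern might look like the following sketch (the function name `my_function` is hypothetical; BiocParallel is assumed to be installed): the default BPPARAM is SerialParam(), so parallelism happens only when the caller supplies a backend.

```r
library(BiocParallel)

my_function <- function(x, BPPARAM = SerialParam()) {
  # Serial unless the caller explicitly passes a parallel backend
  bplapply(x, function(xi) xi^2, BPPARAM = BPPARAM)
}

# Serial by default:
res <- my_function(1:4)
# Parallel only when the user opts in:
# res <- my_function(1:4, BPPARAM = SnowParam(workers = 2))
```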