Closed zeehio closed 1 year ago
This approach is better when the workers and the caller do not share memory. However, for workers that can share memory (e.g. serial or multicore) we may be paying the overhead of transposing the list of lists without reaping any benefit, since passing the whole dataset to the worker may not create any additional copies.
I have not checked that; maybe someone else already knows the answer.
In order to transpose the list only when needed, I would need to know whether the workers of the current backend share memory with the current process. Is there any method named "sharedMemory" or similar that returns `TRUE` for `MulticoreParam` and `SerialParam`, but `FALSE` for `SnowParam` (and so on for other backends)?
The only way to share memory would be forking, right? (Well, and single-process)
I guess, yes.
I'd like to consider backends such as this one (even if it is experimental):
https://github.com/HenrikBengtsson/BiocParallel.FutureParam
Do you have an idea in mind?
Hmm, so I think your suggestion of a method might make sense. For most params, this method will return a static boolean value (TRUE for multicore/serial and FALSE for others) but for "meta-backends" like FutureParam and DoparParam, they would need to examine the registered foreach/future backend and return true or false depending on which one is used.
For backward compatibility and future-proofing, the abstract base class (BiocParallelParam) should probably provide a default implementation of this method (which always returns FALSE?), which is then overridden by specific backends to "opt in" to the optimization.
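A minimal sketch of that shape (the generic name follows the discussion, but the class names here are illustrative stand-ins, not the actual BiocParallel classes):

```r
library(methods)

## Hypothetical generic: do this backend's workers share memory with us?
setGeneric("bpsharememory", function(x) standardGeneric("bpsharememory"))

## Abstract base class provides a conservative default: assume no sharing.
setClass("DemoParam", representation("VIRTUAL"))
setMethod("bpsharememory", "DemoParam", function(x) FALSE)

## A fork/serial-style backend opts in by overriding the default.
setClass("DemoForkParam", contains = "DemoParam")
setMethod("bpsharememory", "DemoForkParam", function(x) TRUE)

## A snow-style backend simply inherits FALSE from the base class.
setClass("DemoSnowParam", contains = "DemoParam")
```

A meta-backend like FutureParam would instead implement the method by inspecting the currently registered future/foreach backend rather than returning a static value.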
We now have the best of both worlds:
- `bpsharememory()` generic: tells whether the workers share the same memory space. A serial or multicore based backend would return `TRUE`, while a multiple-process backend (like Snow) would return `FALSE`. This generic is now used by `bpmapply()` to determine whether it is better to pass the whole data to all workers (faster if workers share memory) or to modify the structure of the data to pass a slice of it to each worker (more efficient if workers do not share memory).
- `bpmapply`: use `bpsharememory()` to avoid extra copies of all data.

`bpmapply()` receives several arguments to iterate on. This ends up being something like:
ddd <- list(
    arg1 = list(arg1_iter1, arg1_iter2, arg1_iter3),
    arg2 = list(arg2_iter1, arg2_iter2, arg2_iter3)
)
The implementation before this commit was passing `ddd` to all workers as well as the iteration index `i`, and each worker would take the corresponding slice `ddd$arg1[[i]]` and `ddd$arg2[[i]]` and pass it to the parallelized function.
For Serial and Multicore backends, where workers share memory, this is very efficient. However, for Snow backends, where workers do not share memory, the whole `ddd` needs to be serialized, copied to each worker and deserialized, which is very inefficient.
In the Snow scenario, it is far more efficient to let the main process "transpose" the `ddd` list so it becomes:
ddd2 <- list(
    iter1 = list(arg1_iter1, arg2_iter1),
    iter2 = list(arg1_iter2, arg2_iter2)
)
Then only pass the corresponding iteration to each worker, reducing the amount of serialization/deserialization needed and the memory footprint significantly.
For this to be efficient in all backends, we need to know if the specific backend has workers that share memory or not.
So we use the newly introduced `bpsharememory()` generic to determine that and choose the most efficient `bpmapply()` argument slicing strategy.
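For concreteness, the transposition step described above can be sketched like this (the helper name is made up for illustration; the actual implementation lives in this PR):

```r
## Turn ddd[[arg]][[iter]] into ddd2[[iter]][[arg]] so each worker only
## receives the slice it needs for its iterations.
transpose_args <- function(ddd) {
    n_iter <- length(ddd[[1L]])
    lapply(seq_len(n_iter), function(j) lapply(ddd, `[[`, j))
}

ddd <- list(
    arg1 = list("a1_i1", "a1_i2", "a1_i3"),
    arg2 = list("a2_i1", "a2_i2", "a2_i3")
)
ddd2 <- transpose_args(ddd)
## ddd2[[1]] is list(arg1 = "a1_i1", arg2 = "a2_i1"), and so on
```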
@Jiefei-Wang's SharedObject provides object sharing, provided one is on a single machine.
I'm not a fan of `bpsharedmemory()`; if there's a need for `bpmapply()` to behave differently for different back-ends then dispatch on BPPARAM in
> showMethods(bpmapply)
Function: bpmapply (package BiocParallel)
FUN="ANY", BPPARAM="BiocParallelParam"
FUN="ANY", BPPARAM="list"
FUN="ANY", BPPARAM="missing"
It's actually not correct that `MulticoreParam()` always shares memory. Consider
x <- 1
param <- bpstart(MulticoreParam()) # forked process, 'x' in shared memory
bplapply(1:4, sqrt, BPPARAM = param) # already forked, so 1:4 must be serialized to worker
Please also follow the BiocParallel coding convention (such as it is!) of 4-space indentation. Also, I have moved away from aligning arguments with the opening parenthesis of the function name; instead, when arguments don't fit on a single line, I favor starting them on a new line indented 4 spaces:
result <- someFunction(
    foo,
    bar,
    baz
)
with some flexibility favoring a compact representation
result <- someFunction(foo, bar, baz)  # fits on one line
result <- someFunction(  # args fit on one line, but not function + args
    foo, bar, baz
)
result <- someFunction(  # some args fit on one line
    foo, bar,
    baz
)
I think it's also important to ask whether this is enough of a problem to warrant a solution? Can you provide a simple reproducible example that illustrates?
Interesting points about memory sharing, Martin. How do you provide a reproducible example for memory usage? Is there an easy way to get the peak memory usage of the R process and all its children summed?
Measuring memory is challenging of course, but actually I had been under the impression that the discussion was about speed, so an example would certainly help clarify! Maybe Rcollectl would be helpful...
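For what it's worth, one crude way to read the current and peak resident memory of a single process on Linux is to parse `/proc` (a sketch only; it does not cover children, so summing over workers would still need external tooling or a package like `ps`):

```r
## Read a memory field (in kB) from /proc/<pid>/status. Linux-only.
## "VmRSS" is the current resident memory; "VmHWM" is the peak
## ("high water mark") resident memory of the process.
proc_mem_kb <- function(field = "VmHWM", pid = Sys.getpid()) {
    status <- readLines(sprintf("/proc/%d/status", pid))
    line <- grep(sprintf("^%s:", field), status, value = TRUE)
    as.numeric(gsub("[^0-9]", "", line))
}

proc_mem_kb("VmRSS")  # current resident set size of this R process, in kB
```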
Here I just show the problem, which is mostly RAM usage; I'll discuss the solution afterwards.
Here is some code to demo the issue, which mostly affects RAM usage (and indirectly CPU, because we have to serialize a lot more data).
To keep things simple I just use a 1GB list with 1000 elements to iterate on. Each element is 1MB long. This was large enough to be noticeable in my system monitor, but small enough to not crash on my 16GB laptop. If you have less RAM available than me, please make the dataset size smaller to avoid RAM issues. I don't measure the RAM in the script, but I attach two screenshots of my system monitor:
# This example shows the RAM issues of bpmapply()
# 1. Create a list of num_samples elements, where each element will be a numeric
#    vector of a given size, so that the whole list takes about 1 GB of RAM.
#    I call this list "samples" because it's like a list of samples to process
#
# On my 16GB of RAM laptop, 1GB are easily noticeable, and I won't suffer
# running out of RAM if I have an extra copy or two (but I will notice the bump)
# Dataset size:
dataset_size_GB <- 1 # GB
num_samples <- 1000
sample_size_MB <- dataset_size_GB/num_samples * 1024 # MB
sample_length <- round(sample_size_MB*1024^2/8) # one double is 8 bytes
samples <- lapply(
seq_len(num_samples),
function(i, sample_length) {
runif(sample_length)
},
sample_length = sample_length
)
# 2. Since bpmapply() is designed to take several arguments, we also create
# another list of the same length as the dataset, with one random number.
# I see this as another argument I want to iterate on, but it's not relevant
extra_args <- lapply(
seq_len(num_samples),
function(i) {
runif(1)
}
)
# 3. To show the problem, we can check installing either the master version or
# this pull request:
#
# # Pick one:
# remotes::install_github("Bioconductor/BiocParallel")
# remotes::install_github("Bioconductor/BiocParallel#228")
#
# (The BiocParallel from Bioconductor would work as well)
library(BiocParallel)
# I use three workers
bpparam <- SnowParam(workers = 3, exportglobals = FALSE, exportvariables = FALSE)
process_sample <- local({function(sample, extra_arg) {
force(sample)
force(extra_arg)
# just wait a bit, 1000 samples / 3 workers ~ 333 samples/worker * 0.05 s/sample = 16.6 seconds
Sys.sleep(0.05)
NULL
}}, envir = baseenv())
bpmapply(
process_sample,
sample = samples,
extra_arg = extra_args,
BPPARAM = bpparam,
SIMPLIFY = FALSE
)
When running this on the BiocParallel master branch, I see my RAM usage goes from 15% to 60%. This is roughly 7GB of RAM.
When running this on the branch from this pull request (`remotes::install_github("Bioconductor/BiocParallel#228")`), we go from 15% up to 25% of the RAM. That's 1.6GB.
I may be off in my estimations, since I took them with the naked eye, but it looks pretty clear to me this is worth it.
There are three points open for discussion:

- `bpsharememory()`: As you have shown with the Multicore example, `bpsharememory()` is not the right solution for this problem. We have here two implementations of `bpmapply()`: one that sends a copy of the whole list of arguments to each worker, and another one that transforms that list so it can send to each worker just the stuff it needs. I suggest the transposing implementation should be the default `bpmapply()` implementation, keeping the older `bpmapply()` implementation for `SerialParam()`.
- `SharedObject`: For backends such as Multicore or maybe even Snow (e.g. when all workers are on localhost) it should be possible to share memory between the workers. This should be by far the most efficient solution! To use `SharedObject` we should know whether the backend and the data being used are compatible with `SharedObject`. If everything is compatible with `SharedObject` then we should use it (adding it as a dependency to BiocParallel...). I believe that using `SharedObject` will require significant testing on many OSes and it is beyond this pull request.
- Dropping the `bpsharememory()` generic: `bpmapply()` will use the proposed implementation for all backends except for Serial, where it will never be as efficient as the older one.

What do you think?
The plan sounds good; thanks for your engagement on this.
One thing might be to reconsider a special case for SerialParam -- there's value in having consistency across back ends, especially when one might switch to SerialParam when trying to debug. Also and perhaps more importantly the re-organization of data structures might not actually be that expensive -- not actually copying the large vector elements, just the S-expressions of the list 'skeleton'. Not sure where you are in your knowledge of R but
> l = list(list(x=1:10, y = 1:10))
> .Internal(inspect(l))
@13c821060 19 VECSXP g0c1 [REF(2)] (len=1, tl=0)
@15acd1648 19 VECSXP g0c2 [REF(3),ATT] (len=2, tl=0)
@13c04a810 13 INTSXP g0c0 [REF(65535)] 1 : 10 (compact)
@13c04a730 13 INTSXP g0c0 [REF(65535)] 1 : 10 (compact)
ATTRIB:
@13c04a6c0 02 LISTSXP g0c0 [REF(1)]
TAG: @15a80d020 01 SYMSXP g0c0 [MARK,REF(6344),LCK,gp=0x4000] "names" (has value)
@15acd16c8 16 STRSXP g0c2 [REF(65535)] (len=2, tl=0)
@15b00d6e0 09 CHARSXP g0c1 [MARK,REF(127),gp=0x61] [ASCII] [cached] "x"
@15b04a3f0 09 CHARSXP g0c1 [MARK,REF(23),gp=0x61] [ASCII] [cached] "y"
shows that the 'data' `x` is at memory pointed to by `13c04a810` and `y` is at `13c04a730`.
Re-arranging,
> m = list(x = l[[1]]$x, y = l[[1]]$y)
> .Internal(inspect(m))
@14a836788 19 VECSXP g0c2 [REF(1),ATT] (len=2, tl=0)
@13c04a810 13 INTSXP g0c0 [REF(65535)] 1 : 10 (compact)
@13c04a730 13 INTSXP g0c0 [REF(65535)] 1 : 10 (compact)
ATTRIB:
@14ab4e438 02 LISTSXP g0c0 [REF(1)]
TAG: @15a80d020 01 SYMSXP g0c0 [MARK,REF(6345),LCK,gp=0x4000] "names" (has value)
@14a836808 16 STRSXP g0c2 [REF(1)] (len=2, tl=0)
@15b00d6e0 09 CHARSXP g0c1 [MARK,REF(129),gp=0x61] [ASCII] [cached] "x"
@15b04a3f0 09 CHARSXP g0c1 [MARK,REF(25),gp=0x61] [ASCII] [cached] "y"
the list S-expressions have been updated, but the data are being re-used: `x` is still at `13c04a810` and `y` at `13c04a730`, so there is no real memory cost (the list S-expressions are a fixed size, maybe just 48 bytes? (`object.size(list())`); I don't think that's exactly right). And the time cost of re-arranging is just an iteration that is likely insignificant in the context of the computation being performed.
Oh right! I forgot about R copy-on-write magic! I'll keep it simple then!
I think there are some reasons that I did not touch `bpmapply` when I was reconstructing these apply functions, but I'm not exactly sure why... Maybe error handling? I need to go over the code again to see if there are any side effects from this change. (Or I just simply forgot to change this function?)
@Jiefei-Wang Sorry, I don't have context of what you were doing. `bpmapply()` ends up wrapping `bplapply()`, so most of the parallelization stuff happens in `bplapply()`. Maybe you left it out because of that?
Your implementation looks good to me, I only have one minor comment. When creating `ddd`, I think we can use `mapply` to make the list instead of reinventing our own wheel. For example:
foo <- function(...) {
    ddd <- mapply(
        function(...) list(...),
        ...,
        SIMPLIFY = FALSE
    )
    ddd
}
foo(i = 1:2, j = 4:5)
It will take care of both variable names and unmatched variable lengths.
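A quick check of that recycling behavior (same `foo()` as above, repeated here so the snippet is self-contained):

```r
## mapply() recycles shorter arguments to the length of the longest one
## and carries argument names through to each inner list.
foo <- function(...) {
    mapply(function(...) list(...), ..., SIMPLIFY = FALSE)
}

res <- foo(i = 1:4, j = 1:2)
## j is recycled against the longer i: res[[3]] is list(i = 3L, j = 1L)
```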
I didn't do that simplification because when I tried some unit tests failed.
We could argue if the unit tests should be changed, but I would rather do that on a different issue.
Example of a failing test (from `inst/unitTests/test_bpmapply.R`):
library(BiocParallel)
library(RUnit)
X <- list(c(a = 1))
checkIdentical(X, bpmapply(identity, X, SIMPLIFY = FALSE))
For your suggestion to work, that test should be changed to:
library(BiocParallel)
library(RUnit)
X <- list(c(a = 1))
checkIdentical(mapply(identity, X, SIMPLIFY = FALSE), bpmapply(identity, X, SIMPLIFY = FALSE))
Which makes sense to me, to be honest. There are several other tests that would need to be updated as well.
Anyway, this is a subtle difference between `mapply` and `bpmapply` that probably should be fixed in BiocParallel. But I'd rather do it in a different issue / pull request.
In case you want to check that out, here is a branch with the simplification you suggested:
remotes::install_github("zeehio/BiocParallel@fix-simplify-bpmapply-argument-preparation")
Or, if you want the code, you can add my remote and check the branch out (in case you are not familiar with git remotes and branches):
# Assuming you are in this git repository...
git remote add zeehio git@github.com:zeehio/BiocParallel.git
git fetch zeehio
git checkout fix-simplify-bpmapply-argument-preparation
If there are no other issues or suggestions, I suggest merging this pull request and working on that simplification in a new thread. I would rather avoid introducing breaking changes here.
Thanks @zeehio this looks great! Can you 'squash' this into a single commit maybe with
git reset --soft ad15c8f
git commit --edit -m"$(git log --format=%B --reverse HEAD..HEAD@{1})"
(from magic at https://stackoverflow.com/a/5201642/547331) ? That way the history shows only the paths taken rather than the paths not taken... Maybe a commit message like
improve bpmapply() memory use
- merges #228
- bpmapply receives several arguments to iterate on. This ends up being
something like:
    ddd <- list(
        arg1 = list(arg1_iter1, arg1_iter2, arg1_iter3),
        arg2 = list(arg2_iter1, arg2_iter2, arg2_iter3)
    )
The implementation before this commit was passing ddd to all
workers as well as the iteration index, and each worker would
take the corresponding slice ddd$arg1[[i]] and ddd$arg2[[i]].
For Serial and (sometimes) Multicore backends, where workers share memory,
this is very efficient. However for Snow backends, where workers
do not share memory, the whole ddd needs to be serialized, copied
to each worker and deserialized, which is very inefficient.
In the Snow scenario, it is far more efficient to let the main
process "transpose" the `ddd` list so it becomes:
    ddd2 <- list(
        iter1 = list(arg1_iter1, arg2_iter1),
        iter2 = list(arg1_iter2, arg2_iter2)
    )
Then only pass the corresponding iteration to each worker,
reducing the amount of serialization/deserialization needed
and the memory footprint significantly.
The re-arrangement is not too expensive, and for consistency is
applied to all backends.
- Define helper functions in local() only with the base namespace
- Simplify bpmapply implementation
- Remove unused .mrename
- Update NEWS
I guess there have been changes since this started; I can update before merging, but if you wanted to git rebase master
and adjust the version number in the NEWS entry file as appropriate... (or I can do this)
I think the `.transposeArgsWithIterations()` should be in bpmapply-methods (and the corresponding test file).
I used `git rebase -i master` and chose to squash all commits. It's all squashed now.
I updated the news entry and moved the transpose function as suggested.
I did this from my phone and I don't have R available here, I can run R CMD check later today to ensure it's all good, or you can beat me to it if you like :)
All my checks pass. Feel free to merge
Thanks @zeehio that was really helpful; I added you as 'ctb' to the DESCRIPTION file.
Thanks!
bpmapply was passing another copy of the full data (besides #227). In this case, directly as an explicit extra argument to bplapply.
The arguments to be iterated on are now transposed, so instead of having `ddd[[i]][[j]]` be the value of the i-th argument in the j-th iteration, we build a list of the form `ddd[[j]][[i]]`. Then `bplapply()` can directly iterate on `ddd`, passing one element of `ddd` to each worker, instead of passing the whole `ddd` list of lists and the corresponding index.

This approach has a drawback when the arguments to be iterated on are not lists but vectors. In that case, the transposition has some overhead, but I would argue that it would make more sense to use something like bpvec or bpmvec if it existed.
UPDATED: This pull request has been updated so its drawbacks do not exist anymore. https://github.com/Bioconductor/BiocParallel/pull/228#issuecomment-1301778281