kendonB opened this issue 4 years ago.
The files batchtools writes are usually very small. The bottleneck you are experiencing is probably your network filesystem having trouble with the number of files. A different serialization would not help in this case.
Mm, so in the case of my command moving a 20 GB data.frame, I should think of future.batchtools as serializing it and writing many tiny chunks to disk that get picked up on the other end? Part of why qs is fast is that it compresses as well as serializes, so there are fewer chunks.
The future.batchtools package exports each global object using `batchtools::batchExport()`:
```r
## (iii) Export globals?
if (length(future$globals) > 0) {
  batchExport(export = future$globals, reg = reg)
}
```
So, if there are 10 globals in that named `future$globals` list, then `batchtools::batchExport()` will create 10 files in its job registry folder.
I have thought about having future.batchtools bundle up the globals into a single container object, like:
```r
if (length(future$globals) > 0) {
  batchExport(export = list(all_globals = future$globals), reg = reg)
}
```
such that only one file is written. Obviously, the future framework then needs to undo that in the worker code, which is doable. I never got around to doing this for two reasons: (i) time, and (ii) I think it makes more sense if 'batchtools' would provide that feature as an option.
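A minimal sketch of the bundling idea, using plain `saveRDS()`/`readRDS()` rather than batchtools' own export machinery; the names `globals`, `bundle_file`, and `worker_env` are hypothetical. It shows that one container object means one file, and that the worker side can restore every global by name:

```r
# Hypothetical illustration only; not how batchtools or future.batchtools store exports.
globals <- list(data = rnorm(10), n_iter = 5L, label = "run-1")

# Submission side: wrap all globals in a single container -> a single file.
bundle_file <- tempfile(fileext = ".rds")
saveRDS(list(all_globals = globals), file = bundle_file)

# Worker side: read the container once and re-create each global by name.
worker_env <- new.env()
list2env(readRDS(bundle_file)$all_globals, envir = worker_env)
print(ls(worker_env))
```

This only reduces the number of files; the total number of bytes written stays the same.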
I still don't get what the problem is here. Are we talking about some million global variables, or a few very large ones?
A few very large ones
OK, then "bundling" them into a single file will not help. The bottleneck is either CPU for compression or I/O for writing to the file system. If it is I/O, then qs will not help; it creates files of approximately the same size. If the bottleneck is CPU, on the other hand, disabling compression (or using a faster algorithm) could indeed help.
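One rough way to tell which of the two dominates is to time the in-memory serialize-and-compress step separately from the disk write. This is an approximation under stated assumptions: `profile_save()` is a hypothetical helper, `memCompress(type = "gzip")` produces a zlib stream rather than the gzip file that `saveRDS()` writes (so the CPU cost is similar but not identical), and `file` should point at the file system you actually care about (the default `tempfile()` is usually local disk):

```r
# Hypothetical helper: approximate how much of a save is CPU (serialize + gzip)
# and how much is I/O (writing the already-compressed bytes to disk).
profile_save <- function(x, file = tempfile(fileext = ".gz")) {
  on.exit(unlink(file))
  cpu <- system.time({
    raw_bytes <- serialize(x, connection = NULL)      # R object -> raw vector
    gz_bytes <- memCompress(raw_bytes, type = "gzip") # compress in memory
  })[["elapsed"]]
  io <- system.time(writeBin(gz_bytes, file))[["elapsed"]]
  c(cpu_s = cpu, io_s = io,
    raw_MB = length(raw_bytes) / 1e6, gz_MB = length(gz_bytes) / 1e6)
}

profile_save(rep(1, 1e6))
```

If `cpu_s` dwarfs `io_s`, turning compression off (or using a faster compressor) is the lever; if `io_s` dominates, a different serializer producing similarly sized files is unlikely to help.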
However, I am not sure whether it is a good idea to tie batchtools to qs. Is its serialization format stable, or will it change in future versions?
Agree with @mllg. FWIW, using `saveRDS(..., compress = FALSE)` is significantly faster than the default `compress = TRUE`, at least on my local SSD drive.
@kendonB, can you run some benchmarks on your large objects comparing the round-trip time of `saveRDS()`/`readRDS()` with that of the corresponding qs functions? That should provide a baseline for whether it is faster or not, and if so, by how much.
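A minimal round-trip harness, as a sketch: `round_trip()` is a hypothetical helper, the data frame is a small stand-in for the real 20 GB object, and the qs variant uses `qs::qsave()`/`qs::qread()` with their default settings and only runs if qs is installed:

```r
# Round-trip (write + read) time for one object and one pair of I/O functions.
round_trip <- function(x, write_fun, read_fun, file = tempfile()) {
  on.exit(unlink(file))
  elapsed <- system.time({
    write_fun(x, file)
    y <- read_fun(file)
  })[["elapsed"]]
  stopifnot(isTRUE(all.equal(x, y)))  # the object must survive the trip
  elapsed
}

# Stand-in data; replace with the real (large) object.
x <- data.frame(a = rnorm(1e5), b = sample(letters, 1e5, replace = TRUE))

timings <- c(
  rds_gzip = round_trip(x, function(o, f) saveRDS(o, f, compress = TRUE), readRDS),
  rds_none = round_trip(x, function(o, f) saveRDS(o, f, compress = FALSE), readRDS)
)
if (requireNamespace("qs", quietly = TRUE)) {
  timings["qs_default"] <- round_trip(x, function(o, f) qs::qsave(o, f), qs::qread)
}
print(timings)
```

Running it on the target file system, inside the same kind of job allocation as the real workload, matters: OS file caching can make back-to-back reads look faster than they would be on a worker node.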
EDIT: What will complicate the comparison is figuring out when it is valid to use more than one thread in qs, because you definitely don't want to use more CPU than your jobs are given by the scheduler.
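One way to respect the allocation is to read the CPU count that Slurm sets for the job before choosing a thread count. `allowed_threads()` is a hypothetical helper that only looks at the `SLURM_CPUS_PER_TASK` environment variable; `future::availableCores()` is a more general alternative that also checks other schedulers and settings:

```r
# Hypothetical helper: how many threads may a serializer use inside this job?
# Falls back to `default` outside Slurm or if the variable is malformed.
allowed_threads <- function(default = 1L) {
  n <- suppressWarnings(as.integer(Sys.getenv("SLURM_CPUS_PER_TASK", unset = NA)))
  if (is.na(n) || n < 1L) default else n
}

allowed_threads()
```

The result could then be passed to a multi-threaded writer, e.g. `qs::qsave(x, file, nthreads = allowed_threads())`, assuming the installed qs version provides an `nthreads` argument.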
Are their package benchmarks not sufficient?
https://cran.r-project.org/web/packages/qs/vignettes/vignette.html
Do they specify whether they use the default `compress = TRUE` or `compress = FALSE`? Because that's a low-hanging fruit that I can imagine @mllg could implement as an option in batchtools without much work. That alone can bring some significant improvements:
```r
library(bench)

con1 <- tempfile()
con2 <- tempfile()

x <- rep(list(iris), times = 5000)
x <- do.call(rbind, x)
print(object.size(x))
## 27001856 bytes

stats <- bench::mark(check = FALSE,
  saveRDS(x, file = con1, compress = FALSE),
  saveRDS(x, file = con2, compress = TRUE)
)
print(stats[, 1:7])
```

```
# A tibble: 2 x 7
  expression                                     min   median `itr/sec` mem_alloc `gc/sec` n_itr
  <bch:expr>                                <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl> <int>
1 saveRDS(x, file = con1, compress = FALSE)   41.8ms   44.2ms     22.5     8.63KB        0    12
2 saveRDS(x, file = con2, compress = TRUE)   171.9ms  172.1ms      5.80    8.63KB        0     3
```
Just to clarify, I'm not trying to argue against qs, which looks great (and it's not my decision). I'd just like to see some real-world benchmarks, especially for your 20 GB case.
> Because that's a low-hanging fruit that I can imagine @mllg could implement as an option in batchtools without much work.
Exactly. In my experience, gzip compression (`compress = "gzip"`) is a good trade-off, and this could also be implemented / exposed as an option very easily.
Their benchmark does not include the compression used in `saveRDS()`, and I'm unsure why they get much better performance with 4 threads; `saveRDS()` is single-threaded AFAICT.
Oh wait, gzip is the default.
To be more constructive, and to help with your problem:
Regardless of the compression, communication is expensive. If possible, avoid globals and avoid passing large objects to functions for parallelization (i.e., do not pass them as arguments / more.args). Instead, it is often possible to just load them from the file system in the parallelized function. There are also plenty of packages and file formats that allow you to retrieve rows / columns of a data frame without reading the complete frame, e.g. vroom, miniparquet, or SQLite.
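A base-R sketch of that pattern, assuming a directory that every worker can read: the data are written once as per-chunk files, and the parallelized function receives only a file path. Plain RDS files stand in for vroom/miniparquet/SQLite to keep it dependency-free, and all names (`shared_dir`, `process_chunk()`, ...) are hypothetical:

```r
# Write the data to shared storage once, as small per-chunk files.
data <- data.frame(id = 1:1000, value = rnorm(1000))
shared_dir <- tempfile("shared_")  # on a cluster, use a directory all nodes can see
dir.create(shared_dir)
chunk_rows <- split(seq_len(nrow(data)), rep(1:4, length.out = nrow(data)))
paths <- vapply(seq_along(chunk_rows), function(k) {
  path <- file.path(shared_dir, sprintf("chunk_%02d.rds", k))
  saveRDS(data[chunk_rows[[k]], ], path, compress = FALSE)
  path
}, character(1))

# The parallelized function only receives a short file path, not the data.
process_chunk <- function(path) {
  chunk <- readRDS(path)
  sum(chunk$value)
}

# lapply() keeps the sketch self-contained; foreach() %dopar% works the same way.
results <- lapply(paths, process_chunk)
```

The only global each task needs is a character string, so the export cost no longer scales with the size of the data.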
Sorry to revive an old thread, but I am also really interested in batchtools allowing the user to specify whether exports should be compressed (with an eye towards eventual implementation in future.batchtools). I understand that, for a majority of cases, having workers individually load data from a shared location is advisable. While there are lots of fast and parallel ways of accessing the file system, most are limited to tabular data. When working with or operating on other object types, I have not found a consistent and efficient strategy for handling the file I/O. Giving the user the option to export without compression would be incredibly helpful in simplifying parallel calculations under these conditions. Below are some examples that highlight how toggling compression can drastically affect timings.
For each of the following examples, Sequential, Standard Batchtools, and Uncompressed Batchtools times are shown. Standard Batchtools is the default installation of v0.9.11. Uncompressed Batchtools is also based on v0.9.11, but has the following modified `writeRDS()` function definition:
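```r
writeRDS = function(object, file) {
  file_remove(file)                                             # remove any stale copy first
  saveRDS(object, file = file, version = 2L, compress = FALSE)  # compress = FALSE: skip gzip
  waitForFile(file, 300)                                        # wait up to 300 s for the file to appear
  invisible(TRUE)
}
```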
All examples are run through a Slurm scheduler using future.batchtools.
```
R version 3.5.3 (2019-03-11)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Red Hat Enterprise Linux

Matrix products: default
BLAS: /opt/revr/ropen/3.5.3/lib64/R/lib/libRblas.so
LAPACK: /opt/revr/ropen/3.5.3/lib64/R/lib/libRlapack.so

locale:
 [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8 LC_MONETARY=en_US.UTF-8
 [6] LC_MESSAGES=en_US.UTF-8 LC_PAPER=en_US.UTF-8 LC_NAME=C LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base

other attached packages:
[1] batchtools_0.9.11 data.table_1.12.6 future.batchtools_0.8.1 doFuture_0.8.2 iterators_1.0.12 future_1.15.0
[7] globals_0.12.4 foreach_1.4.7 RevoUtilsMath_11.0.0

loaded via a namespace (and not attached):
 [1] rstudioapi_0.10 magrittr_1.5 rappdirs_0.3.1 hms_0.5.2 progress_1.2.2 debugme_1.1.0 R6_2.4.1 brew_1.0-6
 [9] rlang_0.4.1 tools_3.5.3 packrat_0.5.0 checkmate_1.9.4 withr_2.1.2 assertthat_0.2.1 base64url_1.4 digest_0.6.22
[17] tibble_2.1.3 crayon_1.3.4 vctrs_0.2.0 codetools_0.2-16 zeallot_0.1.0 stringi_1.4.3 compiler_3.5.3 pillar_1.4.2
[25] prettyunits_1.0.2 RevoUtils_11.0.3 backports_1.1.5 listenv_0.7.0 pkgconfig_2.0.3
```
In Example 1, each worker sums a large (~1.5 GB) numeric vector. The vector is exported to each worker.
```r
library(foreach)
library(doFuture)
library(future.batchtools)
library(batchtools)

data <- rnorm(200000000, 1, 1)  # 2e8 doubles, ~1.5 GB

fun <- function(i, datain) {
  temp <- sum(datain)
  i
}

## Sequential
foreach::registerDoSEQ()
system.time({x = foreach(i = 1:10) %dopar% fun(i, data)})
# user system elapsed
# 2.744 0.741 3.705
```
```r
## Standard Batchtools (batchtools v0.9.11 as released)
doFuture::registerDoFuture()
resources <-
  list(
    memory = 1000L,
    ncpus = 16,
    'walltime' = 3600,
    r_home = paste0(R.home(), "/bin/"),
    snapshot = Sys.getenv("R_LIB_SNAPSHOT"),
    partition = "debug"
  )
mybatchtools <- tweak(batchtools_slurm, resources = resources, template = 'slurm-template')
mymultisession <- tweak(future::multisession, workers = 16)
plan(list(mybatchtools, mymultisession))
system.time({x = foreach(i = 1:10) %dopar% fun(i, data)})
# user system elapsed
# 1691.690 48.786 1757.878
```
```r
## Uncompressed Batchtools (v0.9.11 with the modified writeRDS())
doFuture::registerDoFuture()
resources <-
  list(
    memory = 1000L,
    ncpus = 16,
    'walltime' = 3600,
    r_home = paste0(R.home(), "/bin/"),
    snapshot = Sys.getenv("R_LIB_SNAPSHOT"),
    partition = "debug"
  )
mybatchtools <- tweak(batchtools_slurm, resources = resources, template = 'slurm-template')
mymultisession <- tweak(future::multisession, workers = 16)
plan(list(mybatchtools, mymultisession))
system.time({x = foreach(i = 1:10) %dopar% fun(i, data)})
# user system elapsed
# 29.854 47.929 128.583
```
| Method | Required computational time (s) | Time relative to Sequential |
|---|---|---|
| Sequential | 3.705 | 1 |
| Standard Batchtools | 1757.878 | 474.46 |
| Uncompressed Batchtools | 128.583 | 34.71 |
As you can see, the default behavior of batchtools (as part of future.batchtools) is pretty terrible for this type of problem. That being said, removing the export compression does reduce the time required to complete the calculation in parallel by over an order of magnitude.
In Example 2, the deviance reduction associated with each possible first-order (two-way) interaction term is calculated for a logistic GLM. The model object is exported to each worker.
```r
library(foreach)
library(doFuture)
library(future.batchtools)
library(batchtools)

irisSmall <- dplyr::filter(iris, Species %in% c("virginica", "versicolor"))
irisSmall <- irisSmall[rep(seq_len(nrow(irisSmall)), 10000), ]
irisSmall$Species <- factor(irisSmall$Species)

model <- glm(
  Species ~ Sepal.Width + Sepal.Length + Petal.Width + Petal.Length,
  data = irisSmall,
  binomial(link = "logit")
)

## Names of all two-way interaction terms, e.g. "Sepal.Width:Sepal.Length"
inters <- colnames(model.matrix(Species ~ (Sepal.Width + Sepal.Length + Petal.Width + Petal.Length)^2 - (Sepal.Width + Sepal.Length + Petal.Width + Petal.Length) - 1, irisSmall))

## Refit the model with one extra interaction term and return its deviance
checkInter <- function(var, interaction, baseModel) {
  newFormula <- update(baseModel$formula, paste0("~ . + ", interaction[var]))
  modelNew <- glm(
    newFormula,
    data = baseModel$data,
    binomial(link = "logit")
  )
  modelNew$deviance
}

## Sequential
foreach::registerDoSEQ()
system.time({x = foreach(i = 1:length(inters)) %dopar% checkInter(i, inters, model)})
# user system elapsed
# 55.148 9.693 57.185
```
```r
## Standard Batchtools (batchtools v0.9.11 as released)
doFuture::registerDoFuture()
resources <-
  list(
    memory = 1000L,
    ncpus = 16,
    'walltime' = 3600,
    r_home = paste0(R.home(), "/bin/"),
    snapshot = Sys.getenv("R_LIB_SNAPSHOT"),
    partition = "debug"
  )
mybatchtools <- tweak(batchtools_slurm, resources = resources, template = 'slurm-template')
mymultisession <- tweak(future::multisession, workers = 16)
plan(list(mybatchtools, mymultisession))
system.time({x = foreach(i = 1:length(inters)) %dopar% checkInter(i, inters, model)})
# user system elapsed
# 151.240 14.518 175.428
```
```r
## Uncompressed Batchtools (v0.9.11 with the modified writeRDS())
doFuture::registerDoFuture()
resources <-
  list(
    memory = 1000L,
    ncpus = 16,
    'walltime' = 3600,
    r_home = paste0(R.home(), "/bin/"),
    snapshot = Sys.getenv("R_LIB_SNAPSHOT"),
    partition = "debug"
  )
mybatchtools <- tweak(batchtools_slurm, resources = resources, template = 'slurm-template')
mymultisession <- tweak(future::multisession, workers = 16)
plan(list(mybatchtools, mymultisession))
system.time({x = foreach(i = 1:length(inters)) %dopar% checkInter(i, inters, model)})
# user system elapsed
# 20.919 17.609 49.964
```
| Method | Required computational time (s) | Time relative to Sequential |
|---|---|---|
| Sequential | 57.185 | 1 |
| Standard Batchtools | 175.428 | 3.07 |
| Uncompressed Batchtools | 49.964 | 0.87 |
This example starts to show how removing compression for non-tabular data objects can have a big impact on computational time. In particular, without export compression, parallel evaluation in this example takes less time than sequential evaluation. It is easy to imagine more complex calculations where these gains would be even more pronounced.
Example 3 is very similar to Example 2, except that the model object is not directly exported to each worker. Instead, the model object is saved once to a shared location prior to the `foreach` call, and each parallel process then loads the object individually.
```r
library(foreach)
library(doFuture)
library(future.batchtools)
library(batchtools)

irisSmall <- dplyr::filter(iris, Species %in% c("virginica", "versicolor"))
irisSmall <- irisSmall[rep(seq_len(nrow(irisSmall)), 10000), ]
irisSmall$Species <- factor(irisSmall$Species)

model <- glm(
  Species ~ Sepal.Width + Sepal.Length + Petal.Width + Petal.Length,
  data = irisSmall,
  binomial(link = "logit")
)

inters <- colnames(model.matrix(Species ~ (Sepal.Width + Sepal.Length + Petal.Width + Petal.Length)^2 - (Sepal.Width + Sepal.Length + Petal.Width + Petal.Length) - 1, irisSmall))

checkInter <- function(var, interaction, baseModel) {
  newFormula <- update(baseModel$formula, paste0("~ . + ", interaction[var]))
  modelNew <- glm(
    newFormula,
    data = baseModel$data,
    binomial(link = "logit")
  )
  modelNew$deviance
}

checkInterLoad <- function(var, interaction, path) {
  model <- readRDS(path)
  checkInter(var, interaction, model)
}

evalInter <- function(interaction, model, compression = TRUE) {
  if (!dir.exists(paste(getwd(), "/tmp", sep = ""))) {
    dir.create(paste(getwd(), "/tmp", sep = ""))
  }
  path <- tempfile(tmpdir = paste(getwd(), "/tmp", sep = ""))
  saveRDS(model, path, compress = compression)
  x <- foreach(i = 1:length(interaction)) %dopar% checkInterLoad(i, interaction, path)
  if (file.exists(path)) {
    file.remove(path)
  }
  x
}
```
```r
## Sequential, model file saved with compression
foreach::registerDoSEQ()
system.time({x2 = evalInter(inters, model, compression = TRUE)})
# user system elapsed
# 105.044 3.036 99.544

## Sequential, model file saved without compression
foreach::registerDoSEQ()
system.time({x2 = evalInter(inters, model, compression = FALSE)})
# user system elapsed
# 64.308 3.164 61.741
```
```r
## Standard Batchtools, model file saved with compression
doFuture::registerDoFuture()
resources <-
  list(
    memory = 1000L,
    ncpus = 16,
    'walltime' = 3600,
    r_home = paste0(R.home(), "/bin/"),
    snapshot = Sys.getenv("R_LIB_SNAPSHOT"),
    partition = "debug"
  )
mybatchtools <- tweak(batchtools_slurm, resources = resources, template = 'slurm-template')
mymultisession <- tweak(future::multisession, workers = 16)
plan(list(mybatchtools, mymultisession))
system.time({x2 = evalInter(inters, model, compression = TRUE)})
# user system elapsed
# 33.699 14.041 52.820
```
```r
## Standard Batchtools, model file saved without compression
doFuture::registerDoFuture()
resources <-
  list(
    memory = 1000L,
    ncpus = 16,
    'walltime' = 3600,
    r_home = paste0(R.home(), "/bin/"),
    snapshot = Sys.getenv("R_LIB_SNAPSHOT"),
    partition = "debug"
  )
mybatchtools <- tweak(batchtools_slurm, resources = resources, template = 'slurm-template')
mymultisession <- tweak(future::multisession, workers = 16)
plan(list(mybatchtools, mymultisession))
system.time({x2 = evalInter(inters, model, compression = FALSE)})
# user system elapsed
# 12.816 12.764 34.884
```
```r
## Uncompressed Batchtools, model file saved with compression
doFuture::registerDoFuture()
resources <-
  list(
    memory = 1000L,
    ncpus = 16,
    'walltime' = 3600,
    r_home = paste0(R.home(), "/bin/"),
    snapshot = Sys.getenv("R_LIB_SNAPSHOT"),
    partition = "debug"
  )
mybatchtools <- tweak(batchtools_slurm, resources = resources, template = 'slurm-template')
mymultisession <- tweak(future::multisession, workers = 16)
plan(list(mybatchtools, mymultisession))
system.time({x2 = evalInter(inters, model, compression = TRUE)})
# user system elapsed
# 34.433 15.583 55.280
```
```r
## Uncompressed Batchtools, model file saved without compression
doFuture::registerDoFuture()
resources <-
  list(
    memory = 1000L,
    ncpus = 16,
    'walltime' = 3600,
    r_home = paste0(R.home(), "/bin/"),
    snapshot = Sys.getenv("R_LIB_SNAPSHOT"),
    partition = "debug"
  )
mybatchtools <- tweak(batchtools_slurm, resources = resources, template = 'slurm-template')
mymultisession <- tweak(future::multisession, workers = 16)
plan(list(mybatchtools, mymultisession))
system.time({x2 = evalInter(inters, model, compression = FALSE)})
# user system elapsed
# 11.393 13.875 34.344
```
| Method | Model file compressed (`saveRDS()`) | Required computational time (s) | Time relative to Sequential with compression |
|---|---|---|---|
| Sequential | TRUE | 99.544 | 1 |
| Sequential | FALSE | 61.741 | 0.62 |
| Standard Batchtools | TRUE | 52.820 | 0.53 |
| Standard Batchtools | FALSE | 34.884 | 0.35 |
| Uncompressed Batchtools | TRUE | 55.280 | 0.56 |
| Uncompressed Batchtools | FALSE | 34.344 | 0.35 |
With this example, I tried to show an alternative to exporting large objects directly to workers. As expected, there are differences between evaluation times with and without compression of the model file, but not between the Standard Batchtools and Uncompressed Batchtools implementations (which makes sense, since only the file path, not the model, is exported). Uncompressed Batchtools without compression in this example performed somewhat better than in Example 2 (34.344 s compared to 49.964 s), but at the expense of a more complicated formulation. With more complex worker calculations, I think that this gap would close. Independently, I think there is a lot to be said for the convenience of posing a calculation like Example 2 compared to Example 3.
> As you can see, the default behavior of batchtools (as part of future.batchtools) is pretty terrible for this type of problem. That being said, removing the export compression does reduce the time required to complete the calculation in parallel by over an order of magnitude.
@HenrikBengtsson can you explain how the exports work in future.batchtools? Or is the foreach connector the problem in Example 1?
@benmarchi what are your conclusions regarding compression? By disabling compression, you trade file size for CPU time. Were your examples run on local SSDs? Have you tried measuring the elapsed time on a slower file system, such as an HDD or a network file system?
> @HenrikBengtsson can you explain how the exports work in future.batchtools? Or is the foreach connector the problem in Example 1?
Without having gone through the details in the above comments (impressive, comprehensive benchmarking though :thumbsup:), the short answer is that globals are exported to the batchtools registry using:
```r
batchExport(export = future$globals, reg = reg)
```
If there are a lot of elements parallelized over, I also recommend limiting the number of jobs (down from the default maximum of +Inf jobs) as:

```r
plan(tweak(future.batchtools::batchtools_sge, workers = 24L, ...))
```
@benmarchi, if you were not aware of this, doing so is likely to speed up some of your futurized HPC jobs, especially when the processing time per element is not that large.
The long answer: What is worth knowing here is that with `doFuture::registerDoFuture()`, a `y <- foreach(x = X, ...) %dopar% { ... }` call will chunk up the processing of `X` into `future::nbrOfWorkers()` chunks. There are ways to control the number of elements per chunk, but I'll assume the default here.
Next, what is `future::nbrOfWorkers()` when using the `future.batchtools::batchtools_slurm` backend? The default number of workers is +Inf. This is because of the "assumption" that there is an infinite number of workers available on a job scheduler. This was a design decision I made way back (*). It is documented in `help("batchtools_sge", package = "future.batchtools")` and can also be seen by calling `nbrOfWorkers()` after setting the plan.
What does `nbrOfWorkers() == +Inf` mean here? It means that the `foreach(x = X, ...)` call will use an infinite number of chunks if needed, which in practice means there will be one chunk per element in `X`. In other words, there will be `length(X)` chunks. Each chunk is processed by a unique future, and each future is submitted as a separate job on the scheduler (**). Thus, in the default setup, `length(X)` future.batchtools jobs will be submitted to the scheduler.
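To make the counting concrete, here is a small approximation of that rule, assuming chunks are formed like `parallel::splitIndices()` (doFuture's internal chunking may differ in detail, but the number of chunks is what matters here). `make_chunks()` is a hypothetical helper:

```r
# Approximation of the chunking rule described above: at most nbrOfWorkers()
# chunks, and never more chunks than elements. Not doFuture's actual code.
make_chunks <- function(n_elements, n_workers) {
  n_chunks <- min(n_elements, n_workers)  # workers = +Inf -> one chunk per element
  parallel::splitIndices(n_elements, n_chunks)
}

length(make_chunks(10, Inf))  # default batchtools_slurm plan: 10 chunks -> 10 jobs
length(make_chunks(10, 3))    # workers = 3: 3 chunks -> 3 jobs
```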
Now to global variables. Contrary to `batchtools::batchMap()`, the different future.batchtools jobs do not share the same batchtools registry (folder). Instead, there will be one batchtools registry per job (= per future = per chunk), each with its own set of exported globals. Because of this, even when the different futures share the same identical global, they are treated independently and multiple copies will be exported. This is the main cost of using the future framework with future.batchtools over using `batchtools::batchMap()` directly (***): future.batchtools will create many more batchtools global files. Because of this, future.batchtools will probably gain more from uncompressed writing than plain batchtools.
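A rough estimate of what that duplication costs, assuming each future writes its own uncompressed copy of every global. For Example 1, `rnorm(200000000)` is 2e8 doubles at 8 bytes each, about 1.6e9 bytes, so 10 futures write on the order of 16e9 bytes before compression. `export_bytes()` is a hypothetical helper:

```r
# Rough estimate (assumption): every future serializes its own copy of each global.
export_bytes <- function(global, n_elements, n_workers = Inf) {
  n_futures <- min(n_elements, n_workers)                   # one future per chunk
  per_copy <- length(serialize(global, connection = NULL))  # uncompressed size
  n_futures * per_copy
}

x <- rnorm(1e6)                                        # ~8 MB of doubles
export_bytes(x, n_elements = 10) / 1e6                 # MB with workers = +Inf
export_bytes(x, n_elements = 10, n_workers = 2) / 1e6  # MB with workers = 2
```

Capping `workers` reduces the number of copies linearly, which is why it helps independently of any compression setting.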
What more can be done? The one thing I always suggest is to specify the effective number of workers when setting up the plan (*), e.g. in this case:

```r
plan(tweak(future.batchtools::batchtools_sge, workers = 24L, ...))
```
Does all this make sense in what's observed?
(*) An alternative would be to default to one worker (`workers = 1`), but that would effectively result in sequential processing (= one job). Another alternative would be to pick some large number, say, `workers = 24`, but that would be too arbitrary. Hence, using `workers = +Inf` as the default made the most sense. While writing this, I realize another option would be to have no default, that is, to require the user to specify the number of workers.
(**) future.batchtools does not support job arrays.
(***) This is something I am aware of and want to address at some point but this is far into the future because the concept of shared globals across futures is something that must come in at the design of the Future API so that the same code will work regardless of future backend. I have ideas on how to address this but, again, it requires lots of time and carefulness.
@mllg I totally agree that there is a trade-off between time and disk space (just for reference: in Example 1, the largest export file sizes are approximately 1.43 GB and 1.49 GB for Compressed (Standard) and Uncompressed batchtools, respectively; there is a bigger difference between compressed and uncompressed export file sizes in Examples 2 and 3 (~16.0 MB vs. ~295 MB)). For all practical purposes, I am not limited by disk space, so I am more than willing to use a bit more of it to cut down on processing time. That being said, I understand that others are in different situations. That is why making export compression a user option would be the best of both worlds, in my opinion. All these examples were performed on an HPC cluster with a networked file system (I don't know the specifics of the file system, but could probably find out if that would be helpful).
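The trade-off can be put in numbers with a simple model, assuming write time is size divided by file-system bandwidth and ignoring decompression, caching, and concurrent writers. Compression only pays off when the bandwidth is below (raw size - compressed size) / compression time. The 60-second compression time is purely hypothetical; only the two Example 1 file sizes come from the comment above:

```r
# Back-of-the-envelope model (assumption): writing takes size / bandwidth seconds.
#   compressed:   compress_seconds + compressed_bytes / bandwidth
#   uncompressed: raw_bytes / bandwidth
# Compression wins only when bandwidth < (raw_bytes - compressed_bytes) / compress_seconds.
break_even_bandwidth <- function(raw_bytes, compressed_bytes, compress_seconds) {
  (raw_bytes - compressed_bytes) / compress_seconds
}

# Example 1 file sizes; the 60 s compression time is hypothetical.
break_even_bandwidth(raw_bytes = 1.49e9, compressed_bytes = 1.43e9,
                     compress_seconds = 60)  # bytes per second
```

With these inputs the break-even is 1e6 bytes/s (about 1 MB/s), so under this model almost any file system favours uncompressed writes for Example 1, while the break-even bandwidth is much higher for a highly compressible object like the Example 2/3 model.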
@HenrikBengtsson Thanks for the detailed information regarding `workers` and future.batchtools. Limiting `workers` definitely seems like a good strategy when dealing with jobs with many or large exports. In the examples I outlined above, `length(X)` is small (10 in Example 1 and 6 in Examples 2 and 3), so there may be some potential to optimize the total processing time with respect to `nbrOfWorkers()`, but I suspect that the real gains would be seen when dealing with a large number of iterations. It was great to see you call out future.batchtools creating a unique registry for each future job. This is certainly the rate-limiting step in the examples I provided. Comparing Example 2 to Example 3 really shows how reducing the number of times global variables are exported affects total computation time (175.428 s vs. 52.820 s for `length(X)` and 1 exports, respectively).
It is now possible to select the compression algorithm used by `saveRDS()` via the configuration option `compress` (gzip (default), bzip2, xz, and `FALSE` for uncompressed); cf. https://github.com/mllg/batchtools/commit/1001440ab14e697a28c9d901bc976c7d5b9e404b.
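For a quick feel of what each setting does to file size, the sketch below calls `saveRDS()` directly with the four values, using `version = 2L` as in batchtools' `writeRDS()`. In a batchtools configuration file (e.g. `batchtools.conf.R`) the setting would presumably be a plain assignment such as `compress = FALSE`; that option name is taken from the comment above and may differ between batchtools versions:

```r
# Compare saveRDS() file sizes for the compression settings listed above.
x <- list(df = iris, v = rnorm(1e4),
          f = factor(sample(letters, 1e4, replace = TRUE)))
settings <- list(gzip = "gzip", bzip2 = "bzip2", xz = "xz", none = FALSE)

sizes <- vapply(settings, function(compress) {
  file <- tempfile(fileext = ".rds")
  on.exit(unlink(file))
  saveRDS(x, file = file, version = 2L, compress = compress)
  stopifnot(identical(readRDS(file), x))  # every setting must round-trip exactly
  file.size(file)
}, numeric(1))
print(sizes)
```

Smaller is not automatically faster: bzip2 and xz typically trade more CPU time for smaller files.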
Support for qs will be included in a future release.
Thanks for adding this.
Will it be possible to set the default of these `compress` argument(s)? That way, end users would be able to change this too, even if they have no access to the individual batchtools function calls (e.g. when batchtools is used inside a package).
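One common R pattern for this is to take the argument's default from `getOption()`, sketched here with a hypothetical option name (`mypkg.compress`) and a hypothetical wrapper `write_export()`; this is not batchtools' API, which reads its settings from configuration files:

```r
# Hypothetical pattern: the default comes from a global option, so end users can
# change it without touching the function call buried inside a package.
write_export <- function(object, file,
                         compress = getOption("mypkg.compress", "gzip")) {
  saveRDS(object, file = file, compress = compress)
  invisible(file)
}

x <- rep(1L, 1e4)
f_default <- write_export(x, tempfile())  # gzip, the fallback default
old <- options(mypkg.compress = FALSE)    # user opts out of compression
f_plain <- write_export(x, tempfile())
options(old)                              # restore the previous setting
```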
FYI, a while ago I did some more tests on qs, and I don't think it supports all R/SEXP objects, i.e. there could be a risk that it will only work for a subset of the types of globals that need to be exported.
> FYI, a while ago I did some more tests on qs and I don't think it supports all R/SEXP objects.
@HenrikBengtsson can you elaborate on this or provide an example? My understanding is that the intention is for qs to support everything `saveRDS()` supports.
I knew I shouldn't have mentioned it, since I don't recall the details and absolutely don't want to discredit qs; its potential is great, and it looks like their objective is to support everything that `serialize()`/`unserialize()` supports. But look at their issue tracker: non-supported cases are popping up (and getting fixed), e.g. https://github.com/traversc/qs/issues/27. It could have been that I noticed that S4 objects were not supported when you first proposed qs; they addressed that in qs 0.20.1 (2019-12-01). So, to rephrase, it might be a while before all wrinkles have been sorted out so that qs can be considered a safe plug-in replacement. OTOH, adding optional support for it to batchtools will help drive traffic and CPU test hours to qs.
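Whether a given serializer is a safe drop-in can be probed empirically. The sketch round-trips a few object types (including an S4 object) through a write/read pair and reports which ones come back `identical()`; `check_serializer()` and the `Point` class are hypothetical, and the qs line only runs if qs is installed:

```r
# Hypothetical checker: does a write/read pair reproduce each kind of object exactly?
setClass("Point", slots = c(x = "numeric", y = "numeric"))  # a small S4 class

check_serializer <- function(objects, write_fun, read_fun) {
  vapply(objects, function(obj) {
    file <- tempfile()
    on.exit(unlink(file))
    isTRUE(tryCatch({
      write_fun(obj, file)
      identical(read_fun(file), obj)
    }, error = function(e) FALSE))  # an error counts as "not supported"
  }, logical(1))
}

objects <- list(
  data_frame = iris,
  s4 = new("Point", x = 1, y = 2),
  date = as.Date("2019-12-01") + 0:2,
  matrix = matrix(1:6, nrow = 2, dimnames = list(c("a", "b"), NULL)),
  nested = list(a = list(b = letters[1:3]))
)

print(check_serializer(objects, function(o, f) saveRDS(o, f), readRDS))
if (requireNamespace("qs", quietly = TRUE)) {
  print(check_serializer(objects, function(o, f) qs::qsave(o, f), qs::qread))
}
```

Closures with local environments and environment objects are left out on purpose: `identical()` compares environments by reference, so those would not match after a round trip even with `saveRDS()` and would need a content-based comparison instead.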
Ref: https://github.com/HenrikBengtsson/future.batchtools/issues/36
Others and I are finding that the job submission rate when using batchtools via future.batchtools can be quite slow.
Is it feasible to speed up the transmission of the files using the qs package? https://cran.r-project.org/web/packages/qs/index.html