Rdatatable / data.table

R's data.table package extends data.frame:
http://r-datatable.com
Mozilla Public License 2.0

implicit use of future with "by=" #3962

Open r2evans opened 4 years ago

r2evans commented 4 years ago

Is it possible to support implicit use of future plans, perhaps utilizing future.apply::future_lapply?

For example:

library(data.table)
data(iris)
setDT(iris)
iris[, lapply(.SD, mean), by = Species]
#       Species Sepal.Length Sepal.Width Petal.Length Petal.Width
# 1:     setosa        5.006       3.428        1.462       0.246
# 2: versicolor        5.936       2.770        4.260       1.326
# 3:  virginica        6.588       2.974        5.552       2.026

If I want to parallelize this with future, then I could:

library(future)
library(future.apply)
plan(multiprocess)
rbindlist(future_lapply(split(iris, iris$Species), function(x) x[, lapply(.SD, mean), by = Species]))
#       Species Sepal.Length Sepal.Width Petal.Length Petal.Width
# 1:     setosa        5.006       3.428        1.462       0.246
# 2: versicolor        5.936       2.770        4.260       1.326
# 3:  virginica        6.588       2.974        5.552       2.026

But if there were a way to enable internal use of future.apply::future_lapply for this kind of operation (or some other future-friendly mechanism), it would be much easier to code and read.

Thoughts for implementation:

  1. options(datatable.futureDT=TRUE) (default false)
  2. iris[, lapply(.SD, mean), by = Species, futureDT = TRUE]
  3. with_futureDT(iris[, lapply(.SD, mean), by = Species]) (sketched below)
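To make option 3's intent concrete, here is a minimal sketch built only on exported functions; the helper name future_by is hypothetical (not an existing or proposed data.table API) and merely packages the split/future_lapply/rbindlist pattern from above:

library(data.table)
library(future.apply)

# Hypothetical helper: run FUN once per `by` group on a future worker,
# then reassemble the per-group results with rbindlist().
future_by <- function(DT, FUN, by) {
  parts <- split(DT, by = by)   # data.table's split method: one table per group
  rbindlist(future_lapply(parts, FUN))
}

plan(multisession)  # or any other future plan
future_by(as.data.table(iris),
          function(dt) dt[, lapply(.SD, mean), by = Species],
          by = "Species")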

This is premised on the applicability of the future package. I know there are ramifications to using anything parallel with data.table (e.g., reference semantics), so I realize this is not a trivial suggestion. (For instance, using data.table within future has not been without hurdles; see https://cran.r-project.org/web/packages/future/vignettes/future-4-issues.html under "Missing packages (false negatives)".)

Session Info:

sessionInfo()
# R version 3.5.3 (2019-03-11)
# Platform: x86_64-w64-mingw32/x64 (64-bit)
# Running under: Windows 10 x64 (build 18362)
# Matrix products: default
# locale:
# [1] LC_COLLATE=English_United States.1252
# [2] LC_CTYPE=English_United States.1252
# [3] LC_MONETARY=English_United States.1252
# [4] LC_NUMERIC=C
# [5] LC_TIME=English_United States.1252
# attached base packages:
# [1] stats graphics grDevices utils datasets methods base
# other attached packages:
# [1] future.apply_1.2.0 future_1.12.0 data.table_1.12.2
# loaded via a namespace (and not attached):
#  [1] Rcpp_1.0.1       codetools_0.2-16 listenv_0.7.0    digest_0.6.18
#  [5] crayon_1.3.4     dplyr_0.8.1      assertthat_0.2.1 R6_2.4.0
#  [9] magrittr_1.5     evaluate_0.14    pillar_1.3.1     rlang_0.4.0
# [13] rmarkdown_1.13   tools_3.5.3      glue_1.3.1       purrr_0.2.5
# [17] parallel_3.5.3   compiler_3.5.3   xfun_0.8         pkgconfig_2.0.2
# [21] globals_0.12.4   htmltools_0.3.6  knitr_1.23       tidyselect_0.2.5
# [25] tibble_2.1.3
jangorecki commented 4 years ago

I would say that handling the split and apply behind the scenes is not that bad an option. We could eventually generalize it to parLapply/future_lapply/any compatible lapply. In general it is not needed, because common function calls are already parallelized, though obviously not all of them.

r2evans commented 4 years ago

"common functions calls are already parellelized", could you expand on this? Which common functions are already parallelized in this context?

jangorecki commented 4 years ago

To start, finding the groups defined in by is parallelized, because it uses a parallel function to find the order (forder). Aggregate functions used in j can be parallelized too; the following issue covers initial work on that for sum and mean: https://github.com/Rdatatable/data.table/issues/3042. I am not sure which other functions are optimised. Generally you should try to use GForce optimisation, which can be checked with verbose=TRUE. Then even single-threaded evaluation should be very fast, and eventually it will be optimised to use multiple threads, if it isn't already.
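To make that check concrete (output abbreviated; the exact message wording varies across data.table versions):

library(data.table)
DT <- as.data.table(iris)
# verbose = TRUE reports whether GForce took over the j expression
DT[, lapply(.SD, mean), by = Species, verbose = TRUE]
# ...
# GForce optimized j to 'list(gmean(Sepal.Length), gmean(Sepal.Width), ...)'
# ...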

jangorecki commented 4 years ago

So, speaking about the API and extending your idea to a more generic version:

options(datatable.splitapplyrbind=c("parLapply","future_lapply","some_my_apply"))
iris[, parLapply(.SD, mean), by = Species]
iris[, future_lapply(.SD, mean), by = Species]
iris[, some_my_apply(.SD, mean), by = Species]

In this approach, if the first element of a j expression matched a list of function names defined by an option, then instead of gforce/dogroups we would redirect to rbindlist(some_my_apply(split.data.table(x, by=by))), as sketched below.
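A rough sketch of how that redirect could be prototyped outside data.table, using a captured j expression (dispatch_by and its behaviour are illustrative only, not a proposed internal API):

library(data.table)
library(future.apply)

# Hypothetical front end: if the outer call of j matches one of the configured
# engine names, split by group, evaluate j per group on a worker, then rbind;
# otherwise evaluate j normally.
dispatch_by <- function(x, j, by,
                        engines = getOption("datatable.splitapplyrbind", "future_lapply")) {
  jsub <- substitute(j)
  if (is.call(jsub) && as.character(jsub[[1L]]) %in% engines) {
    jsub[[1L]] <- as.name("lapply")  # plain lapply within each group
    rbindlist(future_lapply(split(x, by = by), function(dt)
      dt[, eval(jsub), by = by]))
  } else {
    x[, eval(jsub), by = by]
  }
}

plan(multisession)
dispatch_by(as.data.table(iris), future_lapply(.SD, mean), by = "Species")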

r2evans commented 4 years ago

I'm not certain I framed it well, since I don't think that code quite addresses what I'm attempting. I'd like to be able to run some task on .SD as a whole (not column-wise), partitioned/parallelized by the by= field. So, using the future:: version from your comment,

iris[, future_lapply(.SD, mean), by = Species]

I think this parallelizes by column: with sufficient cores, the mean of each column of .SD would be computed on its own worker, all grouped externally by Species (meaning this fan-out will happen three times, once per Species group, since data.table would be partitioning by Species). In this specific example that might be the more efficient parallelization strategy, but I'm going for something else (with, as always, a different and more complicated real-world use-case :-).

What I'd like to be able to do is effectively send each group's entire .SD to its own worker, one group per worker. That's what the future_lapply(split(iris, iris$Species), ...) earlier was meant to indicate.

jangorecki commented 4 years ago

Agree, but don't expect too much speed-up. Most of the tools that allow parallelizing computation are unfortunately not very efficient, so it makes sense to use them only for really complex, time-consuming functions.


On my desktop machine:

library(data.table)
library(future.apply)
# a parallel future plan (e.g. plan(multiprocess)) is assumed to be set here;
# without one, future_lapply runs sequentially
iris = as.data.table(iris)
iris[0] # eliminate overhead of the first `[` call
system.time(ans1<-iris[, lapply(.SD, mean), Species])
#   user  system elapsed
#  0.001   0.000   0.002 
system.time(ans2<-rbindlist(future_lapply(split(iris, by="Species"), function(dt) dt[, lapply(.SD, mean), .SDcols=1:4])))
#   user  system elapsed 
#  0.069   0.000   0.063
all.equal(ans1[,-1L], ans2)
#[1] TRUE
set.seed(108)
iris = iris[sample(.N, 1e7, TRUE)]
system.time(ans1<-iris[, lapply(.SD, mean), Species])
#   user  system elapsed 
#  0.931   0.118   0.582
system.time(ans2<-rbindlist(future_lapply(split(iris, by="Species"), function(dt) dt[, lapply(.SD, mean), .SDcols=1:4])))
#   user  system elapsed 
#  0.601   0.195   0.638
all.equal(ans1[,-1L], ans2)
#[1] TRUE

When trying to scale this up to 1e8 rows in a server environment, future.apply failed with this error:

Error in getGlobalsAndPackages(expr, envir = envir, tweak = tweakExpression,  : 
  The total size of the 6 globals that need to be exported for the future expression...
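For readers who hit the same error: it comes from future's safeguard on the total size of globals exported to workers (the default cap is 500 MiB), and it can be raised via the option named in the full message:

options(future.globals.maxSize = 8 * 1024^3)  # e.g. allow ~8 GiB of exported globals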

In the server environment, at 1e7 rows data.table (0.405s elapsed) happened to beat future.apply (0.702s elapsed) under the default data.table setting of using only 50% of available cores. Using 100% of the cores, it was data.table (0.298s) vs future.apply (0.664s).
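For reference, the core-usage defaults mentioned here are controlled by data.table itself; a minimal sketch of inspecting and changing them (the percent argument assumes data.table >= 1.12.2):

library(data.table)
getDTthreads(verbose = TRUE)  # report how many threads data.table will use
setDTthreads(0)               # 0 means use all logical CPUs
setDTthreads(percent = 50)    # or cap at a percentage of logical CPUs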


Of course, as said above, everything depends on the function you need to compute, so there are surely many use cases for parallelizing UDFs that data.table won't parallelize internally. Just to shed some light on that: users should take care to evaluate their use case before introducing the extra complexity of packages like parallel, foreach, future, etc.

r2evans commented 4 years ago

I agree with your comment about efficiency: in my actual (not iris) use-case, I see a 4-5x speedup on an 8-core machine (even with data transfer and other overhead), which is good enough for me (processing can take on the order of tens of minutes even with parallelization).

If this is too "niche", I understand; I can certainly continue doing it outside of data.table. Thanks!