r-spatial / stars

Spatiotemporal Arrays, Raster and Vector Data Cubes
https://r-spatial.github.io/stars/
Apache License 2.0
560 stars 94 forks

Implement futures for st_apply #124

Closed adrfantini closed 4 years ago

adrfantini commented 5 years ago

In #57, we discussed the possibility of a parallel st_apply, which was promptly introduced leveraging mclapply and parLapply. Lately, I have been moving more and more code to use the future package. Compared to using bare parallel, it offers a number of benefits. For example, future makes it extremely easy to run any parallel R code on any kind of platform, including HPC systems (via future.batchtools).

Here's a very interesting talk from the author of future, explaining the hows and the whys.

The package future.apply provides the future version of the apply family of functions, just like parallel provides mclapply. Following that, it would be awesome to have a future_st_apply (or st_future_apply?) function to leverage the capabilities of this wonderful package.
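
For reference, here is a minimal sketch of that naming pattern; st_future_apply does not exist, this only illustrates how future.apply mirrors base::apply while plan() picks the backend once:

library(future.apply)
plan(multisession)                  # pick the backend once; plan(sequential) would run serially
m = matrix(runif(1e6), nrow = 1000)
r1 = apply(m, 1, mean)              # serial, base R
r2 = future_apply(m, 1, mean)       # same signature, runs on the chosen backend
stopifnot(all.equal(r1, r2))        # identical results, different execution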

edzer commented 5 years ago

Why not build it into st_apply?

edzer commented 5 years ago

... and to whom can I assign this? Maybe @Nowosad?

adrfantini commented 5 years ago

Why not build it into st_apply?

Usually the future version of a standard function is called future_function. This applies to purrr::map (furrr::future_map), for example, as well as base::apply (future.apply::future_apply) and the like. That's why I thought of st_future_apply (or future_st_apply). But yes, a future = TRUE option would also be totally fine.
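
To make the alternative concrete, a minimal sketch of the option-based variant; the function and its FUTURE argument here are hypothetical, not the stars implementation:

# hypothetical sketch: dispatch between serial and future-based apply
apply_maybe_future = function(X, MARGIN, FUN, ..., FUTURE = FALSE) {
  if (FUTURE)
    future.apply::future_apply(X, MARGIN, FUN, ...)  # honours the user's plan()
  else
    apply(X, MARGIN, FUN, ...)
}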

Nowosad commented 5 years ago

I can take a look at it next weekend, and I will let you know if it turns out to be too much for me.

karldw commented 5 years ago

Another option, borrowing from the drake package, would be a parallelism argument. (This is only useful if you think st_apply might support other parallel computing approaches in the future.)

adrfantini commented 5 years ago

Personally I would prefer the flexibility of calling plan myself; however, I admit I'm not familiar with drake, and maybe I am misunderstanding how it implements futures. Separate functions would also more closely mimic what other packages are doing with future, i.e. creating future_* versions of serial functions rather than passing new options. That said, your suggestion does provide additional flexibility for other parallel backends to be added later. I'm not sure, however, how likely that is to happen any time soon.

Also, thanks for the link, I did not know about ZeroMQ.

karldw commented 5 years ago

No problem! I'm also no kind of expert in drake. drake requires you call plan yourself, but that plan is only used if you also specify parallelism="future". future_* functions also make a lot of sense.
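
Roughly, the drake pattern described above looks like this (my_plan is a placeholder for a drake plan object):

library(drake)
library(future)
plan(multisession)                       # the user sets up the workers
# make(my_plan, parallelism = "future")  # drake uses them only when asked to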

adrfantini commented 5 years ago

No problem! I'm also no kind of expert in drake. drake requires you call plan yourself, but that plan is only used if you also specify parallelism="future". future_* functions also make a lot of sense.

Ah, I see, thanks. I guess this depends on which backends will be implemented. future will soonish implement a ZeroMQ backend too: https://github.com/HenrikBengtsson/future/issues/204 https://github.com/ropensci/drake/issues/561#issuecomment-451362719

The whole discussion in that second link is very interesting.

Nowosad commented 5 years ago

I've started working on adding the future.apply support. My first step was to implement a simple new option (https://github.com/Nowosad/stars/commit/38f311143c878a586dd894d06b585ed452732621) and test how it works. To my surprise, future_apply() does not improve calculation times and I have no idea why... I've tried it on a few examples, one of which is attached below.

library(stars)
#> Loading required package: abind
#> Loading required package: sf
#> Linking to GEOS 3.6.1, GDAL 2.3.2, PROJ 4.9.3
f = system.file("netcdf/avhrr-only-v2.19810902.nc", package = "starsdata")
y = read_stars(f)
#> sst, anom, err, ice,
#> Warning: ignoring unrecognized unit: percentage

# base stars --------------------------------------------------------------
system.time(st_apply(y, 1:2, mean))
#>    user  system elapsed 
#>  21.283   0.032  21.359

# parallel ----------------------------------------------------------------
library(parallel)
no_cores = detectCores() - 1
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, mean, CLUSTER = cl))
#>    user  system elapsed 
#>  14.772   0.224  19.330

# future ------------------------------------------------------------------
library(future.apply)
#> Loading required package: future
plan(multiprocess)
system.time(st_apply(y, 1:2, mean, FUTURE = TRUE))
#>    user  system elapsed 
#>  35.000   2.153  40.284

Created on 2019-03-03 by the reprex package (v0.2.1)

Nowosad commented 5 years ago

I also tried a slightly more complex approach, replacing lapply() with future_lapply() - https://github.com/Nowosad/stars/commit/073842a23f21e4bfd527c4ba1690b0e1a20dd93d (future_lapply() does not work well with parApply()).
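
Schematically, that change moves the parallelism to the outer loop over the object's attributes; the snippet below is a hypothetical simplification of the linked commit, using the y read in the reprex that follows:

# each attribute (array) of the stars object is reduced in its own future
library(future.apply)
plan(multisession)
res = future_lapply(seq_along(y), function(i) apply(y[[i]], 1:2, mean))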

The results are interesting...

library(stars)
#> Loading required package: abind
#> Loading required package: sf
#> Linking to GEOS 3.6.1, GDAL 2.3.2, PROJ 4.9.3
library(future.apply)
#> Loading required package: future

f = system.file("netcdf/avhrr-only-v2.19810902.nc", package = "starsdata")
y = read_stars(f)
#> sst, anom, err, ice,
#> Warning: ignoring unrecognized unit: percentage

# base stars --------------------------------------------------------------
system.time(st_apply(y, 1:2, mean))
#>    user  system elapsed 
#>  21.574   0.046  21.663

# base stars with future --------------------------------------------------------------
plan(multiprocess)
system.time(st_apply(y, 1:2, mean))
#>    user  system elapsed 
#>   0.070   0.078   6.451

# parallel ----------------------------------------------------------------
library(parallel)
no_cores = detectCores() - 1
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, mean, CLUSTER = cl))
#>    user  system elapsed 
#>  14.565   0.286  18.058

# future ------------------------------------------------------------------
system.time(st_apply(y, 1:2, mean, FUTURE = TRUE))
#>    user  system elapsed 
#>   0.087   0.064  16.247

Created on 2019-03-03 by the reprex package (v0.2.1)

adrfantini commented 5 years ago

If I understand it correctly, this is the usual difference between performing multicore computation on the different variables (outer lapply, second comment above) and on the array (inner apply, first comment). The former of course only works if you have multiple variables, which is why @edzer initially decided to parallelise only the inner loop.

I think the issue stems from the fact that you are looping over dimensions which have only 1 value. In fact, in this case adrop(y) would have the same effect as st_apply(y, 1:2, mean) and be much, much faster!

> dim(y)
   x    y zlev time 
1440  720    1    1 
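
Concretely, since zlev and time both have length 1, averaging over them touches one value per cell; a small sketch of the claimed equivalence, using the y from the reprex above:

y2 = adrop(y)   # drops the length-1 dimensions zlev and time
# y2 holds the same values that st_apply(y, 1:2, mean) computes,
# without calling mean at all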

Nowosad commented 5 years ago

Thanks for your comment @adrfantini. Could you prepare a better example (or examples) for testing the multi-core stars operation?

adrfantini commented 5 years ago

Thanks for your comment @adrfantini. Could you prepare a better example (or examples) for testing the multi-core stars operation?

Yessir, I'll do that tonight or tomorrow.

adrfantini commented 5 years ago

You could use starsdata to test.

library(stars)
#> Loading required package: abind
#> Loading required package: sf
#> Linking to GEOS 3.7.1, GDAL 2.3.2, PROJ 5.2.0
x = c(
"avhrr-only-v2.19810901.nc",
"avhrr-only-v2.19810902.nc",
"avhrr-only-v2.19810903.nc",
"avhrr-only-v2.19810904.nc",
"avhrr-only-v2.19810905.nc",
"avhrr-only-v2.19810906.nc",
"avhrr-only-v2.19810907.nc",
"avhrr-only-v2.19810908.nc",
"avhrr-only-v2.19810909.nc"
)
file_list = system.file(paste0("netcdf/", x), package = "starsdata")

(y = read_stars(file_list, quiet = TRUE) %>% adrop)

#> stars object with 3 dimensions and 4 attributes
#> attribute(s), summary of first 1e+05 cells:
#>    sst [C*°]       anom [C*°]      err [C*°]          ice       
#>  Min.   :-1.80   Min.   :-4.69   Min.   :0.110   Min.   :0.010  
#>  1st Qu.:-1.19   1st Qu.:-0.06   1st Qu.:0.300   1st Qu.:0.730  
#>  Median :-1.05   Median : 0.52   Median :0.300   Median :0.830  
#>  Mean   :-0.32   Mean   : 0.23   Mean   :0.295   Mean   :0.766  
#>  3rd Qu.:-0.20   3rd Qu.: 0.71   3rd Qu.:0.300   3rd Qu.:0.870  
#>  Max.   : 9.36   Max.   : 3.70   Max.   :0.480   Max.   :1.000  
#>  NA's   :13360   NA's   :13360   NA's   :13360   NA's   :27377  
#> dimension(s):
#>      from   to     offset  delta  refsys point values    
#> x       1 1440          0   0.25      NA    NA   NULL [x]
#> y       1  720         90  -0.25      NA    NA   NULL [y]
#> time    1    9 1981-09-01 1 days POSIXct    NA   NULL

# ----------- SERIAL -----------
system.time(st_apply(y, 1:2, mean))
#>    user  system elapsed 
#>  26.473   0.151  26.687

# ----------- PARALLEL -----------
library(parallel)
no_cores = detectCores() - 1
cl = makeCluster(no_cores)
system.time(st_apply(y, 1:2, mean, CLUSTER = cl))
#>    user  system elapsed 
#>   7.515   0.472  15.750

Created on 2019-03-03 by the reprex package (v0.2.1)

adrfantini commented 5 years ago

Can I ask why this was closed, @edzer, @Nowosad? No plans to bring this forward? @Nowosad, did you test further with the implementation in your branch?

edzer commented 5 years ago

No plans from my side right now. Are you at EGU next week, @adrfantini ?

adrfantini commented 5 years ago

Are you at EGU next week, @adrfantini ?

Yep, I arrive on Monday night and leave about 12:00 on Friday.

edzer commented 5 years ago

I'll arrive Tue morning, leave Wed afternoon. Let's drink a coffee together!

Nowosad commented 5 years ago

@adrfantini @edzer I plan to work on this in the near future. Henrik Bengtsson explained why st_apply() was so slow in the examples (https://github.com/HenrikBengtsson/future.apply/issues/39) and proposed a solution: adding options(future.globals.maxSize = +Inf).
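
That workaround is a one-liner, set before the parallel call (the default limit on globals exported to workers is 500 MiB):

library(future)
options(future.globals.maxSize = +Inf)  # default: 500 * 1024^2 bytes
plan(multisession)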

edzer commented 5 years ago

Great, feel free to reopen here.

adrfantini commented 5 years ago

I'll arrive Tue morning, leave Wed afternoon. Let's drink a coffee together!

Yup, see you there! We'll probably see each other there at some session, but if this does not happen I'll send you an email or something.

I plan to work on this in the near future.

Great, thanks! I really appreciate it. Reopening, then, I guess. If you need some testing with real-world datasets, just let me know.

tim-salabim commented 5 years ago

For what it's worth, I'll be at EGU the whole week. Catching up would be great! I'll be sure to drop by your poster, @edzer.

Nowosad commented 5 years ago

@edzer @adrfantini Please take a look at https://github.com/r-spatial/stars/pull/166. It works as shown in the comparison below:

library(stars)
#> Loading required package: abind
#> Loading required package: sf
#> Linking to GEOS 3.6.1, GDAL 2.3.2, PROJ 4.9.3
x = c(
  "avhrr-only-v2.19810901.nc",
  "avhrr-only-v2.19810902.nc",
  "avhrr-only-v2.19810903.nc",
  "avhrr-only-v2.19810904.nc",
  "avhrr-only-v2.19810905.nc",
  "avhrr-only-v2.19810906.nc",
  "avhrr-only-v2.19810907.nc",
  "avhrr-only-v2.19810908.nc",
  "avhrr-only-v2.19810909.nc"
)
file_list = system.file(paste0("netcdf/", x), package = "starsdata")
y = read_stars(file_list, quiet = TRUE) %>% adrop()
#> Warning: ignoring unrecognized unit: percentage
#> (this warning repeats once per file, 9 times in total)
system.time(st_apply(y, 1:2, mean))
#>    user  system elapsed 
#>  19.805   0.088  19.934
library(parallel)
no_cores = detectCores() - 1
cl = makeCluster(no_cores)
system.time(st_apply(y, 1:2, mean, CLUSTER = cl))
#>    user  system elapsed 
#>   6.259   0.366   9.837
library(future.apply)
#> Loading required package: future
plan(multisession, workers = no_cores)

system.time(st_apply(y, 1:2, mean, FUTURE = TRUE))
#>    user  system elapsed 
#>  10.094   0.200  14.256

Created on 2019-04-12 by the reprex package (v0.2.1)

adrfantini commented 5 years ago

Trying to download and test that from the train right now!

adrfantini commented 5 years ago

Sorry, I'm having problems installing the latest polyclip version; I'll be back as soon as I manage to fix that.

adrfantini commented 5 years ago

Confirmed working, but I am a bit baffled by the much slower performance compared to bare parallel (+30% runtime), even when setting options(future.globals.maxSize = +Inf). I'll test with more intensive workloads. @Nowosad, do you have any explanation for this? Can it be attributed to the reasons mentioned in the issue you posted previously?

adrfantini commented 5 years ago

Interestingly, looping over different dimensions sometimes gives future the advantage. This is using only 2 cores, where future is competitive:

library(microbenchmark)
microbenchmark(st_apply(y, 1:2, mean, CLUSTER=cl), st_apply(y, 1:2, mean, FUTURE = TRUE), times = 10)
# Unit: seconds
#                                   expr      min       lq     mean   median       uq      max neval
#   st_apply(y, 1:2, mean, CLUSTER = cl) 20.66613 21.46560 25.51657 25.17390 29.71878 31.52102    10
#  st_apply(y, 1:2, mean, FUTURE = TRUE) 22.61443 23.31394 26.80674 26.51852 28.32064 35.27727    10

microbenchmark(st_apply(y, 1, mean, CLUSTER=cl), st_apply(y, 1, mean, FUTURE = TRUE), times = 10)
# Unit: seconds
#                                 expr      min       lq     mean   median       uq      max neval
#   st_apply(y, 1, mean, CLUSTER = cl) 5.027009 5.090493 5.344692 5.319925 5.641487 5.729059    10
#  st_apply(y, 1, mean, FUTURE = TRUE) 4.206007 4.259150 4.533820 4.362500 4.626024 5.240908    10

microbenchmark(st_apply(y, 2, mean, CLUSTER=cl), st_apply(y, 2, mean, FUTURE = TRUE), times = 10)
# Unit: seconds
#                                 expr      min       lq     mean   median       uq      max neval
#   st_apply(y, 2, mean, CLUSTER = cl) 4.302541 4.328051 4.477898 4.386149 4.677309 4.789973    10
#  st_apply(y, 2, mean, FUTURE = TRUE) 3.809556 4.018349 4.264969 4.301376 4.426772 4.581687    10

microbenchmark(st_apply(y, 3, mean, CLUSTER=cl), st_apply(y, 3, mean, FUTURE = TRUE), times = 10)
# Unit: seconds
#                                 expr      min       lq     mean   median       uq      max neval
#   st_apply(y, 3, mean, CLUSTER = cl) 4.660445 4.740116 4.837221 4.779131 4.914649 5.153446    10
#  st_apply(y, 3, mean, FUTURE = TRUE) 3.932313 3.982457 4.094957 4.080643 4.150643 4.419043    10

And this is with 4 cores, where plain parallel dominates:

microbenchmark(st_apply(y, 1:2, mean, CLUSTER=cl), st_apply(y, 1:2, mean, FUTURE = TRUE), times = 10)
#Unit: seconds
#                                  expr      min       lq     mean   median       uq      max neval
#  st_apply(y, 1:2, mean, CLUSTER = cl) 16.18076 16.33587 16.94545 16.43819 16.62316 21.36609    10
# st_apply(y, 1:2, mean, FUTURE = TRUE) 21.11023 21.20599 22.08562 21.48302 21.96687 25.43718    10

microbenchmark(st_apply(y, 1, mean, CLUSTER=cl), st_apply(y, 1, mean, FUTURE = TRUE), times = 10)
#Unit: seconds
#                                expr      min       lq     mean   median       uq      max neval
#  st_apply(y, 1, mean, CLUSTER = cl) 3.183550 3.231978 3.313934 3.285378 3.364793 3.553232    10
# st_apply(y, 1, mean, FUTURE = TRUE) 3.961406 3.992229 4.074901 4.087794 4.143539 4.180647    10

microbenchmark(st_apply(y, 2, mean, CLUSTER=cl), st_apply(y, 2, mean, FUTURE = TRUE), times = 10)
#Unit: seconds
#                                expr      min       lq     mean   median       uq      max neval
#  st_apply(y, 2, mean, CLUSTER = cl) 2.764441 2.823060 2.873365 2.868538 2.934442 2.981836    10
# st_apply(y, 2, mean, FUTURE = TRUE) 3.488604 3.553307 3.602525 3.594069 3.653607 3.733016    10

microbenchmark(st_apply(y, 3, mean, CLUSTER=cl), st_apply(y, 3, mean, FUTURE = TRUE), times = 10)
#Unit: seconds
#                                expr      min       lq     mean   median       uq      max neval
#  st_apply(y, 3, mean, CLUSTER = cl) 3.283144 3.311529 3.373661 3.346839 3.449870 3.529692    10
# st_apply(y, 3, mean, FUTURE = TRUE) 3.748531 3.775567 3.852514 3.808257 3.958022 4.006158    10

edzer commented 5 years ago

You're looking at things that take seconds; I think this only becomes really meaningful if you compare things that take minutes, hours, or more. Also, I would suggest keeping the baseline (ncores = 1) in your comparisons, so you have an idea of the speed-up, or even the overhead, that comes from running in parallel vs. not doing so.
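
For example, the comparison could include the serial baseline directly; a sketch reusing y and cl from the benchmarks above:

library(microbenchmark)
microbenchmark(
  serial  = st_apply(y, 1:2, mean),
  cluster = st_apply(y, 1:2, mean, CLUSTER = cl),
  future  = st_apply(y, 1:2, mean, FUTURE = TRUE),
  times = 5
)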

adrfantini commented 5 years ago

I'll for sure do more tests when I'm back at work.

adrfantini commented 5 years ago

I did more tests with a larger dataset on a node in our cluster (running oldish Xeon E5-2680 v2s). This is a 464*201*5844 file (4GB in RAM).

library(stars)
#> Loading required package: abind
#> Loading required package: sf
#> Linking to GEOS 3.4.2, GDAL 2.3.1, PROJ 4.8.0
library(parallel)
library(future.apply)
#> Loading required package: future

y <- read_ncdf('pp_ens_mean_0.25deg_reg_1995-2010_v19.0e.nc')
# downloaded from: http://surfobs.climate.copernicus.eu/dataaccess/access_eobs_chunks.php
# direct link: https://www.ecad.eu/download/ensembles/data/Grid_0.25deg_reg_ensemble/pp_ens_mean_0.25deg_reg_1995-2010_v19.0e.nc
gc()
#>             used   (Mb) gc trigger    (Mb)   max used    (Mb)
#> Ncells    650235   34.8    1210854    64.7    1210854    64.7
#> Vcells 546217867 4167.4 1573116451 12002.0 1866121664 14237.4

st_dimensions(y)
#>           from   to                  offset  delta  refsys point values
#> longitude    1  464                   -40.5   0.25      NA    NA   NULL
#> latitude     1  201                   25.25   0.25      NA    NA   NULL
#> time         1 5844 1994-12-31 12:00:00 UTC 1 days POSIXct    NA   NULL
#>              
#> longitude [x]
#> latitude  [y]
#> time

# ====== Baseline serial
system.time(st_apply(y, 1:2, mean, na.rm = TRUE))
#>    user  system elapsed 
#>  14.356   1.346  15.741
system.time(st_apply(y, 3, mean, na.rm = TRUE))
#>    user  system elapsed 
#>  11.286   2.344  13.631

# ====== 1 core
no_cores = 1
plan(multiprocess, workers = no_cores)
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, mean, CLUSTER=cl, na.rm = TRUE))
#>    user  system elapsed 
#>  14.337   3.483  33.696
system.time(st_apply(y, 1:2, mean, FUTURE = TRUE, na.rm = TRUE))
#>    user  system elapsed 
#>  14.798   2.436  17.282

system.time(st_apply(y, 3, mean, CLUSTER=cl, na.rm = TRUE))
#>    user  system elapsed 
#>  12.388   1.959  27.608
system.time(st_apply(y, 3, mean, FUTURE = TRUE, na.rm = TRUE))
#>    user  system elapsed 
#>  11.465   2.435  13.935

stopCluster(cl)

# ====== 20 cores
no_cores = 20
plan(multiprocess, workers = no_cores)
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, mean, CLUSTER=cl, na.rm = TRUE))
#>    user  system elapsed 
#>  14.689   2.322  34.338
system.time(st_apply(y, 1:2, mean, FUTURE = TRUE, na.rm = TRUE))
#>    user  system elapsed 
#>   9.826   2.436  22.799

system.time(st_apply(y, 3, mean, CLUSTER=cl, na.rm = TRUE))
#>    user  system elapsed 
#>  12.954   7.057  34.468
system.time(st_apply(y, 3, mean, FUTURE = TRUE, na.rm = TRUE))
#>    user  system elapsed 
#>   8.540   5.460  23.463

stopCluster(cl)

Created on 2019-04-16 by the reprex package (v0.2.1)

adrfantini commented 5 years ago

Using a slightly more CPU-intensive function, summary, the outcome continues to favor single-core st_apply. I am also a bit surprised by the difference between applying over 1:2 and over 3, which we did not see so strongly with mean.

library(stars)
#> Loading required package: abind
#> Loading required package: sf
#> Linking to GEOS 3.4.2, GDAL 2.3.1, PROJ 4.8.0
library(parallel)
library(future.apply)
#> Loading required package: future

y <- read_ncdf('pp_ens_mean_0.25deg_reg_1995-2010_v19.0e.nc')
# downloaded from: http://surfobs.climate.copernicus.eu/dataaccess/access_eobs_chunks.php
# direct link: https://www.ecad.eu/download/ensembles/data/Grid_0.25deg_reg_ensemble/pp_ens_mean_0.25deg_reg_1995-2010_v19.0e.nc
gc()
#>             used   (Mb) gc trigger    (Mb)   max used    (Mb)
#> Ncells    650287   34.8    1206948    64.5    1206948    64.5
#> Vcells 546218338 4167.4 1573116995 12002.0 1866122135 14237.4

st_dimensions(y)
#>           from   to                  offset  delta  refsys point values
#> longitude    1  464                   -40.5   0.25      NA    NA   NULL
#> latitude     1  201                   25.25   0.25      NA    NA   NULL
#> time         1 5844 1994-12-31 12:00:00 UTC 1 days POSIXct    NA   NULL
#>
#> longitude [x]
#> latitude  [y]
#> time

# ====== Baseline serial
system.time(st_apply(y, 1:2, summary, na.rm = TRUE))
#>    user  system elapsed
#>  30.502   0.948  31.520
system.time(st_apply(y, 3, summary, na.rm = TRUE))
#>    user  system elapsed
#> 418.881   1.089 420.923

# ====== 1 core
no_cores = 1
plan(multiprocess, workers = no_cores)
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, summary, CLUSTER=cl, na.rm = TRUE))
#>    user  system elapsed 
#>  14.903   3.566  50.241
system.time(st_apply(y, 1:2, summary, FUTURE = TRUE, na.rm = TRUE))
#>    user  system elapsed 
#>  30.098   3.161  33.342

system.time(st_apply(y, 3, summary, CLUSTER=cl, na.rm = TRUE))
#>    user  system elapsed 
#>  13.841   2.353 431.441
system.time(st_apply(y, 3, summary, FUTURE = TRUE, na.rm = TRUE))
#>    user  system elapsed 
#> 418.422   2.776 422.088

stopCluster(cl)

# ====== 20 cores
no_cores = 20
plan(multiprocess, workers = no_cores)
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, summary, CLUSTER=cl, na.rm = TRUE))
#>    user  system elapsed 
#>  17.259   3.064  53.999
system.time(st_apply(y, 1:2, summary, FUTURE = TRUE, na.rm = TRUE))
#>    user  system elapsed 
#>  12.247   3.266  51.966

system.time(st_apply(y, 3, summary, CLUSTER=cl, na.rm = TRUE))
#>    user  system elapsed 
#>  15.337   7.847 487.414
system.time(st_apply(y, 3, summary, FUTURE = TRUE, na.rm = TRUE))
#>    user  system elapsed 
#>  14.352   2.845 621.959

stopCluster(cl)

Created on 2019-04-16 by the reprex package (v0.2.1)

HenrikBengtsson commented 5 years ago

I stumbled upon this issue from a back reference in one of my future issues. So, the "slowness" of the future solution is most likely due to the default wait time when polling for results. Reducing this wait time (option future.wait.interval) will lower the elapsed time, cf. https://github.com/HenrikBengtsson/future.apply/issues/41#issuecomment-535581882.

adrfantini commented 5 years ago

@HenrikBengtsson excellent, thanks. I repeated all the tests with and without that option, with the latest versions of everything. Here are the results:

library(stars)
library(parallel)
library(future.apply)
download.file('https://www.ecad.eu/download/ensembles/data/Grid_0.25deg_reg_ensemble/pp_ens_mean_0.25deg_reg_1995-2010_v20.0e.nc', destfile = 'pp_ens_mean_0.25deg_reg_1995-2010_v20.0e.nc')
y <- read_ncdf('pp_ens_mean_0.25deg_reg_1995-2010_v20.0e.nc')

# ====== Baseline serial 
system.time(st_apply(y, 1:2, summary, na.rm = TRUE))
#   user  system elapsed
# 30.181   0.945  31.136
system.time(st_apply(y, 3, summary, na.rm = TRUE))
#   user  system elapsed
#412.335   1.063 413.484

# ====== 1 core
no_cores = 1
plan(multiprocess, workers = no_cores)
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, summary, CLUSTER=cl, na.rm = TRUE))
#   user  system elapsed
# 16.883   4.377  52.133
system.time(st_apply(y, 1:2, summary, FUTURE = TRUE, na.rm = TRUE))
#   user  system elapsed
# 31.287   5.012  36.307

system.time(st_apply(y, 3, summary, CLUSTER=cl, na.rm = TRUE))
#   user  system elapsed
# 14.429   2.857 429.047
system.time(st_apply(y, 3, summary, FUTURE = TRUE, na.rm = TRUE))
#   user  system elapsed
#433.213   2.117 435.377
stopCluster(cl)

# ====== 20 cores
no_cores = 20
plan(multiprocess, workers = no_cores)
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, summary, CLUSTER=cl, na.rm = TRUE))
#   user  system elapsed
#  16.37    3.57   25.61
system.time(st_apply(y, 1:2, summary, FUTURE = TRUE, na.rm = TRUE))
#   user  system elapsed
# 12.494   5.850  19.246

system.time(st_apply(y, 3, summary, CLUSTER=cl, na.rm = TRUE))
#   user  system elapsed
# 15.249   6.411  49.062
system.time(st_apply(y, 3, summary, FUTURE = TRUE, na.rm = TRUE))
#   user  system elapsed
#  9.900   5.819  40.268

stopCluster(cl)

# ====== 20 cores (future no wait time)
no_cores = 20
options(future.wait.interval=0L)
plan(multiprocess, workers = no_cores)

system.time(st_apply(y, 1:2, summary, FUTURE = TRUE, na.rm = TRUE))
#   user  system elapsed
# 10.435   6.153  18.643

system.time(st_apply(y, 3, summary, FUTURE = TRUE, na.rm = TRUE))
#   user  system elapsed
#  9.257   4.721  38.804

So right now it seems to have no impact on this workload. But notice that I had to repeat the last two runs (with wait time set to 0) since the first time they gave me errors:

# Error: Failed to retrieve the result of MulticoreFuture (<none>) from the forked worker (on localhost; PID 62061). Post-mortem diagnostic: No process exists with this PID, i.e. the forked localhost worker is no longer alive.

for the first and subsequently

Error in mcfork(detached) : 
  unable to fork, possible reason: Cannot allocate memory

for the second.

Nowosad commented 5 years ago

@adrfantini correct me if I am reading it wrong, but the newest recalculations have way shorter times for FUTURE=TRUE. For example 19.246 vs 51.966, and 40.268 vs 621.959 (comparison between https://github.com/r-spatial/stars/issues/124#issuecomment-540476633 and https://github.com/r-spatial/stars/issues/124#issuecomment-483790925)

[CC: @HenrikBengtsson]

adrfantini commented 5 years ago

@Nowosad You are not mistaken, timings are much shorter now. This was on the same cluster, probably not on the same node, but the nodes should all be identical. So the only difference should be the versions of the packages and of R (now 3.5.1 on this machine). Feel free to try it yourself with the same file!

Nowosad commented 5 years ago

# remotes::install_github("r-spatial/stars")
# remotes::install_github("HenrikBengtsson/future.apply")
library(stars)
## Loading required package: abind
## Loading required package: sf
## Linking to GEOS 3.7.1, GDAL 2.3.2, PROJ 5.2.0
library(parallel)
library(future.apply)
## Loading required package: future
# download.file('https://www.ecad.eu/download/ensembles/data/Grid_0.25deg_reg_ensemble/pp_ens_mean_0.25deg_reg_1995-2010_v20.0e.nc', destfile = 'pp_ens_mean_0.25deg_reg_1995-2010_v20.0e.nc')
y <- read_ncdf('pp_ens_mean_0.25deg_reg_1995-2010_v20.0e.nc')
## no 'var' specified, using pp
## other available variables:
##  latitude, longitude, time
## No projection information found in nc file. 
##  Coordinate variable units found to be degrees, 
##  assuming WGS84 Lat/Lon.
# ====== Baseline serial 
system.time(st_apply(y, 1:2, summary, na.rm = TRUE))
##    user  system elapsed 
##  21.499   1.682  23.225
system.time(st_apply(y, 3, summary, na.rm = TRUE))
##    user  system elapsed 
## 231.468   0.713 232.715
# ====== 1 core
no_cores = 1
plan(multiprocess, workers = no_cores)
## Warning: [ONE-TIME WARNING] Forked processing ('multicore') is disabled
## in future (>= 1.13.0) when running R from RStudio, because it is
## considered unstable. Because of this, plan("multicore") will fall
## back to plan("sequential"), and plan("multiprocess") will fall back to
## plan("multisession") - not plan("multicore") as in the past. For more
## details, how to control forked processing or not, and how to silence this
## warning in future R sessions, see ?future::supportsMulticore
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, summary, CLUSTER=cl, na.rm = TRUE))
##    user  system elapsed 
##  15.444   2.206  30.361
system.time(st_apply(y, 1:2, summary, FUTURE = TRUE, na.rm = TRUE))
##    user  system elapsed 
##  21.589   1.749  23.379
system.time(st_apply(y, 3, summary, CLUSTER=cl, na.rm = TRUE))
##    user  system elapsed 
##   9.946   1.352 230.313
system.time(st_apply(y, 3, summary, FUTURE = TRUE, na.rm = TRUE))
##    user  system elapsed 
## 234.551   0.796 235.895
stopCluster(cl)

# ====== 6 cores
no_cores = 6
plan(multiprocess, workers = no_cores)
cl = makeCluster(no_cores)

system.time(st_apply(y, 1:2, summary, CLUSTER=cl, na.rm = TRUE))
##    user  system elapsed 
##  15.124   0.606  21.485
system.time(st_apply(y, 1:2, summary, FUTURE = TRUE, na.rm = TRUE))
##    user  system elapsed 
##  15.432   0.631  22.026
system.time(st_apply(y, 3, summary, CLUSTER=cl, na.rm = TRUE))
##    user  system elapsed 
##  10.669   0.761  70.032
system.time(st_apply(y, 3, summary, FUTURE = TRUE, na.rm = TRUE))
##    user  system elapsed 
##  11.351   0.730  71.124
stopCluster(cl)

# ====== 6 cores (future no wait time)
no_cores = 6
options(future.wait.interval=0L)
plan(multiprocess, workers = no_cores)

system.time(st_apply(y, 1:2, summary, FUTURE = TRUE, na.rm = TRUE))
##    user  system elapsed 
##  14.996   0.574  21.805
system.time(st_apply(y, 3, summary, FUTURE = TRUE, na.rm = TRUE))
##    user  system elapsed 
##  11.218   0.781  70.483

edzer commented 4 years ago

Since we have futures implemented in st_apply, I'm closing here.