mapme-initiative / mapme.biodiversity

Efficient analysis of spatial biodiversity datasets for global portfolios
https://mapme-initiative.github.io/mapme.biodiversity/dev
GNU General Public License v3.0

A different approach to parallelization with mapme.biodiversity #135

Closed fBedecarrats closed 1 year ago

fBedecarrats commented 1 year ago

TL;DR: This issue describes an alternative way to specify cores with mapme.biodiversity that uses the future package backend. I include benchmarks for different configurations and numbers of parallel workers.

Demo: Usually we do the following with cores = n:

library(dplyr)
library(mapme.biodiversity)

my_portfolio <- system.file("extdata", "sierra_de_neiba_478140_2.gpkg",
                            package = "mapme.biodiversity") %>%
    sf::read_sf() %>%
    init_portfolio(years = 2016:2017,
                   cores = 4) %>%
    get_resources(resources = c("gfw_treecover", "gfw_lossyear", "gfw_emissions")) %>%
    calc_indicators("treecover_area_and_emissions", min_size = 1, min_cover = 30)

But the following (with cores = "future") also works, and is likely more consistent and flexible across configurations, including Windows:

library(dplyr)
library(mapme.biodiversity)
library(future.apply)

my_portfolio <- system.file("extdata", "sierra_de_neiba_478140_2.gpkg", 
                            package = "mapme.biodiversity") %>%
  sf::read_sf() %>%
  init_portfolio(years = 2016:2017,
                 cores = "future") %>% # Here we rely on the future backend
  get_resources(resources = c("gfw_treecover", "gfw_lossyear", 
                              "gfw_emissions"))
plan(multisession, workers = 4)
my_portfolio <- calc_indicators(x = my_portfolio,
                                indicators = "treecover_area_and_emissions", 
                                min_size = 1, min_cover = 30)
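
When the computation is done, the {future} backend can be reset with the standard future idiom (independent of mapme.biodiversity):

plan(sequential) # release the parallel workers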

How does it work? With the package mapme.biodiversity, we usually define the number of cores to use for parallelization with init_portfolio(cores = n), where n is an integer. Under the hood, the cores argument is stored and passed at each subsequent calc_indicators() call to pbapply::pblapply(cl = n); see the code of mapme.biodiversity::calc_indicators(), lines 89-92.

Since version 1.7 (January 2023), pbapply accepts a different way to specify the parallelization workers: pbapply::pblapply(cl = "future"). When doing so, pbapply delegates to the future backend (see the future documentation).
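
For readers unfamiliar with this pbapply feature, here is a minimal, self-contained sketch of the mechanism (assuming pbapply >= 1.7 and future.apply are installed; the squaring function is a toy stand-in):

library(pbapply)
library(future)

plan(multisession, workers = 2)                      # any future backend works
res <- pblapply(1:4, function(i) i^2, cl = "future") # dispatched via future.apply
plan(sequential)                                     # reset the backend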

I think that this approach has several advantages:

Below I will add two more comments:

fBedecarrats commented 1 year ago

The approach described above works out of the box as soon as the user has pbapply version >= 1.7 and has installed future.apply. If we want to streamline the option described above, two modifications could be made to the mapme.biodiversity package:

For the latter, a simple solution could be to replace lines 87-97 of R/calc_indicator.R:

  if (processing_mode == "asset") {
    # apply function with parameters and add hidden id column
    results <- pbapply::pblapply(1:nrow(x), function(i) {
      .prep_and_compute(x[i, ], params, i)
    }, cl = cores)
  }

by:

if (processing_mode == "asset") {
  if (cores == "future") {
    # apply function with parameters and add hidden id column
    results <- pbapply::pblapply(1:nrow(x), function(i) {
      .prep_and_compute(x[i, ], params, i)
    }, cl = cores, future.seed = TRUE)
  } else {
    # apply function with parameters and add hidden id column
    results <- pbapply::pblapply(1:nrow(x), function(i) {
      .prep_and_compute(x[i, ], params, i)
    }, cl = cores)
  }
}

Note that I have not tested this solution yet. Maybe future.seed = TRUE can even be passed when future is not used in pblapply(), which would make the branching unnecessary.
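
An equally untested sketch of the same logic that avoids duplicating the pblapply() call would be to assemble the argument list once and append future.seed only when needed:

if (processing_mode == "asset") {
  # apply function with parameters and add hidden id column
  args <- list(
    X = 1:nrow(x),
    FUN = function(i) .prep_and_compute(x[i, ], params, i),
    cl = cores
  )
  # only pass future.seed when the future backend is requested
  if (identical(cores, "future")) args$future.seed <- TRUE
  results <- do.call(pbapply::pblapply, args)
}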

goergen95 commented 1 year ago

Hi! This is awesome!!! I would opt to move {future} to the Suggests list of the package and check in init_portfolio() whether the namespace is available when one specifies cores = "future". We should also feature this prominently in the README or a vignette. I would also very much like to test this extensively on Windows, so hopefully we can close #15 with these changes.
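
Such a check inside init_portfolio() could look roughly like this (a sketch, not the actual package code):

if (identical(cores, "future") &&
    !requireNamespace("future.apply", quietly = TRUE)) {
  stop("cores = \"future\" requires the {future.apply} package; ",
       "please install it first, e.g. with install.packages(\"future.apply\").")
}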

melvinhlwong commented 1 year ago

@Shirobakaidou Could you try this and report back? Thanks @fBedecarrats ! We will discuss this with Darius and Johannes today.

fBedecarrats commented 1 year ago

Below I provide some benchmarks with processing times, first for a subset and then for all terrestrial Madagascar protected areas (note that I use a source other than WDPA, which I made available on this repo a while ago):

| n (cores/workers) | init_portfolio(cores = n) | cores = "future" + plan(multisession, workers = n) | cores = "future" + plan(cluster, workers = n) |
| --- | --- | --- | --- |
| 4 | 59m 48s | | |
| 8 | 43m 47s | 41m 21s | 40m 52s |
| 16 | 37m 56s | 36m 21s | 37m 18s |
| 24 | 34m 20s | 34m 02s | 35m 31s |

The tests were run on SSP Cloud, a data-processing platform open to data scientists from the French public sector. It is a Kubernetes cluster hosted by the French statistical institute (INSEE), with Onyxia running on top as a user interface.

This code was used for the tests without future:

library(tictoc)
library(dplyr)
library(sf)
library(mapme.biodiversity)

my_shp <- "https://github.com/mapme-initiative/mapme.biodiversity/files/9746104/AP_Vahatra_shp.zip"
download.file(my_shp, destfile = "Vahatra98AP.zip")
unzip("Vahatra98AP.zip", exdir = "Vahatra")
PA_mada <- st_read("Vahatra/AP_Vahatra.shp", quiet = TRUE) %>%
  # The projection is missing (no .prj file), so we set it manually
  st_set_crs("EPSG:4326")

# Discard points and cast multipolygons as polygons
PA_poly <- PA_mada %>%
  st_make_valid() %>%
  filter(st_geometry_type(.) == "MULTIPOLYGON") %>%
  st_cast("POLYGON")

# Keep the full set of polygons here (no subsetting on small PAs)
PA_poly_small <- PA_poly 

# Test with 4 cores
PA_poly_small <- init_portfolio(x = PA_poly_small, 
                                years = 2000:2020,
                                outdir = "data_s3/mapme",
                                cores = 4,
                                add_resources = TRUE,
                                verbose = TRUE)
# Get GFW data
PA_poly_small <- get_resources(x = PA_poly_small, 
                               resources = c("gfw_treecover", "gfw_lossyear", 
                                             "gfw_emissions"))
# With 4 cores (portfolio initialization not included in the timer)
start_4 <- tic()
PA_poly_small <- calc_indicators(x = PA_poly_small,
                                 indicators = "treecover_area_and_emissions", 
                                 min_cover = 10, min_size = 1)
duration_4 <- toc()
# With 8 cores 
start_8 <- tic()
PA_poly_small_8 <- init_portfolio(x = PA_poly_small, 
                                  years = 2000:2020,
                                  outdir = "data_s3/mapme",
                                  cores = 8,
                                  add_resources = TRUE,
                                  verbose = TRUE)
PA_poly_small_8 <- calc_indicators(x = PA_poly_small_8,
                                   indicators = "treecover_area_and_emissions", 
                                   min_cover = 10, min_size = 1)
duration_8 <- toc()
# elapsed=43m 47s

# With 16 cores
start_16 <- tic()
PA_poly_small_16 <- init_portfolio(x = PA_poly_small, 
                                   years = 2000:2020,
                                   outdir = "data_s3/mapme",
                                   cores = 16,
                                   add_resources = TRUE,
                                   verbose = TRUE)
PA_poly_small_16 <- calc_indicators(x = PA_poly_small_16,
                                    indicators = "treecover_area_and_emissions", 
                                    min_cover = 10, min_size = 1)
duration_16 <- toc()
# elapsed=37m 56s

# With 24 cores
start_24 <- tic()
PA_poly_small_24 <- init_portfolio(x = PA_poly_small, 
                                   years = 2000:2020,
                                   outdir = "data_s3/mapme",
                                   cores = 24,
                                   add_resources = TRUE,
                                   verbose = TRUE)
PA_poly_small_24 <- calc_indicators(x = PA_poly_small_24,
                                    indicators = "treecover_area_and_emissions", 
                                    min_cover = 10, min_size = 1)
duration_24 <- toc()
# elapsed=34m 20s

duration_4$callback_msg # init_portfolio() not included in the timer
# [1] "3588.133 sec elapsed"
duration_8$callback_msg
# [1] "2626.842 sec elapsed"
duration_16$callback_msg
# [1] "2275.712 sec elapsed"
duration_24$callback_msg
# [1] "2060.465 sec elapsed"

This code was used for the tests with future:

remove.packages("mapme.biodiversity")
# the upstream version
remotes::install_github("mapme-initiative/mapme.biodiversity")
# my fork
# remotes::install_github("fBedecarrats/mapme.biodiversity@new_parallel")
install.packages("tictoc")
install.packages("future.apply")

library(tictoc)
library(dplyr)
library(sf)
library(future)
library(mapme.biodiversity)

my_shp <- "https://github.com/mapme-initiative/mapme.biodiversity/files/9746104/AP_Vahatra_shp.zip"
download.file(my_shp, destfile = "Vahatra98AP.zip")
unzip("Vahatra98AP.zip", exdir = "Vahatra")
PA_mada <- st_read("Vahatra/AP_Vahatra.shp", quiet = TRUE) %>%
  # The projection is missing (no .prj file), so we set it manually
  st_set_crs("EPSG:4326")

# Discard points and cast multipolygons as polygons
PA_poly <- PA_mada %>%
  st_make_valid() %>%
  filter(st_geometry_type(.) == "MULTIPOLYGON") %>%
  st_cast("POLYGON")

# Subset on small PAs
PA_poly_small <- PA_poly %>%
  filter(hectars <= 50000)

PA_poly_small_future <- init_portfolio(x = PA_poly_small,
                                       years = 2000:2020,
                                       outdir = "/home/onyxia/work/new_test/my_future",
                                       tmpdir = "/home/onyxia/work/new_test/tmp",
                                       add_resources = TRUE,
                                       verbose = TRUE,
                                       cores = "future")
# Get GFW data
PA_poly_small_future <- get_resources(x = PA_poly_small_future, 
                                      resources = c("gfw_treecover", "gfw_lossyear", 
                                                    "gfw_emissions"))
dir.create("my_future/test")

plan(sequential)
plan(multicore, workers = 16)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
                                        indicators = "treecover_area_and_emissions",
                                        min_cover = 10, min_size = 1)
# Warning message:
#   In supportsMulticoreAndRStudio(...) :
#   [ONE-TIME WARNING] Forked processing ('multicore') is not supported when 
#   running R from RStudio because it is considered unstable. For more details, 
#   how to control forked processing or not, and how to silence this warning in 
#   future R sessions, see ?parallelly::supportsMulticore

# elapsed=03m 18s
plan(sequential)
plan(multicore, workers = 8)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
                                        indicators = "treecover_area_and_emissions",
                                        min_cover = 10, min_size = 1)
# elapsed=03m 14s
plan(sequential)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
                                        indicators = "treecover_area_and_emissions",
                                        min_cover = 10, min_size = 1)
# elapsed=03m 13s
plan(sequential)
plan(multisession, workers = 16)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
                                        indicators = "treecover_area_and_emissions",
                                        min_cover = 10, min_size = 1)
# elapsed=01m 49s
plan(sequential)
plan(multisession, workers = 4)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
                                        indicators = "treecover_area_and_emissions",
                                        min_cover = 10, min_size = 1)
# elapsed=02m 23s
plan(sequential)
plan(cluster, workers = 8)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
                                        indicators = "treecover_area_and_emissions",
                                        min_cover = 10, min_size = 1)
# 100% elapsed=01m 46s
plan(sequential)
plan(cluster, workers = 16)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
                                        indicators = "treecover_area_and_emissions",
                                        min_cover = 10, min_size = 1)
# elapsed=01m 50s
###########################################################################

PA_poly <- init_portfolio(x = PA_poly,
                           years = 2000:2020,
                           outdir = "my_future",
                           tmpdir = "tmp",
                           add_resources = TRUE,
                           verbose = TRUE,
                           cores = "future")

plan(sequential)
plan(multisession, workers = 16)
PA_poly<- calc_indicators(x = PA_poly,
                          indicators = "treecover_area_and_emissions",
                          min_cover = 10, min_size = 1)
# elapsed=36m 21s
plan(sequential)
plan(multisession, workers = 8)
PA_poly <- calc_indicators(x = PA_poly,
                           indicators = "treecover_area_and_emissions",
                           min_cover = 10, min_size = 1)
# elapsed=41m 21s
plan(sequential)
plan(cluster, workers = 16)
PA_poly<- calc_indicators(x = PA_poly,
                          indicators = "treecover_area_and_emissions",
                          min_cover = 10, min_size = 1)
# elapsed=37m 18s
plan(sequential)
plan(cluster, workers = 8)
PA_poly<- calc_indicators(x = PA_poly,
                          indicators = "treecover_area_and_emissions",
                          min_cover = 10, min_size = 1)
# elapsed=40m 52s
plan(sequential)
plan(multisession, workers = 24)
PA_poly<- calc_indicators(x = PA_poly,
                          indicators = "treecover_area_and_emissions",
                          min_cover = 10, min_size = 1)
# elapsed=34m 02s
plan(sequential)
plan(cluster, workers = 24)
PA_poly<- calc_indicators(x = PA_poly,
                          indicators = "treecover_area_and_emissions",
                          min_cover = 10, min_size = 1)
# elapsed=35m 31s

Jo-Schie commented 1 year ago

Thanks @fBedecarrats for this approach. We have discussed it and I think there are two things we would need to test:

  • What are the possible downsides for performance? Not all geoprocessing parts of the calc_indicator function are done in parallel. Parallelizing over the whole function may also have negative performance outcomes if e.g. a lot of unnecessary or repetitive clipping or reclassification is done. It depends very much on the inner workings of the function.
  • Is it really increasing stability? (If yes, it has to be preferred.)

I think we will need a bit of time and testing to dig into this.

fBedecarrats commented 1 year ago
  • What are the possible downsides for performance? Not all geoprocessing parts of the calc_indicator function are done in parallel. Parallelizing over the whole function may also have negative performance outcomes if e.g. a lot of unnecessary or repetitive clipping or reclassification is done. It depends very much on the inner workings of the function.

This approach doesn't change the package's behaviour regarding which calculations get parallelized and which remain sequential (single core). It only handles the already parallelized cases with a different backend.

Note that from the above benchmarks, this doesn't affect performance.

fBedecarrats commented 1 year ago
  • Is it really increasing stability? (If yes, it has to be preferred.)

The problem is that the previous freezes or slowdowns were appearing somewhat randomly and I am unable to purposely reproduce these occurrences, although @karpfen and @Shirobakaidou indicated they had similar experiences (cf. https://github.com/openkfw/mapme.pa-impact-evaluation/issues/8 or #90). In any case, I don't think that it should necessarily be preferred. The idea is to have an alternative if users encounter issues.

goergen95 commented 1 year ago

I agree that the current suggestion by @fBedecarrats simply introduces another backend to the already implemented simplistic parallelization strategy (that is, we simply parallelize over assets in a portfolio). For that reason, I do not see any issues with including the proposed changes. Looking at the bigger picture, the future package is the way to go if we would like to improve our approach to parallelization. It provides a fairly straightforward interface to parallelize not only on a local machine but also across multiple machines/clusters. It also supports a pattern of nested parallelization that could be interesting for mapme.biodiversity. However, both parallelization across multiple machines and nested parallelization are not trivial to set up from a package-development perspective. That applies to any package in general, and specifically to mapme.biodiversity, since we deal with very large datasets in the background which are only available on the host machine. I still think it is worth exploring the possibilities that future provides, because we want to support the most efficient processing possible for global portfolios.
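
For reference, pointing {future} at remote machines mostly amounts to handing a PSOCK cluster to plan(); a sketch assuming passwordless SSH access to two hypothetical hosts named node1 and node2, each with R installed:

library(future)
# hypothetical host names; requires SSH access and R on the remote machines
cl <- parallelly::makeClusterPSOCK(c("node1", "node2"))
plan(cluster, workers = cl)
# ... calc_indicators() / future_map() calls would run on the remote workers ...
plan(sequential)
parallel::stopCluster(cl)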

fBedecarrats commented 1 year ago

@goergen95 I'd be happy to think about it with you. I think that the furrr package provides a nice API to vectorize and parallelize operations with future. Maybe you could point out one of the indicator calculation functions that we could try to refactor with this approach, as a proof of concept.
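
To give a flavour of that API, a toy example (unrelated to the package internals):

library(furrr)

plan(multisession, workers = 2)
future_map_dbl(1:4, ~ .x^2) # parallel drop-in replacement for purrr::map_dbl()
#> [1]  1  4  9 16
plan(sequential)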

goergen95 commented 1 year ago

I would be glad if we can figure something out together! Parallelization on the indicator level is just one of three levels of parallelization I have in mind, and it is the one where I expect the smallest efficiency gains (except maybe for vector-raster operations).

We could think about parallelization on these levels:

{future} would allow us to nest these levels of parallelization. The difficult part will be to figure out how to orchestrate a nested parallelization pattern safely, possibly across multiple machines, while sending only the necessary data to the child processes.

fBedecarrats commented 1 year ago

Great! The idea is to replace the for loops with nested future_map() calls that are vectorized and parallelized, and to let the furrr and future packages do the magic of orchestrating the calculations. I would like to provide a reproducible example, but first, can you please clarify what you mean by "asset level" in your previous message? Thanks in advance!

goergen95 commented 1 year ago

Below is a minimal PoC of what I currently have in mind. Note that in this case we actually don't get any speed-up compared to setting plan(sequential). I guess that is because of the overhead of initiating the new processes, and that we will only start to see speed improvements with a substantially larger number of polygons. The advantage of the proposed approach is that we minimize the data transferred from the parent process to the child processes. So in principle, this approach should be able to parallelize between different machines, reducing the amount of data transferred between them to a minimum. A downside of this specific example is that we actually query the remote data source twice per polygon. I am not sure about this one, since I don't think that pulling the data should move up one position in the processing chain, because then we would have to transfer it to the child processes... Also, it could be a way forward for the package to rely on cloud-native geospatial formats (such as COGs and FlatGeobuf), because with this approach we only query the data that is actually needed and don't write persistent copies to disk. The DEM I am using below is actually about ~200 GB in size, but since it is supplied as a COG, {terra} reads only the portion of the data that intersects with the polygons.

library(terra)
library(sf)
library(future)
library(tibble)
library(dplyr)
library(furrr)

# construct globally distributed polygons
(aois <- c(
  "POLYGON ((-127.4604 59.78938, -127.1124 59.79392, -127.1237 60.13207, -127.4339 60.13207, -127.4604 59.78938))",
  "POLYGON ((-91.62243 15.84818, -91.52449 15.84076, -91.52375 15.95687, -91.60944 15.95353, -91.62243 15.84818))",
  "POLYGON ((-48.37485 -13.55735, -48.61121 -13.56249, -48.60864 -13.68581, -48.362 -13.71664, -48.37485 -13.55735))",
  "POLYGON ((34.54681 1.639019, 34.87915 1.663101, 34.92249 1.889474, 34.68649 1.932822, 34.54681 1.639019))",
  "POLYGON ((120.3057 46.61764, 120.6169 46.65425, 120.5986 46.86782, 120.3789 46.84341, 120.3057 46.61764))",
  "POLYGON ((151.2656 -32.10844, 151.6211 -32.09528, 151.608 -31.8056, 151.371 -31.8056, 151.2656 -32.10844))"
) |>
    st_as_sfc(crs = st_crs(4326)) |>
    st_as_sf())
#> Simple feature collection with 6 features and 0 fields
#> Geometry type: POLYGON
#> Dimension:     XY
#> Bounding box:  xmin: -127.4604 ymin: -32.10844 xmax: 151.6211 ymax: 60.13207
#> Geodetic CRS:  WGS 84
#>                                x
#> 1 POLYGON ((-127.4604 59.7893...
#> 2 POLYGON ((-91.62243 15.8481...
#> 3 POLYGON ((-48.37485 -13.557...
#> 4 POLYGON ((34.54681 1.639019...
#> 5 POLYGON ((120.3057 46.61764...
#> 6 POLYGON ((151.2656 -32.1084...
aois$id <- 1:nrow(aois)
# print areas
units::set_units(st_area(aois), ha)
#> Units: [ha]
#> [1] 69344.81 12106.23 40886.77 92429.55 48699.86 91747.18
# get centroids
centroids <- st_centroid(aois)
#> Warning: st_centroid assumes attributes are constant over geometries
# read the COG file from openlandmap, the true file size is about ~200 GB
dem_url <- "/vsicurl/https://s3.eu-central-1.wasabisys.com/openlandmap/dtm/dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif"
(dem <- rast(dem_url))
#> class       : SpatRaster 
#> dimensions  : 524999, 1296000, 1  (nrow, ncol, nlyr)
#> resolution  : 0.0002777778, 0.0002777778  (x, y)
#> extent      : -180, 180, -62.00056, 83.8325  (xmin, xmax, ymin, ymax)
#> coord. ref. : lon/lat WGS 84 (EPSG:4326) 
#> source      : dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif 
#> name        : dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210
# plot dem and position of polygons
plot(dem, colNA = "steelblue", smooth = TRUE, range = c(-1e2, 1e4))
plot(centroids, col="red", add = TRUE, pch = 4, cex = 2)

# define example indicator functions
# mean height
dem_mean <- function(poly, dem_url){
  rast(dem_url) |>
    crop(poly) |>
    mask(poly) |>
    global("mean", na.rm = TRUE) |>
    as.numeric() -> dem_mean
  tibble(parameter = "dem",
         value = dem_mean,
         child.pid =  Sys.getpid()
  )
}
# mean tri
tri_mean <- function(poly, dem_url){
  rast(dem_url) |>
    crop(poly) |>
    mask(poly) |>
    terrain(v = "TRI") |>
    global("mean", na.rm = TRUE) |>
    as.numeric() -> tri_mean
  tibble(parameter = "tri",
         value = tri_mean,
         child.pid =  Sys.getpid()
  )
}

# define main function
main <- function(portfolio, indicators, dem_url){
  future_map_dfr(indicators,
                           function(indicator, portfolio, dem_url){
                             fun <- switch(indicator,
                                           "dem" = dem_mean,
                                           "tri" = tri_mean)
                             portfolio <- portfolio |> dplyr::group_split(id)
                             result <- future_map_dfr(portfolio,
                                                      function(poly, fun, dem_url){
                                                        fun(poly, dem_url)
                                                      }, fun, dem_url)
                             result$parent.id <- Sys.getpid()
                             result
                           }, portfolio, dem_url, .options=furrr_options(
                             packages = c("terra", "sf")
                           )
  )
}

# execute in sequence and parallel
plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 3)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
#>    user  system elapsed 
#>   1.347   0.061  26.588
plan(sequential)

out
#> # A tibble: 12 × 4
#>    parameter   value child.pid parent.id
#>    <chr>       <dbl>     <int>     <int>
#>  1 dem        814.       14746     14630
#>  2 dem       2522.       14746     14630
#>  3 dem        574.       14748     14630
#>  4 dem       1506.       14748     14630
#>  5 dem        883.       14747     14630
#>  6 dem       1061.       14747     14630
#>  7 tri          3.05     14902     14631
#>  8 tri          7.92     14902     14631
#>  9 tri          4.62     14901     14631
#> 10 tri          4.51     14901     14631
#> 11 tri          3.96     14903     14631
#> 12 tri          6.62     14903     14631

Created on 2023-02-19 with reprex v2.0.2

fBedecarrats commented 1 year ago

Great work @goergen95! This is the kind of nesting of future_map() functions I had in mind. The performance increase rapidly becomes visible and significant when we increase the number of assets. I made some benchmarks that I summarise below, before including the whole sequence of tests and results.

| Number of assets | Sequential (s) | cluster 4x4 (s) |
| --- | --- | --- |
| 6 | 19.5 | 36.4 |
| 60 | 191.9 | 57.3 |
| 600 | ~1900 (*) | 311.7 |

(*) This one I didn't run, but sequential run time should scale roughly linearly with the number of assets, as the tests with 6 and 60 assets confirm (19.5 s × 10 ≈ 191.9 s, hence 191.9 s × 10 ≈ 1900 s).

Below is the full testing sequence:


plan(sequential)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 2.547   0.650  19.507 
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 2.559   0.332  36.415 

# Now let's try with more assets
many_aois <- bind_rows(replicate(10, aois, simplify = FALSE))
many_aois$id <- 1:nrow(many_aois)

plan(sequential)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 21.570   5.551 191.902 

plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 4.689   0.360  57.267 

# Now let's try with more assets
very_many_aois <- bind_rows(replicate(100, aois, simplify = FALSE))
very_many_aois$id <- 1:nrow(very_many_aois)

plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 29.505   2.516 311.656

fBedecarrats commented 1 year ago

Regarding bottlenecks from data transfer and reads, my understanding is that Apache Arrow has sparked a real revolution over the last couple of years for tabular and vector spatial data, with order-of-magnitude performance improvements (optimized storage with Parquet, fast reads and in-memory sharing with Arrow, and network transfer with Flight). I haven't found anything already available for raster data, although I see in this issue/discussion that the Arrow developers have been engaging with geospatial players about porting this approach to raster data (see this post in particular). If/when they finalize this, it would mean that we could leverage the whole set of Arrow functionalities available in R for raster processing. @goergen95, can you have a look at both links and tell me if you think this would be a perspective worth studying further?

goergen95 commented 1 year ago

Thanks for the replication and timing with more assets! That looks like the expected improvement.

A small side note: with plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4))) you are using 4 cores for each level of parallelization (so 16 in total), but there are only two indicators to loop over at the first level. That means two first-level workers will remain idle. So maybe with plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 8))) we would see more gains?

Discussing formats is not the priority for me right now. Instead, we need to understand how we want to handle the package-internal flow of data with parallelization. My example above delays reading any data from the raster source until the very last step. This means the only data transferred to the child processes is the URL and the polygons, so it's the bare minimum of data that needs to be transferred. Because in this case the data source is a COG on a remote server, we can simply read the data for the respective polygon in the very last step. We can thus also parallelize between different machines.

With the way we currently handle data in the package, this approach is only applicable on a single machine, because we won't find the downloaded resources on another server. Alternatively, we would have to transfer entire resources to the other machines, which could introduce a serious bottleneck (in the example above, the main bottleneck is the latency to the server hosting the data source).

fBedecarrats commented 1 year ago

Hi, I tested other configurations yesterday (details pushed there), with the following results:

| Configuration | 6 assets (s) | 60 assets (s) | 600 assets (s) |
| --- | --- | --- | --- |
| Sequential | 19.5 | 191.9 | ~1900 (*) |
| cluster 4x4 | 36.4 | 57.3 | 311.7 |
| cluster 2x4 | | 67.4 | |
| cluster 4x2 | | 73.1 | |
| cluster 4 | | 109.2 | |
| cluster 8 | | 74.8 | |
| cluster 2x8 | | 70.1 | |
| multisession 4x4 | | 61.6 | |

I didn't see improvements at 2x8 compared to 4x4, but it may be due to cluster initialization overheads. I'll try later today with 600 assets to see how it behaves with a larger dataset. The idea about Arrow goes beyond mere format aspects (on-disk vs. in-memory data, plus the Flight protocol for data transfer between workers). But you're right, it's a different discussion and it's better to focus on parallelization here.

fBedecarrats commented 1 year ago

So I completed the benchmark. Here is a graph, a table, and the full test scripts and outputs (I created a specific repo for these tests).

[graph: elapsed time by parallelization configuration and number of assets]

The main takeaways I see in these results:

Below are the detailed table and the testing scripts and outputs.

Elapsed times in seconds:

| Configuration | Total number of workers | 6 polygons | 60 polygons | 600 polygons |
| --- | --- | --- | --- | --- |
| Sequential | 1 | 19.5 | 191.9 | 1480.7 |
| cluster 2x2 | 4 | 26.2 | 65.0 | 493.5 |
| cluster 4 | 4 | 20.0 | 109.2 | 879.9 |
| cluster 2x4 | 8 | 31.2 | 67.4 | 1042.6 |
| cluster 4x2 | 8 | 24.7 | 73.1 | 1314.1 |
| cluster 8 | 8 | 24.3 | 74.8 | 567.4 |
| multisession 4x4 | 16 | 34.4 | 61.6 | 317.1 |
| cluster 4x4 | 16 | 36.4 | 57.3 | 311.7 |
| cluster 2x8 | 16 | 41.3 | 70.1 | 807.9 |
| cluster 16 | 16 | 21.2 | 73.4 | 971.6 |
| cluster 3x8 | 24 | 47.9 | 66.7 | 310.1 |
| cluster 4x6 | 24 | 38.2 | 56.3 | 442.1 |
| cluster 2x12 | 24 | 43.2 | 80.9 | 222.4 |
| cluster 24 | 24 | 20.9 | 102.3 | 810.6 |

library(terra)
library(sf)
library(future)
library(tibble)
library(dplyr)
library(furrr)

# construct globally distributed polygons
(aois <- c(
  "POLYGON ((-127.4604 59.78938, -127.1124 59.79392, -127.1237 60.13207, -127.4339 60.13207, -127.4604 59.78938))",
  "POLYGON ((-91.62243 15.84818, -91.52449 15.84076, -91.52375 15.95687, -91.60944 15.95353, -91.62243 15.84818))",
  "POLYGON ((-48.37485 -13.55735, -48.61121 -13.56249, -48.60864 -13.68581, -48.362 -13.71664, -48.37485 -13.55735))",
  "POLYGON ((34.54681 1.639019, 34.87915 1.663101, 34.92249 1.889474, 34.68649 1.932822, 34.54681 1.639019))",
  "POLYGON ((120.3057 46.61764, 120.6169 46.65425, 120.5986 46.86782, 120.3789 46.84341, 120.3057 46.61764))",
  "POLYGON ((151.2656 -32.10844, 151.6211 -32.09528, 151.608 -31.8056, 151.371 -31.8056, 151.2656 -32.10844))"
) |>
    st_as_sfc(crs = st_crs(4326)) |>
    st_as_sf())
#> Simple feature collection with 6 features and 0 fields
#> Geometry type: POLYGON
#> Dimension:     XY
#> Bounding box:  xmin: -127.4604 ymin: -32.10844 xmax: 151.6211 ymax: 60.13207
#> Geodetic CRS:  WGS 84
#>                                x
#> 1 POLYGON ((-127.4604 59.7893...
#> 2 POLYGON ((-91.62243 15.8481...
#> 3 POLYGON ((-48.37485 -13.557...
#> 4 POLYGON ((34.54681 1.639019...
#> 5 POLYGON ((120.3057 46.61764...
#> 6 POLYGON ((151.2656 -32.1084...
aois$id <- 1:nrow(aois)
# print areas
units::set_units(st_area(aois), ha)
#> Units: [ha]
#> [1] 69344.81 12106.23 40886.77 92429.55 48699.86 91747.18
# get centroids
centroids <- st_centroid(aois)
#> Warning: st_centroid assumes attributes are constant over geometries
# read the COG file from openlandmap, the true file size is about ~200 GB
dem_url <- "/vsicurl/https://s3.eu-central-1.wasabisys.com/openlandmap/dtm/dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif"
(dem <- rast(dem_url))
#> class       : SpatRaster 
#> dimensions  : 524999, 1296000, 1  (nrow, ncol, nlyr)
#> resolution  : 0.0002777778, 0.0002777778  (x, y)
#> extent      : -180, 180, -62.00056, 83.8325  (xmin, xmax, ymin, ymax)
#> coord. ref. : lon/lat WGS 84 (EPSG:4326) 
#> source      : dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif 
#> name        : dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210
# plot dem and position of polygons
plot(dem, colNA = "steelblue", smooth = TRUE, range = c(-1e2, 1e4))
plot(centroids, col="red", add = TRUE, pch = 4, cex = 2)

# define example indicator functions
# mean height
dem_mean <- function(poly, dem_url){
  rast(dem_url) |>
    crop(poly) |>
    mask(poly) |>
    global("mean", na.rm = TRUE) |>
    as.numeric() -> dem_mean
  tibble(parameter = "dem",
         value = dem_mean,
         child.pid =  Sys.getpid()
  )
}
# mean tri
tri_mean <- function(poly, dem_url){
  rast(dem_url) |>
    crop(poly) |>
    mask(poly) |>
    terrain(v = "TRI") |>
    global("mean", na.rm = TRUE) |>
    as.numeric() -> tri_mean
  tibble(parameter = "tri",
         value = tri_mean,
         child.pid =  Sys.getpid()
  )
}

# define main function
main <- function(portfolio, indicators, dem_url){
  future_map_dfr(indicators,
                 function(indicator, portfolio, dem_url){
                   fun <- switch(indicator,
                                 "dem" = dem_mean,
                                 "tri" = tri_mean)
                   portfolio <- portfolio |> 
                     dplyr::group_split(id)
                   result <- future_map_dfr(portfolio,
                                            function(poly, fun, dem_url){
                                              fun(poly, dem_url)
                                            }, fun, dem_url)
                   result$parent.id <- Sys.getpid()
                   result
                 }, portfolio, dem_url, .options=furrr_options(
                   packages = c("terra", "sf")
                 )
  )
}

# Create larger datasets

# With 60 assets
many_aois <- bind_rows(replicate(10, aois, simplify = FALSE))
many_aois$id <- 1:nrow(many_aois)

# With 600 assets
very_many_aois <- bind_rows(replicate(100, aois, simplify = FALSE))
very_many_aois$id <- 1:nrow(very_many_aois)
# user   system  elapsed 
# 189.744   48.133 1480.693

# Sequential ------------------------------------------------------

plan(sequential)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 2.547   0.650  19.507 

plan(sequential)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 21.570   5.551 191.902 

plan(sequential)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))

# 4 Workers -----------------------------------------------------------

## cluster 2x2 ---------------------------------

# very small dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 2)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 1.623   0.095  26.197 

# small dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 2)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 8.049   5.468  65.004 

# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 2)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))

## Cluster 4 --------------------------------------

plan(sequential)
plan(cluster, workers = 4)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 1.009   0.053  19.999 

plan(sequential)
plan(cluster, workers = 4)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 9.946   0.926 109.158 

# larger dataset
plan(sequential)
plan(cluster, workers = 4)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 81.295   6.770 879.906 

# 8 workers ---------------------------------------------------------------

## Cluster 2x4 --------------------------------------

plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 4)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 2.043   0.128  31.238

plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 4)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 5.589   0.565  67.473 

# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 4)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user   system  elapsed 
# 101.255    8.462 1042.591 

## Cluster 4x2 -----------------------------------------

plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 2)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 1.446   0.211  24.726 

plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 2)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 5.983   0.600  73.072 

# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 2)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user   system  elapsed 
# 121.529   11.115 1314.205 

## Cluster 8 ---------------------------------------

plan(sequential)
plan(cluster, workers = 8)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 1.334   0.132  24.333 

plan(sequential)
plan(cluster, workers = 8)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 6.725   0.770  74.830 

# larger dataset
plan(sequential)
plan(cluster, workers = 8)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 51.439   5.110 567.372 

# 16 workers --------------------------------------------------------------

## Multisession 4x4 --------------------------

plan(sequential)
plan(list(tweak(multisession, workers = 4), tweak(multisession, workers = 4)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 2.287   0.238  34.425 

plan(sequential)
plan(list(tweak(multisession, workers = 4), tweak(multisession, workers = 4)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 5.064   0.410  61.576 

# larger dataset
plan(sequential)
plan(list(tweak(multisession, workers = 4), tweak(multisession, workers = 4)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 28.309   2.233 317.105

## cluster 4x4 ----------------------------------------

plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 2.559   0.332  36.415 

plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 4.689   0.360  57.267 

plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 29.505   2.516 311.656

## cluster 2x8 ---------------------------------

plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 8)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 2.830   0.680  41.274

plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 8)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 6.459   0.846  70.975

# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 8)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 61.611   4.638 807.943 

## cluster 16 -----------------------

plan(sequential)
plan(cluster, workers = 16)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 1.152   0.074  21.172 

plan(sequential)
plan(cluster, workers = 16)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 5.960   0.548  73.368 

# larger dataset
plan(sequential)
plan(cluster, workers = 16)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 65.716   2.881 971.648 

# 24 workers -----------------------------------------------------------

## cluster 3x8 -----------------------------------

plan(sequential)
plan(list(tweak(cluster, workers = 3), tweak(cluster, workers = 8)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 3.807   0.274  47.897 

# Smaller dataset
plan(sequential)
plan(list(tweak(cluster, workers = 3), tweak(cluster, workers = 8)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 5.207   0.453  66.653 

# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 3), tweak(cluster, workers = 8)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 27.404   2.322 310.140 

## Cluster 4x6 ------------------------------

# Very small dataset
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 6)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 2.740   0.264  38.223 

# Smaller dataset
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 6)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 4.118   0.364  56.342

# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 6)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 37.542   3.022 442.073

## cluster 2 x 12 ----------------------------------

plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 12)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 3.501   0.436  43.203 

plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 12)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 6.926   0.754  80.899 

# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 12)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 20.459   1.965 222.440

## cluster 24 ------------------------------------------

plan(sequential)
plan(cluster, workers = 24)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 0.954   0.071  20.891 

plan(sequential)
plan(cluster, workers = 24)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 8.095   0.828 102.298 

# larger dataset
plan(sequential)
plan(cluster, workers = 24)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user  system elapsed 
# 55.393   4.377 810.612