smnorris / bcdata

Python and command line tools for quick access to DataBC geo-data available via WFS/WCS.
MIT License

async downloads #119

Closed smnorris closed 1 year ago

smnorris commented 1 year ago

Low priority/long term goal as current multi-threading approach using requests works fine - but it seems there could be some speed improvements using async requests:

https://docs.aiohttp.org/en/stable/
https://us-pycon-2019-tutorial.readthedocs.io/aiohttp_intro.html
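
A rough sketch of what bounded async paging might look like, not how bcdata works today. The function names (`page_params`, `fetch_all`, `download`) and the `max_concurrent` parameter are hypothetical; `startIndex` and `count` are the standard WFS 2.0 paging parameters. The aiohttp import is deferred so the concurrency helper can be exercised without it installed.

```python
import asyncio


def page_params(type_name, start_index, count):
    """Build WFS 2.0.0 GetFeature query parameters for one page."""
    return {
        "SERVICE": "WFS",
        "VERSION": "2.0.0",
        "REQUEST": "GetFeature",
        "outputFormat": "application/json",
        "typeNames": type_name,
        "startIndex": str(start_index),
        "count": str(count),
    }


async def fetch_all(fetch_page, pages, max_concurrent=2):
    """Await fetch_page(params) for every page, at most max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(params):
        async with semaphore:
            return await fetch_page(params)

    # gather() returns results in the same order as the input pages
    return await asyncio.gather(*(bounded(p) for p in pages))


async def download(url, type_name, starts_and_counts, max_concurrent=2):
    """Hypothetical entry point; assumes aiohttp is installed."""
    import aiohttp  # third-party, imported lazily

    async with aiohttp.ClientSession() as session:

        async def fetch_page(params):
            async with session.get(url, params=params) as response:
                response.raise_for_status()
                # content_type=None skips aiohttp's strict MIME type check
                return await response.json(content_type=None)

        pages = [page_params(type_name, s, c) for s, c in starts_and_counts]
        return await fetch_all(fetch_page, pages, max_concurrent)
```

The semaphore is the part that matters for the server: all requests are scheduled at once, but only `max_concurrent` are in flight at any moment.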

ateucher commented 1 year ago

I did investigate async in the R package here, but never did implement it in the package...

smnorris commented 1 year ago

Nice - do you remember what the results of the comparison showed?

ateucher commented 1 year ago

I just ran it again: parallel is faster, but I think that async is probably more 'polite' as the host is deciding how many threads to open... at least I think that's how it works!

library(crul)
library(sf)
#> Linking to GEOS 3.10.2, GDAL 3.4.2, PROJ 8.2.1; sf_use_s2() is TRUE
library(glue)
library(future)
library(future.apply)

start_and_chunks <- function(n_records, limit) {
  last <- n_records %% limit

  start_indexes <- seq(0, n_records, by = limit)
  # Remove the last start index if same as number of records
  start_indexes <- setdiff(start_indexes, n_records)
  chunk_sizes <- rep(limit, length(start_indexes))

  if (last) {
    chunk_sizes[length(chunk_sizes)] <- last
  }

  stopifnot(length(chunk_sizes) == length(start_indexes))

  list(start_indexes = start_indexes,
       chunk_sizes = chunk_sizes)
}

combine_list <- function(res) {
  json_list <- lapply(res, function(x) x$parse("UTF-8"))

  sf_list <- lapply(json_list, sf::st_read, quiet = TRUE, stringsAsFactors = FALSE)

  sf_obj <- do.call("rbind", sf_list)

  # Check that every record was returned exactly once
  stopifnot(
    nrow(sf_obj) == number_of_records,
    length(unique(sf_obj$OBJECTID)) == number_of_records
  )

  sf_obj
}

query_list <- function() {
  list(
    SERVICE = "WFS",
    VERSION = "2.0.0",
    REQUEST = "GetFeature",
    outputFormat = "application/json",
    typeNames = "WHSE_FOREST_VEGETATION.BEC_BIOGEOCLIMATIC_POLY",
    sortby = "OBJECTID"
  )
}

get_bec_paging <- function(n, limit_chunk) {

  url_base <- "https://openmaps.gov.bc.ca/geo/pub/wfs"

  cli <- crul::HttpClient$new(url = url_base)

  cc <- crul::Paginator$new(
    client = cli,
    by = "limit_offset",
    limit_param = "count",
    offset_param = "startIndex",
    limit = n,
    chunk = limit_chunk
  )

  res <- cc$post(body = query_list(), encode = "form")

  combine_list(res)
}

get_bec_async <- function(n, limit_chunk) {

  i_c <- start_and_chunks(n, limit_chunk)

  url_base <- glue("https://openmaps.gov.bc.ca/geo/pub/wfs?startIndex={i_c$start_indexes}&count={i_c$chunk_sizes}")

  cc <- Async$new(urls = url_base)

  res <- cc$post(body = query_list(), encode = "form")

  combine_list(res)
}

get_bec_parallel <- function(n, limit_chunk) {

  i_c <- start_and_chunks(n, limit_chunk)

  url_base <- glue("https://openmaps.gov.bc.ca/geo/pub/wfs?startIndex={i_c$start_indexes}&count={i_c$chunk_sizes}")

  res <- future_lapply(url_base, function(u) {

    cli <- crul::HttpClient$new(url = u)
    r <- cli$post(body = query_list(), encode = "form")
    json <- r$parse("UTF-8")
    sf::st_read(json, quiet = TRUE, stringsAsFactors = FALSE)

  },
  future.seed = NULL
  )

  do.call("rbind", res)

}

#########################################################

number_of_records <- 15267
limit_chunk <- 1000

# number_of_records <- 200
# limit_chunk <- 30

future::plan(multiprocess(workers = 4))
#> Warning: Strategy 'multiprocess' is deprecated in future (>= 1.20.0)
#> [2020-10-30]. Instead, explicitly specify either 'multisession' (recommended) or
#> 'multicore'. In the current R session, 'multiprocess' equals 'multisession'.
#> Warning in supportsMulticoreAndRStudio(...): [ONE-TIME WARNING] Forked
#> processing ('multicore') is not supported when running R from RStudio
#> because it is considered unstable. For more details, how to control forked
#> processing or not, and how to silence this warning in future R sessions, see ?
#> parallelly::supportsMulticore

cat("Parallel\n")
#> Parallel
tictoc::tic()
b3 <- get_bec_parallel(n = number_of_records, limit_chunk = limit_chunk)
tictoc::toc()
#> 58.554 sec elapsed

Sys.sleep(60)

cat("Async\n")
#> Async
tictoc::tic()
b2 <- get_bec_async(n = number_of_records, limit_chunk = limit_chunk)
tictoc::toc()
#> 109.683 sec elapsed

Sys.sleep(60)

cat("Paging\n")
#> Paging
tictoc::tic()
b1 <- get_bec_paging(n = number_of_records, limit_chunk = limit_chunk)
tictoc::toc()
#> 152.921 sec elapsed

all.equal(b1, b2)
#> [1] TRUE
all.equal(b2, b3)
#> [1] TRUE

Created on 2022-10-07 with reprex v2.0.2
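
For anyone wanting to try the same comparison from Python, the chunking done by `start_and_chunks` above could be sketched as follows. This is a plain port of the R helper, not code from bcdata; the input validation is an addition.

```python
def start_and_chunks(n_records, limit):
    """Return (start_indexes, chunk_sizes) for paging through n_records rows."""
    if n_records < 0 or limit <= 0:
        raise ValueError("n_records must be >= 0 and limit must be > 0")
    # range() stops before n_records, so no start index equals the record count
    start_indexes = list(range(0, n_records, limit))
    # every chunk is `limit` long except possibly the last one
    chunk_sizes = [min(limit, n_records - start) for start in start_indexes]
    return start_indexes, chunk_sizes


starts, sizes = start_and_chunks(200, 30)
print(starts)  # [0, 30, 60, 90, 120, 150, 180]
print(sizes)   # [30, 30, 30, 30, 30, 30, 20]
```

With the values used in the run above (15267 records, chunks of 1000), this gives 16 requests, the last one asking for 267 records.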

smnorris commented 1 year ago

Very interesting. I did not realize it was up to the server.

Google leads me here but I am not sure how much applies or how much I want to digest: https://docs.ogc.org/per/16-023r3.html#_asynchronous_request_processing_for_wfs

I've found that 4 parallel workers can be too much for the server and now default to 2 for ad-hoc requests. For large scheduled loads, just one worker.

smnorris commented 1 year ago

Currently, none of the functions or CLI commands that take a max_workers parameter cap its value. Async would be nice, but at the very least max_workers values should be checked and capped at 4 (or maybe 1-2).
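
A minimal sketch of such a cap, assuming a thread pool is used for the requests. `MAX_WORKERS_CAP`, `capped_workers` and `run_capped` are hypothetical names, not existing bcdata functions.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS_CAP = 4  # upper bound suggested above; hypothetical constant


def capped_workers(requested, cap=MAX_WORKERS_CAP):
    """Clamp a requested worker count to the range 1..cap."""
    if isinstance(requested, bool) or not isinstance(requested, int):
        raise TypeError("max_workers must be an integer")
    return max(1, min(requested, cap))


def run_capped(func, items, max_workers=2):
    """Map func over items with a thread pool whose size is capped."""
    with ThreadPoolExecutor(max_workers=capped_workers(max_workers)) as pool:
        # pool.map preserves the order of items
        return list(pool.map(func, items))
```

Clamping silently keeps existing calls working; raising an error for out-of-range values would be the stricter alternative.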

smnorris commented 1 year ago

I'm removing the existing parallel functionality and closing this issue. Async functionality is out of scope; the aim is just to keep the script super simple.