insightsengineering / teal

Exploratory Web Apps for Analyzing Clinical Trial Data
https://insightsengineering.github.io/teal/
Other
184 stars 39 forks source link

Investigate async / parallel while downloading multiple datasets #158

Closed shajoezhu closed 3 years ago

shajoezhu commented 3 years ago

Dear NEST team

I would like to propose a new feature with DDL, fetching data with rice in the context of mclapply. Think this feature will improve the speed for loading the data for more teams.

Thanks! Joe

The following is the function we use to download data from entimice, it works well on FI and BEE (not working on enabler due to some SSL initialization which I havn't figured out how to fix). We download 1.8Gb data in 21 files, this parallel process approach helped us a lot.


load_all_rice_mc <- function(paths = paths_analysis,
                             files = files_analysis,
                             user = Sys.info()[["user"]],
                             pass = getPass::getPass(msg = "Provide password for entimice: "),
                             id = NULL,
                             visits_only = FALSE){
  message("using mc version")
  stopifnot(is.character(paths), is.character(files))

  rice:::cfg_set_sid(NULL) 
  rice_session_open(username = user, password = pass)
  contents <- rice_ls(paths, prolong = TRUE, quiet = TRUE)

  if (visits_only) {
    files <- files[!files %in% c("ate", "atx")]
  }

  regex <- paste0("(", paste(files, collapse = ")|("), ")")

  file_paths <- contents$files[str_detect(contents$files, regex)]

  # load raw files, remove those ending with _122 (filtered datasets)
  file_paths <- file_paths[!str_detect(file_paths, "_122.sas7bdat$")]

  files_read <- paste0(paths, "/", file_paths) #%>% as.list

  id = rice_session_id()

  try_rice_block <- function(x){
    ret <- tryCatch({message(x);
      rice:::cfg_set_sid(id)
      # print(rice_session_id())
      rice_read(x, prolong=TRUE)}, 
      error = function(cond){
        return(NA)
      }
    )
    return(ret)
  }

  repeat_rice_block <- function(x){
    ret <- NA
    attempt <- 1
    while( is.na(ret) && attempt <= 3 ) {
      attempt <- attempt + 1
      ret <- try_rice_block(x)
    }
    return(ret)
  }

  prolong_all = TRUE
  out <- mclapply(files_read, repeat_rice_block,
                  mc.cores = 21L, mc.preschedule = FALSE
  )
  names(out) <- str_remove(files_read, paste0(paths, "/anon_")) %>%
    str_remove(".sas7bdat")

  # rice_get_meta(out, write = "./briefing_package/output/ice_meta.yml")
  rice_session_close()

  return(out)
}
kpagacz commented 3 years ago

Wouldn't do much if the container allocated for the app has limited CPU resources anyway. What would work though is some async programming when downloading data from the web/reading data from the disk.

kpagacz commented 3 years ago

Although I haven't looked into how we could implement that specifically (but seems like we could simply use some async functions in the pull methods).

pawelru commented 3 years ago

Please note that usage of rice would be changed into riceutils. https://github.com/insightsengineering/teal/issues/40

Hopefully this new package offers some performance improvements.

shajoezhu commented 3 years ago

Hi @pawelru and @kpagacz many thanks for the reply!

I think ricepass is going to take care of entering password.

@kpagacz , you are right, I believe if users starts up their shinyapp on bee, they will benefit from this improvement the most. For users accessing the app via rsconnect, I believe if needs, more resources can be allocated for the shinyapp. Our feedback from scientist and statisticians with these apps is that data loading time sometimes take too long. Thank you so much for looking into this.

pawelru commented 3 years ago

Ok I am converting this into more research task to investigate possible solutions. And I believe it's more general than only rice connector. Thanks for the feedback. We will take it soon.

kpagacz commented 3 years ago

I am going to have a brief look at the mclapply and the async stuff. I am worried about async because it's going to complicate code tremendously, especially in our team since most of us (I assume) have no experience with async programming. It would give the most benefit though when downloading data from a source, so I will try to morph a connector to an async one and try it out.

mclapply indeed won't do much for a container probably severely limited in its CPU usage, but I will have a look as well since it's the simpler one to implement.

I am looking into - how many things we would have to change to make this work and how hard it is going to be to maintain this stuff.

kpagacz commented 3 years ago

Async The introduction I used to research async promises: https://cran.r-project.org/web/packages/promises/vignettes/intro.html Short summary:

In short, it doesn't really make any sense to use async to do it because we are bounded by the network speed anyway. We can have multiple parallel slow downloads instead of one fast, the only difference is the IO buffer does more work. In terms of reading from a disk - the same story, we are bounded by IO speed, not anything else. Additionally, all the child process are going to need to copy the result (possibly a couple of gigabytes of data) to the parent process, that's obviously super slow.

A scenario where it makes sense to use async is e.g. querying an external resource, which takes a long time to compute the small return value. There it makes sense because we can go ahead and start doing other stuff. In our case, we are dependent on the datasets, so unless an app has them, it cannot proceed anyway.

Multicore The same reasons apply, doesn't really make any sense to use and can actually make the app slower.

I haven't tested it out, but considering the authors discourage exactly our use case I don't think it's a good time investment. Discouragement: https://cran.r-project.org/web/packages/promises/vignettes/futures.html#returning-values

kpagacz commented 3 years ago

I consider my part done here for now. Let me know, if you would like me to run some code tests anyway, @pawelru

kpagacz commented 3 years ago

@mhallal1 @gogonzo @nikolas-burkoff @Polkas @sorinvoicu @junlueZH have a look as well.

mhallal1 commented 3 years ago

@kpagacz Your conclusions make sense in the light of the articles above, some minimal code tests comparing mclapply and the actual state would be valuable in different scenarios.

kpagacz commented 3 years ago

For small files it is basically the same: image

For larger files, it becomes clear the non_par is the same or might be even better. image

I tested this with:

links <- c(
  "https://perso.telecom-paristech.fr/eagan/class/igr204/data/film.csv",
  "https://perso.telecom-paristech.fr/eagan/class/igr204/data/smallwikipedia.csv"
)

non_parallel <- function(links) {
  lapply(links, function(link) read.csv(link, sep = ";"))
}

non_par_frames <- non_parallel(links)

parallel <- function(links) {
  parallel::mclapply(links, function(link) read.csv(link, sep = ";"))
}

par_frames <- parallel(links)

microbenchmark::microbenchmark(
  par = parallel(links),
  non_par = non_parallel(links),
  times = 4
)

large_files_links <- c(
  "https://www.stats.govt.nz/assets/Uploads/International-trade/International-trade-June-2021-quarter/Download-data/overseas-trade-indexes-June-2021-quarter-provisional-csv.csv",
  "https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2020-financial-year-provisional/Download-data/annual-enterprise-survey-2020-financial-year-provisional-csv.csv"
)
microbenchmark::microbenchmark(
  par = parallel(large_files_links),
  non_par = non_parallel(large_files_links),
  times = 2
)

with a bounded network speed (basically connected to Roche VPN :P)

kpagacz commented 3 years ago

So the results are heavily dependent on the number of cores plus the network speed, but I would assume the worst when it comes to our hosted apps.

gogonzo commented 3 years ago

Agree with @kpagacz there is no free lunch. Managing parallelization adds more complexity to the connectors and it's harder to maintain. We have some services which works in parallel (tlgdown and tlg-catalog) and we experience some issues from time to time related with overloaded cores. I also can imagine reading 21 big datasets and out-of-memory message. These crashes during shiny process on remote server (eg RSConnect) will be practically unreproducible.

Regardless of the Konrad's test I'm thinking about these lines:

 try_rice_block <- function(x){
    ret <- tryCatch({message(x);
      rice:::cfg_set_sid(id)
      rice_read(x, prolong=TRUE)}, 
      error = function(cond){
        return(NA)
      }
    )
    return(ret)
  }

  repeat_rice_block <- function(x){
    ret <- NA
    attempt <- 1
    while( is.na(ret) && attempt <= 3 ) {
      attempt <- attempt + 1
      ret <- try_rice_block(x)
    }
    return(ret)
  }

I conclude that connection with the rice do download might not be instantly established and it needs few attempts to start the downloading. This means that establishing downloading might take some time that is why parallelization is faster. Why does it have to tryCatch each time - Is it a side effect of parallelization?

We download 1.8Gb data in 21 files, this parallel process approach helped us a lot.

@shajoezhu could you write us degree how it helped you? Or present some example which we can benchmark our self?

kpagacz commented 3 years ago

Dawid touched on the maintainability subject and I can expand on that.

Async would be a hellish task to implement because any objects returned by an async function is an "async" object (so called promise) and they have their own syntax to use them. What it means, anything downstream of our downloaded datasets would have to be refactored to use the async syntax. That would be a monumental task.

Multicore is a little bit better, but as mentioned - it's tough to debug and can cause issues on a small number of cores/different architectures (even when running this test, I once got a weird error about a code not finishing its task).

shajoezhu commented 3 years ago

Hey @kpagacz , thank so so much for look into this! I was wondering how many cores do we usually assign for our hosted apps? Is this equivalent to the number of processes?

I had a quick spin with your code on BEE. I think the parallel process adds value when the list of files gets long.

> long_large_files_links <- c(
+   "https://www.stats.govt.nz/assets/Uploads/International-trade/International-trade-June-2021-quarter/Download-data/overseas-trade-indexes-June-2021-quarter-provisional-csv.csv",
+   "https://www.stats.govt.nz/assets/Uploads/International-trade/International-trade-June-2021-quarter/Download-data/overseas-trade-indexes-June-2021-quarter-provisional-csv.csv",
+   "https://www.stats.govt.nz/assets/Uploads/International-trade/International-trade-June-2021-quarter/Download-data/overseas-trade-indexes-June-2021-quarter-provisional-csv.csv",
+   "https://www.stats.govt.nz/assets/Uploads/International-trade/International-trade-June-2021-quarter/Download-data/overseas-trade-indexes-June-2021-quarter-provisional-csv.csv",
+   "https://www.stats.govt.nz/assets/Uploads/International-trade/International-trade-June-2021-quarter/Download-data/overseas-trade-indexes-June-2021-quarter-provisional-csv.csv",
+   "https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2020-financial-year-provisional/Download-data/annual-enterprise-survey-2020-financial-year-provisional-csv.csv"
+ )
> 
> 
> microbenchmark::microbenchmark(
+   par = parallel(links),
+   non_par = non_parallel(links),
+   times = 4
+ )
Unit: milliseconds
    expr      min       lq     mean   median       uq      max neval cld
     par 316.1420 317.7787 322.1569 322.3151 326.5351 327.8554     4  a 
 non_par 572.0894 573.4172 580.1365 579.9724 586.8558 588.5117     4   b
> 
> 
> microbenchmark::microbenchmark(
+   par = parallel(large_files_links),
+   non_par = non_parallel(large_files_links),
+   times = 4
+ )
Unit: seconds
    expr      min       lq      mean    median        uq      max neval cld
     par 6.728852 6.894786  7.712177  7.084403  8.529567  9.95105     4   a
 non_par 7.894636 8.977930 10.968771 10.740527 12.959613 14.49939     4   a
> 
> microbenchmark::microbenchmark(
+   par = parallel(long_large_files_links),
+   non_par = non_parallel(long_large_files_links),
+   times = 4
+ )
Unit: seconds
    expr      min       lq     mean   median       uq      max neval cld
     par 16.77973 19.89862 23.81990 23.06806 27.74118 32.36375     4  a 
 non_par 38.27499 38.83608 44.03516 42.73391 49.23424 52.39783     4   b

@gogonzo the tryCatch is a side effect with parallelization on FI, it is not guaranteed everytime the download session will start. The tryCatch statement helps for this case.

I completely agree that using mclapply add difficulties with maintaining and debugging. Thank you so much for looking into this. I will share back with anything I learn from our process as well.

kpagacz commented 3 years ago

That just tells me we haven't hit the network bandwidth while downloading these files on BEE.

kpagacz commented 3 years ago

Conclusions:

Closing for now.

shajoezhu commented 3 years ago

Hey guys, many thanks for exploring the possibility!