rstudio / promises

A promise library for R
https://rstudio.github.io/promises

Can I use the {parallel} package with Shiny and {plumber}? #81

Open schloerke opened 2 years ago

schloerke commented 2 years ago

https://www.linkedin.com/feed/update/urn:li:ugcPost:6899412968317280256?commentUrn=urn%3Ali%3Acomment%3A%28ugcPost%3A6899412968317280256%2C6899428857653334016%29

That's amazing! I want to try this combo as soon as possible! 🤩 It's a little bit tricky to understand how "future" works 🤔 actually, I can't figure out the real differences with "parallel" and "parallelly" packages and why they can't be used for shiny and plumber

schloerke commented 2 years ago

I'll address the {parallel} package, as {parallelly} produces better clusters that still use the {parallel} package for computations.


The {parallel} package blocks the main R session while most of its functions compute (e.g. parallel::mclapply()). This is great for compiling a document but not good for a server with multiple users. However, one pair of functions lets you submit a job and collect the result at a later time: parallel::mcparallel(expr) and parallel::mccollect(job).

{future} computations return objects that can be automatically upgraded to a promises::promise() within the {promises} package. By transforming the computation into a promises::promise(), we can promise to collect the result at a later time point when the R session is free.
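As a minimal sketch of that upgrade (assuming the {future}, {promises}, and {later} packages are installed; the multisession plan and the worker count are choices made for this illustration, not something the thread prescribes), a future can be piped straight into a promise handler:

```r
library(future)
library(promises)

# Assumption: a multisession plan with 2 workers; other non-sequential plans should behave similarly
plan(multisession, workers = 2)

main_pid <- Sys.getpid()
worker_pid <- NULL

# The Future is converted to a promise when it is piped into %...>%
future({ Sys.getpid() }) %...>% {
  worker_pid <<- .
  message("Worker PID: ", worker_pid)
}

# Runs right away: the main session did not wait for the future
message("Main PID: ", main_pid)

# Outside Shiny/{plumber} nothing drives the event loop in a script, so run it by hand
while (!later::loop_empty()) later::run_now(timeoutSecs = 0.1)

plan(sequential)  # shut the workers down again
```

Inside Shiny or {plumber} the manual event loop at the end is unnecessary, because the server already runs pending {later} callbacks whenever the R session is idle.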

I think we can combine {parallel}'s parallel::mcparallel() and parallel::mccollect(job) with {promises} in the same way.

Thought process:

library(promises)

parallel_promise <- function(expr, ...) {
  # Submit job
  job <- parallel::mcparallel(expr, ...)

  # Make promise to collect result
  p <- promises::promise(function(resolve, reject) {
    try_result <- function() {
      # Try to collect the result
      res <- parallel::mccollect(job, wait = FALSE, timeout = 0)
      if (is.null(res)) {
        # Try again a little later
        later::later(try_result, 0.1) # (a real implementation should use exponential backoff)
      } else {
        # Return result
        resolve(res)
      }
    }
    try_result()
  })

  # Return promise
  p
}
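
The backoff mentioned in the comment could look roughly like the sketch below. The function name parallel_promise_backoff() and the delay and max_delay arguments are hypothetical, introduced only for this illustration. The sketch also unwraps the single-element list that mccollect() returns and rejects the promise when the job produced a try-error, which the version above does not do. Note that mcparallel() relies on forking, so this only runs on Unix-alikes (not Windows).

```r
library(promises)

# Hypothetical variant of parallel_promise(): polls with exponential backoff
# and turns a failed job into a rejected promise.
parallel_promise_backoff <- function(expr, ..., delay = 0.01, max_delay = 1) {
  # Submit the job to a forked child process
  job <- parallel::mcparallel(expr, ...)

  promise(function(resolve, reject) {
    poll <- function(delay) {
      res <- parallel::mccollect(job, wait = FALSE, timeout = 0)
      if (is.null(res)) {
        # Not finished yet: retry later and double the delay, up to max_delay
        later::later(function() poll(min(delay * 2, max_delay)), delay)
        return(invisible(NULL))
      }
      value <- res[[1]]  # mccollect() returns a list named by child PID
      if (inherits(value, "try-error")) {
        reject(attr(value, "condition"))  # relay the original error condition
      } else {
        resolve(value)
      }
    }
    poll(delay)
  })
}

# Usage: one handler for success, one for failure
p <- parallel_promise_backoff({ Sys.sleep(0.2); sqrt(16) })
then(
  p,
  onFulfilled = function(value) message("Result: ", value),
  onRejected = function(err) message("Job failed: ", conditionMessage(err))
)
```

Unwrapping with res[[1]] changes the resolved value from a list named by PID to the bare result, so handlers written for the original parallel_promise() would need adjusting.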

Example: Show it is possible

Small demo wrapped in { } to show that the main R session can continue with calculations even after submitting the job.

{
  message("Local PID: ", Sys.getpid())
  parallel_promise({Sys.getpid()}) %...>% {
    message("Parallel PID: ", .)
  }
  message("Post computation: ", 1 + 1) # Here to prove that promises are working
}
#> Local PID: 11941
#> Post computation: 2
#> Parallel PID: 24335

Example: Perform 4 jobs in parallel

start <- Sys.time()
proms <- lapply(1:4, function(i) { parallel_promise({Sys.sleep(i / 2); runif(i)}) })
final_prom <- promises::promise_all(.list = proms) %...>% {
  dt <- .
  message("Results in ", round(as.difftime(Sys.time() - start, units = "secs"), digits = 2), " seconds")
  str(dt)
  dt
}
final_prom
#> Results in 2.26 seconds
#> List of 4
#>  $ :List of 1
#>   ..$ 35715: num 0.568
#>  $ :List of 1
#>   ..$ 35716: num [1:2] 0.494 0.956
#>  $ :List of 1
#>   ..$ 35717: num [1:3] 0.151 0.1 0.353
#>  $ :List of 1
#>   ..$ 35718: num [1:4] 0.179 0.566 0.541 0.474

Take home message

As long as the promise from parallel_promise() (or a follow-up promises::promise() such as final_prom) is returned from a Shiny render method or {plumber} API method, the main R session will not be blocked while the external {parallel} calculations run.

To drive the point home again, the promises::promise() object must be returned from the Shiny render method or {plumber} API method. Otherwise Shiny or {plumber} will just immediately display or pass through what was returned by the function.
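
A minimal Shiny sketch of "returning the promise" follows. It assumes {shiny}, {promises}, and {later} are installed and a Unix-alike system (mcparallel() is not available on Windows). The helper is a compact copy of parallel_promise() from above so the example is self-contained; the output id "pid" and the 2-second sleep are made up for the illustration.

```r
library(shiny)
library(promises)

# Compact copy of the helper defined earlier in this thread
parallel_promise <- function(expr, ...) {
  job <- parallel::mcparallel(expr, ...)
  promise(function(resolve, reject) {
    try_result <- function() {
      res <- parallel::mccollect(job, wait = FALSE, timeout = 0)
      if (is.null(res)) later::later(try_result, 0.1) else resolve(res)
    }
    try_result()
  })
}

ui <- fluidPage(textOutput("pid"))

server <- function(input, output, session) {
  output$pid <- renderText({
    # The promise chain is the LAST expression, so it is what renderText() receives.
    # Shiny fills in the output once the promise resolves.
    parallel_promise({ Sys.sleep(2); Sys.getpid() }) %...>% {
      paste("Computed in process", .[[1]])
    }
  })
}

# Only launch the app when run interactively
if (interactive()) shinyApp(ui, server)
```

If the render block ended with some other value after creating the promise, Shiny would render that value and never see the promise's result.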


Why isn't the {parallel} package natively supported?

There are too many backends to support. Rather than picking a "top 5", only {future} is supported due to its explicit submission/collection functions and its ability to generically interface with many cluster setups.

HenrikBengtsson commented 2 years ago

Correct, the closest we have to future() and value() in base R is mcparallel() and mccollect(), especially since globals are automatically taken care of because forked processing is used. However, the latter pair does not handle output and conditions. For example,

> library(parallel)
> f <- mcparallel({ print(1:3); warning("boom"); log("a") })
[1] 1 2 3
> v <- mccollect(f)
> v
$`12609`
[1] "Error in log(\"a\") : non-numeric argument to mathematical function\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in log("a"): non-numeric argument to mathematical function>

Note how the output is not captured (and might even be lost depending on environment/GUI), and the warning is lost, so those need to be handled separately. On the upside, mcparallel()/mccollect() does indeed preserve the error condition, so you can do something like:

> cond <- attr(v[[1]], "condition")
> if (inherits(cond, "error")) stop(cond)
Error in log("a") : non-numeric argument to mathematical function

to relay the error.

If you want to do something similar with parallel PSOCK clusters, more work is needed, and you'll also have to handle errors internally (e.g. capture them via tryCatch()), because those are not relayed: you only get a try-error object, so you've lost the original error.
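
One way to do that internal capturing is sketched below. It is an illustration only, not how {future} implements it: the helper name capture_conditions() and the shape of the returned list are made up for this example. The idea is to catch warnings and errors on the worker and send them back as ordinary data.

```r
library(parallel)

# Hypothetical helper, run ON the worker: evaluates fun() and returns the value,
# any warnings, and the error condition (if any) as a plain list.
capture_conditions <- function(fun) {
  warnings <- list()
  result <- tryCatch(
    withCallingHandlers(
      list(value = fun(), error = NULL),
      warning = function(w) {
        # Record the warning, then continue the computation
        warnings[[length(warnings) + 1L]] <<- w
        invokeRestart("muffleWarning")
      }
    ),
    error = function(e) list(value = NULL, error = e)
  )
  c(result, list(warnings = warnings))
}

cl <- makePSOCKcluster(1)
out <- clusterCall(cl, capture_conditions, function() {
  warning("boom")
  log("a")
})[[1]]
stopCluster(cl)

# Both conditions made it back to the main session as objects
conditionMessage(out$warnings[[1]])
inherits(out$error, "error")

# Relay the error in the main session if desired:
# if (inherits(out$error, "error")) stop(out$error)
```

Printed output from the worker (such as print(1:3) in the example above) is still not captured by this sketch and would need separate handling.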

These are some of the things that future encapsulates in a standard, unified manner:

> library(future)
> f <- future({ print(1:3); log("a") })
> v <- value(f)
[1] 1 2 3
Error in log("a") : non-numeric argument to mathematical function

regardless of parallel backend. Hopefully that helps clarify the role of future here.