HenrikBengtsson / future

R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

PERFORMANCE: Earlier signaling of error for resolve() and value() #622

Status: Open. HenrikBengtsson opened this issue 2 years ago.

HenrikBengtsson commented 2 years ago

The example below shows how futures that have been created, but not yet launched, continue to be launched even after one of them produces an error:

library(future)
library(progressr)
progressr::handlers(global = TRUE)

plan(sequential)

my_fcn <- function(n) {
  p <- progressr::progressor(n)
  fs <- lapply(seq_len(n), FUN = function(ii) {
    future({
      if (ii %in% c(2L, 4L)) {
        p(sprintf("ii = %d <= FAIL", ii), class = "sticky") ## signal progress
        stop(sprintf("Boom! (ii = %d)", ii))
      }
      p(sprintf("ii = %d", ii), class = "sticky") ## signal progress
      Sys.sleep(1.0)
      ii
    })
  })
  value(fs)
}

y <- my_fcn(5)
#> ii = 1
#> ii = 2 <= FAIL
#> ii = 3
#> ii = 4 <= FAIL
#> ii = 5
#> Error in eval(quote({ : Boom! (ii = 2)

The question is: should the error take precedence over launching yet-to-be-launched futures? That probably depends on the use case, but there should at least be an option for it; in higher-level map-reduce functions, for example, it makes sense. With such an option, the above function could exit early, i.e.

y <- my_fcn(5)
#> ii = 1
#> ii = 2 <= FAIL
#> Error in eval(quote({ : Boom! (ii = 2)

Imagine what a difference this would make if we called my_fcn(1000).
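To put a rough number on it, here is a back-of-the-envelope calculation in R. It assumes plan(sequential), that the failing iterations (ii = 2 and 4) stop before reaching Sys.sleep(1.0), and it ignores all other overhead; the variable names are illustrative only.

```r
# Assumptions: plan(sequential); each successful iteration sleeps 1 second;
# failing iterations call stop() before Sys.sleep(); no other overhead.
n <- 1000L
fail_at <- c(2L, 4L)

# Current behavior: all futures are launched, so every successful
# iteration pays its 1-second sleep
secs_all <- sum(!(seq_len(n) %in% fail_at))

# Early exit: only the iterations before the first failure complete
first_fail <- min(fail_at)
secs_early <- sum(!(seq_len(first_fail - 1L) %in% fail_at))

c(all = secs_all, early_exit = secs_early)
```

Under these assumptions, that is 998 seconds for the current behavior versus 1 second with early exit.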

Internally, value(fs) calls:

fs <- resolve(fs, result = TRUE, stdout = TRUE, signal = TRUE, force = TRUE)

which won't return until all futures have been resolved. It won't signal errors, although it will signal all other types of conditions; errors are signaled later by value(fs). So, if we want value(fs) to signal an error early, and avoid launching the remaining futures, we need to add this feature to resolve() first.
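A minimal sketch of that split of responsibilities, in plain R without the future package. resolve_sketch(), value_sketch(), make_thunks() and the signal_errors argument are hypothetical stand-ins, not the future API; "futures" are modeled as zero-argument functions launched one at a time, as under plan(sequential). With signal_errors = TRUE, resolve_sketch() signals the first error itself and never launches the remaining futures.

```r
# Hypothetical sketch, not the future package's implementation: each "future"
# is modeled as a zero-argument function that has not been launched yet.
resolve_sketch <- function(thunks, signal_errors = FALSE) {
  results <- vector("list", length(thunks))
  for (ii in seq_along(thunks)) {
    # Launch and resolve the next future, capturing an error as a value
    res <- tryCatch(thunks[[ii]](), error = identity)
    if (signal_errors && inherits(res, "error")) {
      # Early exit: the remaining futures are never launched
      stop(res)
    }
    results[ii] <- list(res)
  }
  results
}

value_sketch <- function(thunks, early = FALSE) {
  results <- resolve_sketch(thunks, signal_errors = early)
  # Without early exit, the first error is only signaled here, after all
  # futures have been launched and resolved
  for (res in results) {
    if (inherits(res, "error")) stop(res)
  }
  results
}

# Count launched futures to make the difference observable
counter <- new.env()
counter$n <- 0L
make_thunks <- function(n) {
  lapply(seq_len(n), function(ii) function() {
    counter$n <- counter$n + 1L
    if (ii %in% c(2L, 4L)) stop(sprintf("Boom! (ii = %d)", ii))
    ii
  })
}

counter$n <- 0L
try(value_sketch(make_thunks(5)), silent = TRUE)
counter$n  # all five futures were launched

counter$n <- 0L
try(value_sketch(make_thunks(5), early = TRUE), silent = TRUE)
counter$n  # only the first two futures were launched
```

In this sketch value_sketch() needs no special logic of its own: once the resolve step can stop at the first error, the value step inherits the early exit.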

Now, if we allow resolve() to signal errors, and thereby terminate early, we have to make sure that any parallel futures that have already been launched are allowed to finish before the error is signaled. We do not want any stray futures remaining when we exit resolve() or value() (... or the calling future.apply, furrr, ... function).
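For parallel backends, this requirement amounts to: stop launching new futures once an error is seen, keep collecting the ones already running, and only then signal the error. The following is a hypothetical simulation in plain R; resolve_drain() and its workers argument are made up for illustration. Tasks are zero-argument functions, at most workers of them are "running" at a time, and, unlike a real backend that runs them in the background, a task's body is only evaluated when it is collected.

```r
# Hypothetical simulation: at most `workers` tasks are "running" at once.
resolve_drain <- function(tasks, workers = 2L) {
  results <- vector("list", length(tasks))
  launched <- integer(0)
  running <- integer(0)  # indices of launched, not yet collected tasks
  next_idx <- 1L
  first_error <- NULL
  repeat {
    # Launch new tasks only as long as no error has been seen
    while (is.null(first_error) && length(running) < workers &&
           next_idx <= length(tasks)) {
      running <- c(running, next_idx)
      launched <- c(launched, next_idx)
      next_idx <- next_idx + 1L
    }
    # Nothing left running: safe to return (and for the caller to signal)
    if (length(running) == 0L) break
    # Collect the oldest running task (in this simulation, evaluated only now)
    ii <- running[1L]
    running <- running[-1L]
    res <- tryCatch(tasks[[ii]](), error = identity)
    results[ii] <- list(res)
    if (is.null(first_error) && inherits(res, "error")) first_error <- res
  }
  list(launched = launched, results = results, error = first_error)
}

tasks <- lapply(1:5, function(ii) function() {
  if (ii %in% c(2L, 4L)) stop(sprintf("Boom! (ii = %d)", ii))
  ii
})
out <- resolve_drain(tasks, workers = 2L)
out$launched  # futures 4 and 5 were never launched
# The caller would now signal the error, e.g. stop(out$error)
conditionMessage(out$error)
```

With two workers, futures 1 and 2 are launched first; future 3 is launched after future 1 is collected, so it is already running when the error from future 2 is detected. It is collected before the function returns, and futures 4 and 5 are never launched.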

Related to the above is the promise to relay stdout and conditions in order, regardless of future backend, i.e. parallel output should be the same as sequential output. This requires that all preceding futures are resolved, so that their output can be relayed, before an error is signaled. In most cases, this is already covered by the previous consideration. However, if we use a randomized ordering of futures at the map-reduce level, e.g. future_lapply(..., future.chunk.size = structure(1L, ordering = "random")), this is not necessarily true. Unfortunately, this reordering is orchestrated by future.apply and not future, so resolve() cannot possibly know the true, intended order. Luckily, resolve() for lists takes an optional argument idxs, which allows us to process the futures in the order given by idxs. We could make use of that at the map-reduce level to make sure all output is relayed, and in order.
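In-order relaying can be pictured as a buffer: futures may finish in any order, but an element's output is relayed only once every element before it in idxs has been resolved, and an error is returned at its turn, after everything preceding it has been relayed. relay_in_order() is a hypothetical sketch in plain R, not how resolve() implements idxs; in this sketch, output from elements after the failing one is simply not relayed.

```r
# Hypothetical sketch: relay output in the order given by `idxs`, even though
# futures complete in `completion_order`.
relay_in_order <- function(outputs, completion_order, idxs = seq_along(outputs)) {
  relayed <- character(0)
  done <- rep(FALSE, length(outputs))
  pos <- 1L  # position in `idxs` of the next element allowed to be relayed
  for (k in completion_order) {
    done[k] <- TRUE
    # Flush all consecutive elements of `idxs` that are now resolved
    while (pos <= length(idxs) && done[idxs[pos]]) {
      out <- outputs[[idxs[pos]]]
      if (inherits(out, "error")) {
        # Everything preceding it in `idxs` has been relayed at this point
        return(list(relayed = relayed, error = out))
      }
      relayed <- c(relayed, out)
      pos <- pos + 1L
    }
  }
  list(relayed = relayed, error = NULL)
}

# Futures 1-3 finish in the order 3, 1, 2; future 2 fails
res <- relay_in_order(
  list("ii = 1", simpleError("Boom! (ii = 2)"), "ii = 3"),
  completion_order = c(3L, 1L, 2L)
)
res$relayed  # only "ii = 1"; the output of future 3 is held back
```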

Care needs to be taken to make sure the order of relayed stdout and conditions is preserved if there's an error, just as we already do. However, if we allow an error to terminate the resolve process, we should probably be conservative and collect and relay

HenrikBengtsson commented 2 years ago

Note that the proposed improvement only affects the higher-level map-reduce functions when each parallel worker processes more than one chunk, e.g. when using future.chunk.size = 1L with future.apply. In contrast, the default is to split the iterations uniformly across all workers, with one chunk (= one future) per worker, which means all futures are launched right away and there are no yet-to-be-launched futures to skip. Using the above example, the default corresponds to:

library(future.apply)
library(progressr)
progressr::handlers(global = TRUE)

plan(sequential)

my_fcn <- function(n) {
  p <- progressr::progressor(n)
  future_lapply(seq_len(n), FUN = function(ii) {
    if (ii %in% c(2L, 4L)) {
      p(sprintf("ii = %d <= FAIL", ii), class = "sticky") ## signal progress
      stop(sprintf("Boom! (ii = %d)", ii))
    }
    p(sprintf("ii = %d", ii), class = "sticky") ## signal progress
    Sys.sleep(1.0)
    ii
  })
}

y <- my_fcn(5)
#> ii = 1
#> ii = 2 <= FAIL
#> Error in ...future.FUN(...future.X_jj, ...) : Boom! (ii = 2)

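which signals the error without evaluating the remaining iterations: with plan(sequential) there is only one worker, so all five iterations form a single chunk, and that chunk stops at ii = 2. In contrast, if we use future.chunk.size = 1L, we get the case illustrated in https://github.com/HenrikBengtsson/future/issues/622#issue-1230467284, i.e.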

my_fcn <- function(n) {
  p <- progressr::progressor(n)
  future_lapply(seq_len(n), FUN = function(ii) {
    if (ii %in% c(2L, 4L)) {
      p(sprintf("ii = %d <= FAIL", ii), class = "sticky") ## signal progress
      stop(sprintf("Boom! (ii = %d)", ii))
    }
    p(sprintf("ii = %d", ii), class = "sticky") ## signal progress
    Sys.sleep(1.0)
    ii
  }, future.chunk.size = 1L)
}

y <- my_fcn(5)
#> ii = 1
#> ii = 2 <= FAIL
#> ii = 3
#> ii = 4 <= FAIL
#> ii = 5
#> Error in ...future.FUN(...future.X_jj, ...) : Boom! (ii = 2)
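To make the chunking argument concrete, the following hypothetical sketch counts which iterations get evaluated when every chunk is launched regardless of errors in other chunks, while an error inside a chunk stops the rest of that chunk. make_chunks() and run_chunks() are made up for illustration, and the contiguous split is an assumption that may not match future.apply's actual chunking.

```r
# Hypothetical: one chunk per worker by default, or ceiling(n / chunk_size)
# chunks when a chunk size is given; iterations are split contiguously.
make_chunks <- function(n, workers = 1L, chunk_size = NULL) {
  nchunks <- if (is.null(chunk_size)) min(workers, n) else ceiling(n / chunk_size)
  unname(split(seq_len(n), ceiling(seq_len(n) * nchunks / n)))
}

# Every chunk (= future) is launched; within a chunk, iterations run in
# order until one of them fails. Returns the evaluated iterations.
run_chunks <- function(n, fail_at, workers = 1L, chunk_size = NULL) {
  executed <- integer(0)
  for (chunk in make_chunks(n, workers, chunk_size)) {
    for (ii in chunk) {
      executed <- c(executed, ii)
      if (ii %in% fail_at) break  # the error ends this chunk only
    }
  }
  executed
}

run_chunks(5, fail_at = c(2L, 4L))                   # default, one worker
run_chunks(5, fail_at = c(2L, 4L), chunk_size = 1L)  # one future per element
```

With one worker and the default chunking, only iterations 1 and 2 are evaluated, matching the first future_lapply() output above; with chunk_size = 1L, all five are, matching the second. Earlier error signaling in resolve() would only make a difference in the latter case.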