HenrikBengtsson / future

:rocket: R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

plan(list(tweak(multicore, workers = N), tweak(multicore, workers = N))); future(...) fails with illegal error message #231

Closed mlell closed 6 years ago

mlell commented 6 years ago

In this Stack Overflow answer, you explained how to achieve multi-layer-parallelism:

Now, we could imagine that we process the outer layer with, say, two parallel futures, and then the inner layer with four parallel futures. In that case, we would end up running on at most eight cores (= 2 * 4). This can be achieved by forcing a fixed number of workers at each layer (not recommended):

plan(list(tweak(multiprocess, workers = 2), tweak(multiprocess, workers = 4)))

The problem is that this apparently causes an error, and the condition being raised is itself malformed, so R fails while trying to report it:

Error in stop(condition) : bad error message

Full 2-level parallelism is possible if using

plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4)))
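
The "2 * 4 = 8" arithmetic in the quoted answer generalizes to any number of layers: the upper bound on simultaneously busy workers is the product of the per-layer worker counts. A minimal base-R sketch of that check follows. `max_workers()` is a hypothetical helper written for illustration and is not part of the future package.

```r
# Hypothetical helper (not part of future): upper bound on the number of
# workers that can be busy at the same time for a nested plan, given the
# number of workers requested at each layer.
max_workers <- function(workers_per_layer) {
  stopifnot(is.numeric(workers_per_layer), all(workers_per_layer >= 1))
  prod(workers_per_layer)
}

max_workers(c(2, 4))  # 8: two outer workers, each running four inner workers

# Compare against what the machine reports; detectCores() may return NA.
available <- parallel::detectCores()
if (!is.na(available) && max_workers(c(2, 4)) > available) {
  message("The nested plan may oversubscribe this machine")
}
```

Whether oversubscription matters depends on the machine and the workload, so the comparison is only a rough guide.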

MCVE:

library(future)
library(ggplot2)
library(listenv)
library(dplyr)

"Workhorse" function (in real world)

Runs for a random amount of time and returns the start and stop times as a 1-row data.frame with columns start and stop.

startStop <- function(){
  start <- Sys.time()
  x <- runif(1, 1, 3)
  Sys.sleep(x)
  stop <- Sys.time()
  return(data.frame(start = start, stop = stop))
}

Plotting functions

All functions expect a data.frame with columns iL1, iL2, start, stop.

plotStartStop <- function(d){
  d$x <- paste(d$iL1, d$iL2, sep = "_")
  d$iL1 <- as.character(d$iL1)
  ggplot(d, aes(x = x, ymin = start, ymax = stop, color = iL1)) + geom_linerange() +
    coord_flip() + ylab("time") + xlab("future")
}

plotNActive <- function(d){
  tr <- c(min(d$start), max(d$stop))
  x <- tidyr::crossing(iL1 = unique(d$iL1), iL2 = unique(d$iL2), t = seq(tr[1], tr[2], length.out = 1000))
  x <- x %>%
    left_join(d, by = c("iL1", "iL2")) %>%
    mutate(running = t >= start & t <= stop, start = NULL, stop = NULL)
  x <- x %>%
    group_by(t, iL1) %>%
    summarise(nRunning = sum(running), i = 1) #%>%
    #mutate(t2 = lead(t))

  x$iL1 <- as.character(x$iL1)
  ggplot(x, aes(x = t, y = nRunning, fill = iL1)) +
    geom_area(position = "stack", color = "black")
}

plotAll <- function(d){
  gridExtra::grid.arrange(
    plotStartStop(d), plotNActive(d), nrow = 2, heights = unit(c(2,1), "null")
  )
}

Functions which execute futures

Two variants: implicit and explicit futures.

#' Execute startStop() nested within two levels of futures.
#' nL1 number of level-1 futures. nL2: number of level-2 futures.
#' Returns: A data.frame with one row for each L2-future.
#'   iL1: Index of level-1-future
#'   iL2: Index of level-2-future
#'   start: time startStop() was started
#'   stop:  time startStop() finished
twolevel_implicit <- function(nL1, nL2){
  l <- listenv()
  #d$start <- Sys.time()
  #d$stop <- Sys.time()

  for(i in seq_len(nL1)){
    l[[i]] %<-% {
      m <- listenv()
      for(j in seq_len(nL2)){
        m[[j]] %<-% startStop()
      }
      Reduce(rbind, m)
    }
  }

  d <- Reduce(rbind, l)
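  # Rows are ordered by level-1 future first, then by level-2 future within
  # it, so iL1 varies slowest (same labeling as in twolevel_explicit()).
  d$iL1 <- rep(seq_len(nL1), each = nL2)
  d$iL2 <- rep(seq_len(nL2), times = nL1)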
  d$stop  <- as.numeric(difftime(d$stop , min(d$start), units = "secs"))
  d$start <- as.numeric(difftime(d$start, min(d$start), units = "secs"))
  return(d)
}

#' like twolevel_implicit, but with explicit futures
twolevel_explicit <- function(nL1, nL2){
  l <- rep(list(NULL), nL1)
  for(i in seq_along(l)){
    l[[i]] <- future({
      m <- rep(list(NULL), nL2)
      for(j in seq_along(m)){
        m[[j]] <- future(startStop())
      }
      m <- lapply(m, value)
      m <- do.call(rbind, m)
      m
    })
  }
  l <- lapply(l, value)
  d <- do.call(rbind, l)
  d$iL1 <- rep(seq_len(nL1), each = nL2)
  d$iL2 <- rep(seq_len(nL2), times = nL1)
  d$stop  <- as.numeric(difftime(d$stop , min(d$start), units = "secs"))
  d$start <- as.numeric(difftime(d$start, min(d$start), units = "secs"))
  return(d)
}

Tests

The solution suggested in the Stack Overflow answer works.

plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 5)))
dwarmup <- twolevel_implicit(3,4)
d1 <- twolevel_implicit(3,4)
d2 <- twolevel_explicit(3,4)
plotAll(d1)

image

plotAll(d2)

image

Also, using a single-level multicore plan works, even when tweaking the number of workers.

plan(list(tweak(multicore, workers = 5)))
dwarmup <- twolevel_implicit(3,4)
d1 <- twolevel_implicit(3,4)
d2 <- twolevel_explicit(3,4)
plotAll(d1)

image

Explicit futures behave as predicted: the first level runs in parallel and the second level runs sequentially.

plotAll(d2)

image

However, 2-level multicore plans throw an error:

plan(list(tweak(multicore, workers = 5), tweak(multicore, workers = 5)))
dwarmup <- twolevel_implicit(3,4)
 Error in stop(condition) : bad error message 

Session Info

R version 3.4.3 (2017-11-30)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

Matrix products: default
BLAS: /opt/Bio/R/3.4.3/lib64/R/lib/libRblas.so
LAPACK: /opt/Bio/R/3.4.3/lib64/R/lib/libRlapack.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] bindrcpp_0.2  dplyr_0.7.4   listenv_0.7.0 ggplot2_2.2.1 future_1.8.1 

loaded via a namespace (and not attached):
 [1] Rcpp_0.12.17     bindr_0.1        magrittr_1.5     munsell_0.4.3   
 [5] colorspace_1.3-2 R6_2.2.2         rlang_0.1.6      plyr_1.8.4      
 [9] globals_0.11.0   tools_3.4.3      parallel_3.4.3   grid_3.4.3      
[13] gtable_0.2.0     assertthat_0.2.0 yaml_2.1.19      lazyeval_0.2.1  
[17] digest_0.6.15    tibble_1.4.2     gridExtra_2.3    tidyr_0.7.2     
[21] purrr_0.2.4      codetools_0.2-15 glue_1.2.0       labeling_0.3    
[25] compiler_3.4.3   pillar_1.1.0     scales_0.5.0     pkgconfig_2.0.1

HenrikBengtsson commented 6 years ago

Thanks for this detailed report. It looks like a bug in the future framework (where a specific type of error condition is not handled correctly). What do you get if you call traceback() immediately after you get that error message?

mlell commented 6 years ago

15: stop(condition)
14: resignalCondition(future)
13: value.Future(future)
12: value(future)
11: eval(quote({
        value <- value(future)
        rm(list = future_name, envir = assign.env)
        value
    }), new.env())
10: eval(quote({
        value <- value(future)
        rm(list = future_name, envir = assign.env)
        value
    }), new.env())
9: eval(expr, p)
8: eval(expr, p)
7: eval.parent(substitute(eval(quote(expr), envir)))
6: local({
       value <- value(future)
       rm(list = future_name, envir = assign.env)
       value
   })
5: mget(vars[ok], envir = x, inherits = FALSE)
4: as.list.listenv(x)
3: as.list(x)
2: Reduce(rbind, l) at #16
1: twolevel_implicit(3, 4)

Using debug(future:::signalCondition), the error is thrown at signalEarly.R:61.

HenrikBengtsson commented 6 years ago

Thank you; I can reproduce this. The "bad error message" error is due to a malformed FutureError whose message is empty (it must be a character vector of length one):

> trace(future:::resignalCondition, at = 6L, tracer = quote(str(condition)))
> dwarmup <- twolevel_implicit(3,4)
Tracing resignalCondition(future) step 6 
List of 2
 $ message: chr(0) 
 $ call   : NULL
 - attr(*, "class")= chr [1:5] "FutureError" "simpleError" "error" "FutureCondition" ...
 - attr(*, "future")=Classes 'MulticoreFuture', 'MultiprocessFuture', 'Future', 'environment' <environment: 0x3d96928> 
Error in stop(condition) : bad error message

Why this happens I still have to figure out, but I now have what I need to troubleshoot it. After fixing this one, there will probably be a more informative error on why the nested multicore processing fails here.

HenrikBengtsson commented 6 years ago

The short story behind the non-informative "bad error message" message: there was a mistake in the construction of a FutureError condition, where one of the arguments to sprintf() was NULL (also by mistake). That resulted in a zero-length message, which is invalid in R.
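
The zero-length message is easy to reproduce in base R, because sprintf() is vectorized and returns a zero-length result when any argument has length zero. The sketch below only illustrates that mechanism. The message text, `label` and `safe_label` are made up for the example and are not taken from the future source code.

```r
# sprintf() is vectorized over its arguments: a zero-length argument such as
# NULL gives a zero-length result instead of raising an error.
label <- NULL
msg <- sprintf("Future (%s) failed", label)
length(msg)  # 0

# Hypothetical guard: substitute a placeholder before formatting so that the
# condition message always has length one.
safe_label <- if (is.null(label)) "<unknown>" else label
msg2 <- sprintf("Future (%s) failed", safe_label)
cond <- simpleError(msg2)
conditionMessage(cond)
```

A condition built from `msg` rather than `msg2` would carry `character(0)` as its message, which matches the `chr(0)` shown in the str() output earlier in this thread.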

New minimal reproducible example

Having fixed this in the develop branch, I can now reproduce the underlying problem while getting a more informative error message:

library(future)
plan(list(tweak(multicore, workers = 2), tweak(multicore, workers = 2)))

fs <- lapply(1:2, FUN = function(i) {
  future({
    f <- future({ Sys.sleep(1); i })
    value(f)
  })
})
v <- values(fs)

This gives:

Error: Invalid usage of futures: A future whose value has not yet been collected can only
be queried by the R process (34813549-aeca-e2a0-4085-84086fa23b13; pid 16532 on hb-x1)
 that created it, not by any other R processes (44b29749-07c0-3392-9a19-b6961534cfeb; 
pid 16570 on hb-x1): {; f <- future({; Sys.sleep(1); i; }); value(f); }

Troubleshooting

My best guess for now is that this is due to an oversight on my part when it comes to forked processes (i.e. when using multicore). More precisely, the forked child process inherits from its parent an internal registry of active futures, which it does not own but nevertheless tries to clean up/resolve. This can only happen when forked processes (multicore) are in use, and it should only occur when two or more consecutive tweak(multicore, workers = two_or_more) layers are used when setting up the plan(). For instance, it will not be a problem when using any of the following:

plan(list(tweak(multicore, workers = 2), sequential, tweak(multicore, workers = 2)))
plan(list(tweak(multicore, workers = 2), tweak(multisession, workers = 2)))
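
The inheritance described above can be illustrated with base R alone. This is a sketch under stated assumptions, not how future implements its bookkeeping: `registry` and `owns()` are hypothetical stand-ins, and the fork is done with parallel::mcparallel(), which is only available on Unix-alikes.

```r
# Stand-in for an internal registry of active futures; each entry remembers
# the PID of the process that created it. (Hypothetical structure.)
registry <- list(list(id = "f1", owner_pid = Sys.getpid()))

# TRUE only when called from the process that created the entry.
owns <- function(entry) identical(entry$owner_pid, Sys.getpid())

if (.Platform$OS.type == "unix") {
  # mcparallel() forks the current process; the child sees a copy of
  # 'registry' but runs under a different PID.
  job <- parallel::mcparallel(
    list(n = length(registry), owns = owns(registry[[1]]))
  )
  child <- parallel::mccollect(job)[[1]]
  child$n     # the child inherited the one registered entry
  child$owns  # FALSE: the child did not create it
}

owns(registry[[1]])  # TRUE in the parent
```

A multisession worker is a fresh R process rather than a fork, so it starts without a copy of the parent's state.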

As mentioned in the updated vignette (and in my SO answer), I don't recommend forcing nested parallel processing on the same machine this way:

This can be achieved by forcing a fixed number of workers at each layer (not recommended)

Having said all this, this is still a bug in the future framework for MulticoreFuture:s that needs to be fixed. It's not a quick fix, but it's also not a major one. I'd hope this will be fixed by next release together with a few other things.

Thanks again for reporting.

HenrikBengtsson commented 6 years ago

This has been fixed in the develop branch (commit c9696a4) and, as usual for all bugs, I've added a package redundancy test for this case. You can try it with:

remotes::install_github("HenrikBengtsson/future@develop")

I'm closing, but please feel free to reopen if needed.

PS. I like your graphical presentation of the parallel load. FYI, there's a plan to automatically gather (start, stop) times and possibly other stats too (Issue #59), and since future (>= 1.8.0) this is basically just a matter of deciding on the public API for accessing such info. PPS. The snow package (now basically deprecated) has some built-in "snow-timing" gathering/plotting, but unfortunately they dropped that when they moved its code into the parallel package.

mlell commented 6 years ago

@HenrikBengtsson Thank you very much for working this out!

I do, however, want to comment on your updated vignette on that topic:

This behavior is due to the built-in protection against nested parallelism. If both layers would run in parallel, each using the 8 cores available on the machine, we would be running 8 * 8 = 64 parallel processes - that would for sure overload our computer.

I want to show you my use case to illustrate that this can be a useful thing and should IMO not be treated as an exotic corner case. I'm currently working on a machine with 2TB RAM and 60 physical CPUs. I suspect that among people who are interested in parallel computing, setups like these are not uncommon. Therefore, I feel that the assertion "64 parallel processes will surely overload your computer" is a bit over-cautious. Also, CPUs nowadays have overheat protection; the system throttles them in response to temperature sensors. So I do not think that you can brick your machine without tweaking processor internals.

I have to perform 1000 cross-validation rounds for each of 10 subgroups of my data. Each round involves 1500 iterations of a Gibbs sampler. In "productive" operation, this means that I need only one level of parallelism, because the CPUs are busy enough performing the 1000 cross-validations of one subgroup. However, as this takes about a day, I want to make sure that there are no errors downstream which might make me lose my results halfway through. Therefore, I usually test the pipeline in advance with toy settings, e.g. 3 cross-validation rounds with 20 Gibbs samples. Your package is awesome for switching between these cases if it can handle multi-level parallelism well, because

All this is possible without deep changes to the code logic because of future, if 2-level parallelism works! It apparently does now; I still have to test it, so thank you very much!