HenrikBengtsson / future

R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

Error with global sizes #185

Closed · saleonard closed this 6 years ago

saleonard commented 6 years ago

I'm trying to use future for parallel computation of a bootstrap function that I want to run on multiple data frames (one large dataset stratified by calendar year) on a Windows server. Each data year has about 450,000 rows. The code works fine on its own, but when I tried using plan(multiprocess) with %<-%, I got the error below.

I'm wondering if you could advise on what steps I can try? Even if I reduce the sample size, I get the same error.

Error in getGlobalsAndPackages(expr, envir = envir, persistent = persistent, : The total size of the 6 globals that need to be exported for the future expression (‘test(sample)’) is 582.13 MiB. This exceeds the maximum allowed size of 500.00 MiB (option 'future.globals.maxSize'). The three largest globals are ‘sample’ (542.96 MiB of class ‘list’), ‘A’ (26.10 MiB of class ‘numeric’) and ‘Y’ (13.05 MiB of class ‘numeric’).

### Original code
B <- 3  # Only 3 iterations for testing the code
estimates <- rep(NA, B)

test <- function(x) {
  for (i in 1:B) {
    print(i)
    bootstrapped_data <- x[sample(row.names(x), nrow(x), replace=TRUE), ]
    model <- glm(Y ~ A, data=bootstrapped_data, family='binomial')
    bootstrapped_data$A <- 0  # set exposure A to 0 for all rows before predicting
    predictions <- predict(model, newdata=bootstrapped_data, type='response')
    estimates[i] <- mean(predictions)
  }
  meanestimate <- mean(estimates)
  interval <- quantile(estimates, probs = c(0.025, 0.975))

  return(list(meanestimate = meanestimate, interval = interval))
}

test(sample)  # 'sample' is the data frame being bootstrapped
HenrikBengtsson commented 6 years ago

That error protects you against transferring overly large objects between the master and the workers, e.g. when the workers are on machines across the world. The allowed limit can be increased via the option that the error message refers to.

There's no %<-% or future() call in your code, so it's not clear to me how you parallelize, and I therefore can't really advise on how to improve it.

saleonard commented 6 years ago

I just included the code as it was before I tried parallelizing. Below is the code from when I tried to do so.

And thank you for the help. I'm an intermediate R user and couldn't figure out if/how I could change future.global.maxSize.

library(future)
plan(multiprocess)

B <- 3
test %<-% function(x) {
  for (i in 1:B) {
    print(i)
    bootstrapped_data %<-% x[sample(row.names(x), nrow(x), replace=TRUE),]
    model %<-% glm(Y ~ A, data=bootstrapped_data, family='binomial') 
    bootstrapped_data$A[bootstrapped_data$A==0] 
    predictions %<-% predict(model, newdata=bootstrapped_data, type= 'response')
    estimates[i] %<-% mean(predictions)
  }
  meanestimate %<-% mean(estimates)
  interval %<-% quantile(estimates, probs = c(0.025, 0.975))

  return(list(meanestimate = meanestimate, interval = interval))
}
HenrikBengtsson commented 6 years ago

It's an R option, which you can change using options(); e.g. to increase the limit to 768 MiB, use:

options(future.globals.maxSize = 768 * 1024^2)
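For reference, one way to choose a sensible limit is to measure the objects that will be exported and add some headroom. A minimal sketch (the 1 GiB figure is just an example, not a recommendation):

# Measure the largest global that will be exported (here the data frame
# 'sample' from the code above), then raise the cap with some headroom
print(utils::object.size(sample), units = "auto")   # e.g. ~543 MiB in this case
options(future.globals.maxSize = 1024 * 1024^2)     # raise the cap to 1 GiB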

Before continuing, it is important to understand that %<-% is not "magic". If you need to compute something with a sequential dependency, e.g.

A <- f(x)
B <- g(A)
C <- h(B)

you will gain nothing by using:

A %<-% f(x)
B %<-% g(A)
C %<-% h(B)

The reason is that in order for g(A) to be computed, the value of A needs to be available. The future assignment operator %<-% will make sure this is the case, that is, it will only launch

B %<-% g(A)

when A %<-% f(x) has completed.

Now, if you had something like:

A %<-% f(x)
B %<-% g(A)
C %<-% h(A)

instead, then B %<-% g(A) and C %<-% h(A) would be evaluated concurrently.
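To see the difference concretely, here's a minimal runnable sketch (the slow function f() and the two-worker multisession plan are assumptions made for this demo):

library(future)
plan(multisession, workers = 2)

f <- function(x) { Sys.sleep(2); x + 1 }  # stand-in for expensive work

t0 <- Sys.time()
A <- f(1)        # must finish first: both futures below depend on A
B %<-% f(A)      # B and C have no dependency on each other ...
C %<-% f(A)      # ... so they are evaluated concurrently on the two workers
c(B, C)          # reading the values blocks until both futures are resolved
Sys.time() - t0  # roughly 4 seconds rather than the 6 a sequential run takes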

So, if you look at your:

    bootstrapped_data %<-% x[sample(row.names(x), nrow(x), replace=TRUE),]
    model %<-% glm(Y ~ A, data=bootstrapped_data, family='binomial') 
    bootstrapped_data$A <- 0
    predictions %<-% predict(model, newdata=bootstrapped_data, type= 'response')

the second assignment depends on the value of the first, and the fourth on the second and first, which means you can equally well do:

    bootstrapped_data <- x[sample(row.names(x), nrow(x), replace=TRUE),]
    model <- glm(Y ~ A, data=bootstrapped_data, family='binomial') 
    bootstrapped_data$A <- 0
    predictions %<-% predict(model, newdata=bootstrapped_data, type= 'response')

So, how do you parallelize then? You want to run each loop iteration in parallel. Something like:

test <- function(x) {
  est <- listenv::listenv()
  for (i in 1:B) {
    print(i)
    est[[i]] %<-% {
      bootstrapped_data <- x[sample(row.names(x), nrow(x), replace=TRUE), ]
      model <- glm(Y ~ A, data=bootstrapped_data, family='binomial')
      bootstrapped_data$A <- 0
      predictions <- predict(model, newdata=bootstrapped_data, type='response')
      mean(predictions)
    }
  }
  estimates <- unlist(est)

  meanestimate <- mean(estimates)
  interval <- quantile(estimates, probs = c(0.025, 0.975))
  list(meanestimate = meanestimate, interval = interval)
}

The above will run each of the B iterations in parallel. The number of iterations that run at the same time depends on the number of workers you have (i.e. number of cores).
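You can check how many futures will run concurrently under the current plan with a couple of helpers from the future package:

library(future)
plan(multisession)   # defaults to using all availableCores() workers
nbrOfWorkers()       # how many futures can run at the same time under this plan
availableCores()     # how many cores R sees on this machine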

A few comments on the above:

  1. It makes little sense to use %<-% for defining the function test; that would only define the function in parallel - not make it "magically" parallelize the internal code.

  2. Functions such as mean() and quantile() are already very fast, so you'd probably lose more to overhead than you'd gain by trying to evaluate those two in parallel.

  3. With a big sample object being "exported" / copied in each iteration, the overhead might be too large.

If (3) is the case, you might want to try rewriting it using a future_lapply() call; by default it chunks the B iterations into one future per worker, so x is exported once per worker rather than once per iteration. Something like (untested):

test <- function(x) {
  # future_lapply() is provided by the future.apply package
  est <- future_lapply(1:B, FUN = function(i) {
    bootstrapped_data <- x[sample(row.names(x), nrow(x), replace=TRUE), ]
    model <- glm(Y ~ A, data=bootstrapped_data, family='binomial')
    bootstrapped_data$A <- 0
    predictions <- predict(model, newdata=bootstrapped_data, type='response')
    mean(predictions)
  })
  estimates <- unlist(est)

  meanestimate <- mean(estimates)
  interval <- quantile(estimates, probs = c(0.025, 0.975))
  list(meanestimate = meanestimate, interval = interval)
}

UPDATE 2018-01-10: The option is future.globals.maxSize (not future.global.maxSize)

saleonard commented 6 years ago

I really appreciate your thorough reply. It is extremely helpful in understanding the future package and parallel computing in general. I also wasn't aware of future_lapply(), and after trying it, I found that it worked best.

In case it's useful for anyone in the future, here is the final code that I'm using:

library(future)
library(future.apply)
plan(multiprocess)

B <- 1000   # Number of bootstrap samples

test <- function(dat) {
  set.seed(1599) # to replicate
  est <- future_lapply(1:B, FUN = function(i) {
    dat_boot <- dat[sample(row.names(dat), nrow(dat), replace=TRUE), ]
    model <- glm(Y ~ A, data=dat_boot, family='binomial')
    dat_boot$A <- rep(0, nrow(dat_boot))
    predictions <- predict(model, newdata=dat_boot, type='response')
    mean(predictions)  # return the per-iteration bootstrap estimate
  })
  estimates <- unlist(est)

  meanestimate <- mean(estimates)
  interval <- quantile(estimates, probs = c(0.025, 0.975))
  return(list(meanestimate = meanestimate, interval = interval))
}

test(dataset1)
HenrikBengtsson commented 6 years ago

Good to hear and thanks for reporting back and sharing.

One last important thing since you're doing bootstrapping: random number generation (RNG) needs special care when running in parallel. There's a whole field of study devoted to making sure that the random number streams used by parallel workers are statistically sound.

In your case, you want to use future_lapply(..., future.seed = TRUE) to achieve statistically sound parallel RNG. You can also pass a seed directly, e.g. future_lapply(..., future.seed = 1599). (The default is future.seed = FALSE for performance reasons; there is some overhead in pre-generating one RNG stream for each of the B iterations.) You can read more about this in help("future_lapply").
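A minimal sketch of that sound setup (the toy statistic inside FUN is only a placeholder for the bootstrap step):

library(future)
library(future.apply)
plan(multisession)

B <- 1000
# future.seed = 1599 pre-generates one parallel-safe L'Ecuyer-CMRG RNG
# stream per iteration, so results are reproducible no matter how the
# iterations are distributed over the workers
est <- future_lapply(1:B, FUN = function(i) {
  mean(rnorm(10))  # placeholder statistic; the bootstrap draw would go here
}, future.seed = 1599)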

I'm closing this issue, but feel free to re-open / comment if you have more questions.

saleonard commented 6 years ago

I have a small comment to add. There is a typo in the code throughout this thread.

options(future.global.maxSize = 768 * 1024^2)

should be

options(future.globals.maxSize = 768 * 1024^2)

Thanks to @ck37 for pointing this out.

HenrikBengtsson commented 6 years ago

Thanks. I've updated my comment accordingly for future readers.

mtorressahli commented 1 year ago

Hi

I've tried these two alternatives:

options(future.globals.maxSize = 8*1024^3)

options(future.globals.maxSize = +Inf)

When checking if the option was set, it seems to have worked, either with the higher limit:

> options('future.globals.maxSize')
$future.globals.maxSize
[1] 8589934592

or no limit:

> options('future.globals.maxSize')
$future.globals.maxSize
[1] Inf

But I keep receiving the error message as if the option were still set at 500 MiB.

I'm setting a nested futures plan:

plan(list(
  tweak(multisession, workers = availableCores() %/% 3),
  tweak(multisession, workers = availableCores() %/% 3),
  tweak(multisession, workers = availableCores() %/% 3)
))

Not sure if that might affect the setting?

I'm using future_1.29.0, future.apply_1.10.0, and R version 4.2.1 (2022-06-23 ucrt) on Windows 11.

Any help would be much appreciated