HenrikBengtsson / future

R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

Need API like parallel::clusterCall #273

Open renkun-ken opened 5 years ago

renkun-ken commented 5 years ago

I need to do some setup on each node in a multisession backend before creating any future. Currently, there's no API like parallel::clusterCall or parallel::clusterEvalQ to call a function or evaluate an expression on each node of a cluster, and multisession does not seem to expose the PSOCK cluster it creates to the user.

Therefore I have to create my own cluster with

library(future)
library(parallel)
cl <- makeClusterPSOCK(30)
clusterCall(cl, function() {
  # setup code
})
plan(cluster, workers = cl)
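For reference, the two parallel functions mentioned above differ in what they send to the workers: clusterCall() calls a function once on every node and returns the results as a list, while clusterEvalQ() evaluates an unevaluated expression in each worker's global environment. A minimal sketch using only base R's parallel package, where a plain makeCluster() cluster stands in for the one future would create:

```r
library(parallel)

cl <- makeCluster(2L)

## clusterCall(): call a function once on every node; results come back as a list
pids <- clusterCall(cl, function() Sys.getpid())

## clusterEvalQ(): evaluate an expression on every node (here: set an option)
invisible(clusterEvalQ(cl, options(digits = 4)))
digits <- clusterEvalQ(cl, getOption("digits"))

stopCluster(cl)
```

Each worker is a separate R process, so pids holds two different process IDs, and the option set by the first clusterEvalQ() call persists on the workers for later calls.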

It would be very nice to provide an API at the cluster level to make it easier to perform backend setup. If the setup code relies on some variables in the calling scope, the user currently has to call parallel::clusterExport to export all of them before parallel::clusterCall. This is best handled by the future package itself, just like how globals are resolved when calling future.
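What is being asked for amounts to "export what the setup code needs, then call it on every node". A minimal sketch of that idea with base R's parallel package follows. The helper name cluster_setup() and its arguments are hypothetical (they are not part of future or parallel), and unlike future, it does not detect globals automatically: the variables must be listed in export.

```r
library(parallel)

## Hypothetical helper (not part of future or parallel): export the named
## variables from the caller's environment, then run 'fun' once per node
cluster_setup <- function(cl, fun, export = character(0L), envir = parent.frame()) {
  if (length(export) > 0L) {
    clusterExport(cl, varlist = export, envir = envir)
  }
  clusterCall(cl, fun)
}

cl <- makeCluster(2L)
threshold <- 10
res <- cluster_setup(cl, function() {
  ## 'threshold' is found in the worker's global environment after the export
  options(my.threshold = threshold)
  getOption("my.threshold")
}, export = "threshold")
stopCluster(cl)
```

A future-aware version could replace the explicit export argument with the same globals detection that future() already performs. That is the convenience the issue is requesting.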

This issue looks similar to #272.

jeffkeller87 commented 4 years ago

I would really like to see this feature as well, as I often have a similar workflow.

Worth noting that I needed to add persistent = TRUE to the plan; otherwise each worker throws an error. If bigData wasn't removed from the main R session, the futures would work, but only because bigData was silently exported again, which is possibly unexpected given that it had already been exported.

Works, but unexpectedly exports bigData to the cluster again

library(future)
library(parallel)
cl <- makeClusterPSOCK(2)
bigData <- iris
clusterExport(cl, varlist = "bigData")

plan(cluster, workers = cl)

s %<-% {
  Sys.sleep(5)
  summary(bigData)
}

n %<-% {
  Sys.sleep(3)
  nrow(bigData)
}

s
n

Throws an "object 'bigData' not found" error

library(future)
library(parallel)
cl <- makeClusterPSOCK(2)
bigData <- iris
clusterExport(cl, varlist = "bigData")
rm(bigData)

plan(cluster, workers = cl)

s %<-% {
  Sys.sleep(5)
  summary(bigData)
}

n %<-% {
  Sys.sleep(3)
  nrow(bigData)
}

s
n

Works as desired

library(future)
library(parallel)
cl <- makeClusterPSOCK(2)
bigData <- iris
clusterExport(cl, varlist = "bigData")
rm(bigData)

plan(cluster, workers = cl, persistent = TRUE)

s %<-% {
  Sys.sleep(5)
  summary(bigData)
}

n %<-% {
  Sys.sleep(3)
  nrow(bigData)
}

s
n
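To check whether an object exported with clusterExport() is actually present on the workers, one can query each worker directly, bypassing future. A minimal sketch with base R's parallel package. Running the same clusterEvalQ() check again after the futures have been resolved would show whether the plan left bigData in place on the workers.

```r
library(parallel)

cl <- makeCluster(2L)
bigData <- datasets::iris
clusterExport(cl, varlist = "bigData")
rm(bigData)

## Ask each worker whether bigData exists in its global environment
has_big <- unlist(clusterEvalQ(cl,
  exists("bigData", envir = globalenv(), inherits = FALSE)))

stopCluster(cl)
```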
HenrikBengtsson commented 4 years ago

See https://github.com/HenrikBengtsson/future/issues/339#issuecomment-533730606 for a slightly better solution that does not rely on using persistent = TRUE (which should be avoided).

Define:

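clusterExportSticky <- function(cl, globals, envir = parent.frame()) {
  ## 'globals' is either a character vector of variable names, looked up in
  ## 'envir' (by default the caller's environment), or a named list
  if (is.character(globals)) {
    globals <- mget(globals, envir = envir, inherits = TRUE)
  } else {
    stopifnot(is.list(globals))
  }
  if (length(globals) == 0L) return(invisible(cl))
  stopifnot(!is.null(names(globals)))
  future_sticky_globals <- globals
  ## Send the list to the global environment of each worker
  parallel::clusterExport(cl, "future_sticky_globals", envir = environment())
  ## On each worker, copy the objects into an environment attached to the
  ## search path, so they stay visible if the global environment is cleared
  dummy <- parallel::clusterEvalQ(cl, local({
    name <- "future:sticky_env"
    pos <- match(name, search())
    env <- if (is.na(pos)) attach(list(), name = name) else pos.to.env(pos)
    for (name in names(future_sticky_globals))
      assign(name, future_sticky_globals[[name]], envir = env)
    ## Remove the temporary transport object from the worker's global environment
    rm(list = "future_sticky_globals", envir = globalenv())
  }))
  invisible(cl)
}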
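The reason clusterExportSticky() copies the objects into an attached environment is R's scoping: a name that is not found in the global environment is looked up along the search path. Objects in an attached environment therefore stay visible even if a worker's global environment is emptied, which is presumably what happens between non-persistent futures. A minimal sketch on a single worker, where the environment name "demo:sticky_env" and the object names are made up for illustration:

```r
library(parallel)

cl <- makeCluster(1L)
res <- clusterEvalQ(cl, {
  ## One object in an attached environment, one in the global environment
  env <- attach(list(), name = "demo:sticky_env")
  assign("stickyData", data.frame(x = 1:3), envir = env)
  assign("plainData", data.frame(x = 1:3), envir = globalenv())
  ## Simulate clearing the worker's global environment
  rm(list = ls(envir = globalenv()), envir = globalenv())
  ## Lookups from the global environment continue along the search path
  c(sticky = exists("stickyData"), plain = exists("plainData"))
})[[1L]]
stopCluster(cl)
```

Only the object in the attached environment survives the cleanup.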

Then you can do something like:

library(future)
cl <- makeClusterPSOCK(2L)
plan(cluster, workers = cl)

bigData <- iris 

## Export a "sticky" global to workers
clusterExportSticky(cl, "bigData")

a <- 42

s %<-% {
  Sys.sleep(5)
  str(a)
  summary(bigData)
} %globals% structure(TRUE, ignore = "bigData")

n %<-% {
  Sys.sleep(3)
  str(a)
  nrow(bigData)
} %globals% structure(TRUE, ignore = "bigData")

Inspecting the futures, e.g.

> futureOf(s)
ClusterFuture:
Label: '<none>'
Expression:
{
    Sys.sleep(5)
    str(a)
    summary(bigData)
}
Lazy evaluation: FALSE
Asynchronous evaluation: TRUE
Local evaluation: TRUE
Environment: <environment: R_GlobalEnv>
Capture standard output: TRUE
Capture condition classes: 'condition'
Globals: 1 objects totaling 56 bytes (numeric 'a' of 56 bytes)
Packages: 1 packages ('utils')
L'Ecuyer-CMRG RNG seed: <none>
Resolved: TRUE
Value: <not collected>
Conditions captured: <none>
Early signaling: FALSE
Owner process: 991b54e8-4c7b-31c4-d375-535b42939bb1
Class: 'ClusterFuture', 'MultiprocessFuture', 'Future', 'environment'

shows that bigData is ignored and not exported.