Open EmilRehnberg opened 5 years ago
Are you saying it hangs when you use:
future::plan(list(future::multiprocess, future::sequential))
but not
future::plan(future::multiprocess)
which effectively should be identical because when no more plans are specified, it'll fall back to using sequential
?
Also, what's you sessionInfo()
?
Using plan(sequential)
works well.
Using any parallelization does not. The furrr
progress bar goes to 100% and hangs / don't seem to finish.
Thank you for the quick reply. Can you reproduce this?
> sessionInfo()
R version 3.6.1 (2019-07-05)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 18.04.3 LTS
Matrix products: default
BLAS: /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.7.1
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.7.1
locale:
[1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
[3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
[5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
[7] LC_PAPER=en_US.UTF-8 LC_NAME=C
[9] LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] mlr3learners_0.1.4 mlr3_0.1.4-9000 nvimcom_0.9-83 setwidth_1.0-4
[5] devtools_2.2.1 usethis_1.5.1 magrittr_1.5
loaded via a namespace (and not attached):
[1] Rcpp_1.0.2 pillar_1.4.2 compiler_3.6.1
[4] prettyunits_1.0.2 R.methodsS3_1.7.1 mlr3misc_0.1.5-9000
[7] R.utils_2.9.0 remotes_2.1.0 tools_3.6.1
[10] testthat_2.2.1 digest_0.6.22 pkgbuild_1.0.6
[13] pkgload_1.0.2 uuid_0.1-2 evaluate_0.14
[16] tibble_2.1.3 memoise_1.1.0 checkmate_1.9.4
[19] pkgconfig_2.0.3 rlang_0.4.1 reprex_0.3.0.9000
[22] cli_1.1.0 parallel_3.6.1 xfun_0.10
[25] knitr_1.25 withr_2.1.2 dplyr_0.8.3
[28] globals_0.12.4 desc_1.2.0 fs_1.3.1
[31] tidyselect_0.2.5 rprojroot_1.3-2 glue_1.3.1
[34] data.table_1.12.6 listenv_0.7.0 R6_2.4.0
[37] processx_3.4.1 rmarkdown_1.16 sessioninfo_1.1.1
[40] clipr_0.7.0 whisker_0.4 purrr_0.3.3
[43] callr_3.3.2 lgr_0.3.3 htmltools_0.4.0
[46] codetools_0.2-16 backports_1.1.5 ps_1.3.0
[49] ellipsis_0.3.0 assertthat_0.2.1 future_1.14.0
[52] paradox_0.1.0-9000 crayon_1.3.4 R.oo_1.23.0
Also, I think I forgot to mention that if I run do furrr
and mlr3
parallel runs without nesting, everything's working great.
@HenrikBengtsson
I got a segfault when I ran the parallelized run on a mac. (Sequential run was fine)
*** caught segfault ***
address 0x10c8f1020, cause 'memory not mapped'
Traceback:
1: strsplit(on, "(?=[`])", perl = TRUE)
2: .parse_on(substitute(on), isnull_inames)
3: `[.data.table`(self$col_info, id == target)
4: self$col_info[id == target]
5: .subset2(public_bind_env, "initialize")(...)
6: mlr3::TaskRegr$new("ttsk", backend = dta, target = target_column)
7: ...future.f(...future.x_jj, ...)
8: .f(.x[[i]], ...)
9: ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj, ...) if (
.progress) update_progress(temp_file_con) .out})
10: eval(quote({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) {
...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)
) } ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj
, ...) if (.progress) update_progress(temp_file_con) .out })}), new.env())
11: eval(quote({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) {
...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)
) } ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj
, ...) if (.progress) update_progress(temp_file_con) .out })}), new.env())
12: eval(expr, p)
13: eval(expr, p)
14: eval.parent(substitute(eval(quote(expr), envir)))
15: local({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) {
...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con))
} ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj, ...
) if (.progress) update_progress(temp_file_con) .out })})
16: withVisible(local({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`
~`)) { ...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_fi
le_con)) } ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...futu
re.x_jj, ...) if (.progress) update_progress(temp_file_con) .out })}))
17: withCallingHandlers({ ...future.value <- withVisible(local({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`
)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) { ...future.f.env$`~` <- base::`~` } } if (.progress) {
temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)) } ...future.map(seq_along(...future.x_ii), .f = functio
n(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj, ...) if (.progress) update
_progress(temp_file_con) .out }) })) future::FutureResult(value = ...future.value$value, visible = ...future.value$visible, st
arted = ...future.startTime, version = "1.8")}, condition = local({ inherits <- base::inherits invokeRestart <- base::invokeRestart length <- base::l
ength seq.int <- base::seq.int sys.calls <- base::sys.calls `[[` <- base::`[[` `+` <- base::`+` `<<-` <- base::`<<-` sysCalls <- function(ca
lls = sys.calls(), from = 1L) { calls[seq.int(from = from + 12L, to = length(calls) - 3L)] } function(cond) { if (inherits(con
d, "error")) { ...future.conditions[[length(...future.conditions) + 1L]] <<- list(condition = cond, calls = c(sysCalls(from = ...fu
ture.frame), cond$call), timestamp = base::Sys.time(), signaled = 0L) signalCondition(cond) } else if (inherits(cond,
"condition")) { signal <- FALSE && inherits(cond, character(0)) ...future.conditions[[length(...future.conditions) + 1L
]] <<- list(condition = cond, signaled = as.integer(signal)) if (!signal) { muffleCondition <- function (cond) {
inherits <- base::inherits invokeRestart <- base::invokeRestart muffled <- FALSE if (inherits
(cond, "message")) { invokeRestart("muffleMessage") muffled <- TRUE } else if (inherit
s(cond, "warning")) { invokeRestart("muffleWarning") muffled <- TRUE } else if (inheri
ts(cond, "condition")) { computeRestarts <- base::computeRestarts grepl <- base::grepl is.null <- bas
e::is.null restarts <- computeRestarts(cond) for (restart in restarts) { name <- restart$name
if (is.null(name)) next if (!grepl("^muffle", name)) next
invokeRestart(restart) muffled <- TRUE break } } invisibl
e(muffled) } muffleCondition(cond) } } }}))
18: doTryCatch(return(expr), name, parentenv, handler)
19: tryCatchOne(expr, names, parentenv, handlers[[1L]])
20: tryCatchList(expr, classes, parentenv, handlers)
21: tryCatch({ withCallingHandlers({ ...future.value <- withVisible(local({ ...future.f.env <- environment(...future.f) if (!i
s.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) { ...future.f.env$`~` <- base::`~`
} } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)) }
...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(.
..future.x_jj, ...) if (.progress) update_progress(temp_file_con) .out }) })) future:
:FutureResult(value = ...future.value$value, visible = ...future.value$visible, started = ...future.startTime, version = "1.8") }, condition =
local({ inherits <- base::inherits invokeRestart <- base::invokeRestart length <- base::length seq.int <- base::seq.int sys
.calls <- base::sys.calls `[[` <- base::`[[` `+` <- base::`+` `<<-` <- base::`<<-` sysCalls <- function(calls = sys.calls(), from
= 1L) { calls[seq.int(from = from + 12L, to = length(calls) - 3L)] } function(cond) { if (inherits(cond, "
error")) { ...future.conditions[[length(...future.conditions) + 1L]] <<- list(condition = cond, calls = c(sysCalls(from = ...
future.frame), cond$call), timestamp = base::Sys.time(), signaled = 0L) signalCondition(cond) } else if
(inherits(cond, "condition")) { signal <- FALSE && inherits(cond, character(0)) ...future.conditions[[length(...future.conditio
ns) + 1L]] <<- list(condition = cond, signaled = as.integer(signal)) if (!signal) { muffleCondition <- funct
ion (cond) { inherits <- base::inherits invokeRestart <- base::invokeRestart muffle
d <- FALSE if (inherits(cond, "message")) { invokeRestart("muffleMessage") muffled <- TRUE
} else if (inherits(cond, "warning")) { invokeRestart("muffleWarning") muffled <- TR
UE } else if (inherits(cond, "condition")) { computeRestarts <- base::computeRestarts
grepl <- base::grepl is.null <- base::is.null restarts <- computeRestarts(cond) for (re
start in restarts) { name <- restart$name if (is.null(name)) next
if (!grepl("^muffle", name)) next invokeRestart(restart) muffled <- TRUE
break } } invisible(muffled) } muffleCondition(cond)
} } } }))}, error = function(ex) { structure(list(value = NULL, visible = NULL, conditions = ...future.conditions,
version = "1.8"), class = "FutureResult")}, finally = { { { options(mc.cores = ...future.mc.cores.old) } future::plan(
list(function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, workers = availableCores(),
gc = FALSE, earlySignal = FALSE, label = NULL, ...) { if (substitute) expr <- substitute(expr) fun
<- if (supportsMulticore(warn = TRUE)) multicore else multisession fun(expr = expr, envir = envir, substitute = FALSE,
lazy = lazy, seed = seed, globals = globals, workers = workers, gc = gc, earlySignal = earlySignal, label = lab
el, ...) }, function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, local = TRUE,
earlySignal = FALSE, label = NULL, ...) { if (substitute) expr <- substitute(expr) local <- as.logical(local)
future <- SequentialFuture(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed, globals = globa
ls, local = local, earlySignal = earlySignal, label = label, ...) if (!future$lazy) future <- run(future)
invisible(future) }), .cleanup = FALSE, .init = FALSE) } options(...future.oldOptions)})
22: eval(expr, env)
23: doTryCatch(return(expr), name, parentenv, handler)
24: tryCatchOne(expr, names, parentenv, handlers[[1L]])
25: tryCatchList(expr, classes, parentenv, handlers)
26: tryCatch(expr, error = function(e) { call <- conditionCall(e) if (!is.null(call)) { if (identical(call[[1L]], quote(doTryCatch)))
call <- sys.call(-4L) dcall <- deparse(call)[1L] prefix <- paste("Error in", dcall, ": ") LONG <- 75L sm <- strsplit(conditionMes
sage(e), "\n")[[1L]] w <- 14L + nchar(dcall, type = "w") + nchar(sm[1L], type = "w") if (is.na(w)) w <- 14L + nchar(dcall, type = "b
") + nchar(sm[1L], type = "b") if (w > LONG) prefix <- paste0(prefix, "\n ") } else prefix <- "Error : " msg <- p
aste0(prefix, conditionMessage(e), "\n") .Internal(seterrmessage(msg[1L])) if (!silent && isTRUE(getOption("show.error.messages"))) { cat(msg, fi
le = outFile) .Internal(printDeferredWarnings()) } invisible(structure(msg, class = "try-error", condition = e))})
27: try(eval(expr, env), silent = TRUE)
28: sendMaster(try(eval(expr, env), silent = TRUE))
29: (function (expr, name, mc.set.seed = TRUE, silent = FALSE, mc.affinity = NULL, mc.interactive = FALSE, detached = FALSE) { f <- mcfork(detached)
env <- parent.frame() if (isTRUE(mc.set.seed)) mc.advance.stream() if (inherits(f, "masterProcess")) { on.exit(mcexit(1L, structure("fat
al error in wrapper code", class = "try-error"))) if (isTRUE(mc.set.seed)) mc.set.stream() mc.interactive <- as.logical(
mc.interactive) if (isTRUE(mc.interactive)) mcinteractive(TRUE) if (isTRUE(!mc.interactive)) mcinteractive(FALSE)
if (!is.null(mc.affinity)) mcaffinity(mc.affinity) if (isTRUE(silent)) closeStdout(TRUE) if (detached) { on.
exit(mcexit(1L)) eval(expr, env) mcexit(0L) } sendMaster(try(eval(expr, env), silent = TRUE)) mcexit(0L) } if
(!missing(name) && !is.null(name)) f$name <- as.character(name)[1L] class(f) <- c("parallelJob", class(f)) f})({ { ...future.startTim
e <- Sys.time() ...future.oldOptions <- options(future.startup.script = FALSE, future.globals.onMissing = "ignore", future.globals.maxSize
= NULL, future.globals.method = NULL, future.globals.onMissing = "ignore", future.globals.onReference = NULL, future.globals.resolve =
NULL, future.resolve.recursive = NULL, width = 158L) { { { { has_future <- r
equireNamespace("future", quietly = TRUE) version <- if (has_future) packageVersion("future")
else NULL if (!has_future || version < "1.8.0") { info <- c(r_version = gsub("R version ",
"", R.version$version.string), platform = sprintf("%s (%s-bit)", R.version$platform, 8 * .Machine$sizeof.pointer),
os = paste(Sys.info()[c("sysname", "release", "version")], collapse = " "), hostname = Sys.info()[["nodename"]])
info <- sprintf("%s: %s", names(info), info) info <- paste(info, collapse = "; ")
if (!has_future) { msg <- sprintf("Package 'future' is not installed on worker (%s)", info)
} else { msg <- sprintf("Package 'future' on worker (%s) must be of version >= 1.8.0: %s",
info, version) } stop(msg) } } ...future.mc.
cores.old <- getOption("mc.cores") options(mc.cores = 1L) } local({ for (pkg in c("purrr", "fu
ture")) { loadNamespace(pkg) library(pkg, character.only = TRUE) } }) }
future::plan(list(function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE,
local = TRUE, earlySignal = FALSE, label = NULL, ...) { if (substitute) expr <- substitute(expr)
local <- as.logical(local) future <- SequentialFuture(expr = expr, envir = envir, substitute = FALSE, lazy = l
azy, seed = seed, globals = globals, local = local, earlySignal = earlySignal, label = label, ...) if (!fut
ure$lazy) future <- run(future) invisible(future) }), .cleanup = FALSE, .init = FALSE) } } if (is.na(
TRUE)) { } else { if (TRUE) { ...future.stdout <- rawConnection(raw(0L), open = "w") } else { ...future.stdou
t <- file(switch(.Platform$OS.type, windows = "NUL", "/dev/null"), open = "w") } sink(...future.stdout, type = "output", split =
FALSE) on.exit(if (!is.null(...future.stdout)) { sink(type = "output", split = FALSE) close(...future.stdout) }, add = TR
UE) } ...future.frame <- sys.nframe() ...future.conditions <- list() ...future.result <- tryCatch({ withCallingHandlers({ ...fut
ure.value <- withVisible(local({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) {
if (is_bad_rlang_tilde(...future.f.env$`~`)) { ...future.f.env$`~` <- base::`~` } } if (.
progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)) } ...future.ma
p(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj,
...) if (.progress) update_progress(temp_file_con) .out }) })) futu
re::FutureResult(value = ...future.value$value, visible = ...future.value$visible, started = ...future.startTime, version = "1
.8") }, condition = local({ inherits <- base::inherits invokeRestart <- base::invokeRestart length <- base::length
seq.int <- base::seq.int sys.calls <- base::sys.calls `[[` <- base::`[[` `+` <- base::`+` `<<-` <- base::`
<<-` sysCalls <- function(calls = sys.calls(), from = 1L) { calls[seq.int(from = from + 12L, to = length(calls) -
3L)] } function(cond) { if (inherits(cond, "error")) { ...future.conditions[[length(...future.conditions
) + 1L]] <<- list(condition = cond, calls = c(sysCalls(from = ...future.frame), cond$call), timestamp = base::Sys.time
(), signaled = 0L) signalCondition(cond) } else if (inherits(cond, "condition")) {
signal <- FALSE && inherits(cond, character(0)) ...future.conditions[[length(...future.conditions) + 1L]] <<- li
st(condition = cond, signaled = as.integer(signal)) if (!signal) { muffleCondition <- function (cond)
{ inherits <- base::inherits invokeRestart <- base::invokeRestart muffled <- FALSE
if (inherits(cond, "message")) { invokeRestart("muffleMessage") muffled <- TRUE }
else if (inherits(cond, "warning")) { invokeRestart("muffleWarning") muffled <- TRUE
} else if (inherits(cond, "condition")) { computeRestarts <- base::computeRestarts
grepl <- base::grepl is.null <- base::is.null restarts <- computeRestarts(cond) fo
r (restart in restarts) { name <- restart$name if (is.null(name)) next
if (!grepl("^muffle", name)) next invokeRestart(restart) muffled
<- TRUE break } } invisible(muffled) }
muffleCondition(cond) } } } })) }, error = function(ex) { structure(list(value = NULL, v
isible = NULL, conditions = ...future.conditions, version = "1.8"), class = "FutureResult") }, finally = { { {
options(mc.cores = ...future.mc.cores.old) } future::plan(list(function (expr, envir = parent.frame(), substitute = TRU
E, lazy = FALSE, seed = NULL, globals = TRUE, workers = availableCores(), gc = FALSE, earlySignal = FALSE, label = NULL, ...)
{ if (substitute) expr <- substitute(expr) fun <- if (supportsMulticore(warn = TRUE))
multicore else multisession fun(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed
, globals = globals, workers = workers, gc = gc, earlySignal = earlySignal, label = label, ...) }, function (ex
pr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, local = TRUE, earlySignal = FALSE, l
abel = NULL, ...) { if (substitute) expr <- substitute(expr) local <- as.logical(local)
future <- SequentialFuture(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed, globals = global
s, local = local, earlySignal = earlySignal, label = label, ...) if (!future$lazy) future <- run(future)
invisible(future) }), .cleanup = FALSE, .init = FALSE) } options(...future.oldOptions) }) if (is.na(TRUE)) { }
else { sink(type = "output", split = FALSE) if (TRUE) { ...future.result$stdout <- rawToChar(rawConnectionValue(...future.stdout))
} else { ...future.result["stdout"] <- list(NULL) } close(...future.stdout) ...future.stdout <- NULL } ..
.future.result$conditions <- ...future.conditions ...future.result})
30: do.call(parallel::mcparallel, args = future.args, envir = envir)
31: run.MulticoreFuture(future)
32: run(future)
33: fun(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed, globals = globals, workers = workers, gc = gc, earlySignal = earlySi
gnal, label = label, ...)
34: makeFuture(...)
35: .makeFuture(expr, substitute = FALSE, envir = envir, globals = globals, packages = packages, seed = seed, lazy = lazy, ...)
36: future({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) {
...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con))
} ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj, ..
.) if (.progress) update_progress(temp_file_con) .out })}, envir = envir, lazy = .options$lazy, globals = globals_ii, package
s = packages)
37: future_map_template(purrr::map, "list", .x, .f, ..., .progress = .progress, .options = .options)
38: furrr::future_map(dtas_lst, run_xgb, .progress = TRUE)
An irrecoverable exception occurred. R is aborting now ...
*** caught segfault ***
address 0x10c8f1020, cause 'memory not mapped'
Traceback:
1: strsplit(on, "(?=[`])", perl = TRUE)
2: .parse_on(substitute(on), isnull_inames)
3: `[.data.table`(self$col_info, id == target)
4: self$col_info[id == target]
5: .subset2(public_bind_env, "initialize")(...)
6: mlr3::TaskRegr$new("ttsk", backend = dta, target = target_column)
7: ...future.f(...future.x_jj, ...)
8: .f(.x[[i]], ...)
9: ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj, ...) if (
.progress) update_progress(temp_file_con) .out})
10: eval(quote({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) {
...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)
) } ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj
, ...) if (.progress) update_progress(temp_file_con) .out })}), new.env())
11: eval(quote({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) {
...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)
) } ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj
, ...) if (.progress) update_progress(temp_file_con) .out })}), new.env())
12: eval(expr, p)
13: eval(expr, p)
14: eval.parent(substitute(eval(quote(expr), envir)))
15: local({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) {
...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con))
} ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj, ...
) if (.progress) update_progress(temp_file_con) .out })})
16: withVisible(local({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`
~`)) { ...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_fi
le_con)) } ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...futu
re.x_jj, ...) if (.progress) update_progress(temp_file_con) .out })}))
17: withCallingHandlers({ ...future.value <- withVisible(local({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`
)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) { ...future.f.env$`~` <- base::`~` } } if (.progress) {
temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)) } ...future.map(seq_along(...future.x_ii), .f = functio
n(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj, ...) if (.progress) update
_progress(temp_file_con) .out }) })) future::FutureResult(value = ...future.value$value, visible = ...future.value$visible, st
arted = ...future.startTime, version = "1.8")}, condition = local({ inherits <- base::inherits invokeRestart <- base::invokeRestart length <- base::l
ength seq.int <- base::seq.int sys.calls <- base::sys.calls `[[` <- base::`[[` `+` <- base::`+` `<<-` <- base::`<<-` sysCalls <- function(ca
lls = sys.calls(), from = 1L) { calls[seq.int(from = from + 12L, to = length(calls) - 3L)] } function(cond) { if (inherits(con
d, "error")) { ...future.conditions[[length(...future.conditions) + 1L]] <<- list(condition = cond, calls = c(sysCalls(from = ...fu
ture.frame), cond$call), timestamp = base::Sys.time(), signaled = 0L) signalCondition(cond) } else if (inherits(cond,
"condition")) { signal <- FALSE && inherits(cond, character(0)) ...future.conditions[[length(...future.conditions) + 1L
]] <<- list(condition = cond, signaled = as.integer(signal)) if (!signal) { muffleCondition <- function (cond) {
inherits <- base::inherits invokeRestart <- base::invokeRestart muffled <- FALSE if (inherits
(cond, "message")) { invokeRestart("muffleMessage") muffled <- TRUE } else if (inherit
s(cond, "warning")) { invokeRestart("muffleWarning") muffled <- TRUE } else if (inheri
ts(cond, "condition")) { computeRestarts <- base::computeRestarts grepl <- base::grepl is.null <- bas
e::is.null restarts <- computeRestarts(cond) for (restart in restarts) { name <- restart$name
if (is.null(name)) next if (!grepl("^muffle", name)) next
invokeRestart(restart) muffled <- TRUE break } } invisibl
e(muffled) } muffleCondition(cond) } } }}))
18: doTryCatch(return(expr), name, parentenv, handler)
19: tryCatchOne(expr, names, parentenv, handlers[[1L]])
20: tryCatchList(expr, classes, parentenv, handlers)
21: tryCatch({ withCallingHandlers({ ...future.value <- withVisible(local({ ...future.f.env <- environment(...future.f) if (!i
s.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) { ...future.f.env$`~` <- base::`~`
} } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)) }
...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(.
..future.x_jj, ...) if (.progress) update_progress(temp_file_con) .out }) })) future:
:FutureResult(value = ...future.value$value, visible = ...future.value$visible, started = ...future.startTime, version = "1.8") }, condition =
local({ inherits <- base::inherits invokeRestart <- base::invokeRestart length <- base::length seq.int <- base::seq.int sys
.calls <- base::sys.calls `[[` <- base::`[[` `+` <- base::`+` `<<-` <- base::`<<-` sysCalls <- function(calls = sys.calls(), from
= 1L) { calls[seq.int(from = from + 12L, to = length(calls) - 3L)] } function(cond) { if (inherits(cond, "
error")) { ...future.conditions[[length(...future.conditions) + 1L]] <<- list(condition = cond, calls = c(sysCalls(from = ...
future.frame), cond$call), timestamp = base::Sys.time(), signaled = 0L) signalCondition(cond) } else if
(inherits(cond, "condition")) { signal <- FALSE && inherits(cond, character(0)) ...future.conditions[[length(...future.conditio
ns) + 1L]] <<- list(condition = cond, signaled = as.integer(signal)) if (!signal) { muffleCondition <- funct
ion (cond) { inherits <- base::inherits invokeRestart <- base::invokeRestart muffle
d <- FALSE if (inherits(cond, "message")) { invokeRestart("muffleMessage") muffled <- TRUE
} else if (inherits(cond, "warning")) { invokeRestart("muffleWarning") muffled <- TR
UE } else if (inherits(cond, "condition")) { computeRestarts <- base::computeRestarts
grepl <- base::grepl is.null <- base::is.null restarts <- computeRestarts(cond) for (re
start in restarts) { name <- restart$name if (is.null(name)) next
if (!grepl("^muffle", name)) next invokeRestart(restart) muffled <- TRUE
break } } invisible(muffled) } muffleCondition(cond)
} } } }))}, error = function(ex) { structure(list(value = NULL, visible = NULL, conditions = ...future.conditions,
version = "1.8"), class = "FutureResult")}, finally = { { { options(mc.cores = ...future.mc.cores.old) } future::plan(
list(function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, workers = availableCores(),
gc = FALSE, earlySignal = FALSE, label = NULL, ...) { if (substitute) expr <- substitute(expr) fun
<- if (supportsMulticore(warn = TRUE)) multicore else multisession fun(expr = expr, envir = envir, substitute = FALSE,
lazy = lazy, seed = seed, globals = globals, workers = workers, gc = gc, earlySignal = earlySignal, label = lab
el, ...) }, function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, local = TRUE,
earlySignal = FALSE, label = NULL, ...) { if (substitute) expr <- substitute(expr) local <- as.logical(local)
future <- SequentialFuture(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed, globals = globa
ls, local = local, earlySignal = earlySignal, label = label, ...) if (!future$lazy) future <- run(future)
invisible(future) }), .cleanup = FALSE, .init = FALSE) } options(...future.oldOptions)})
22: eval(expr, env)
23: doTryCatch(return(expr), name, parentenv, handler)
24: tryCatchOne(expr, names, parentenv, handlers[[1L]])
25: tryCatchList(expr, classes, parentenv, handlers)
26: tryCatch(expr, error = function(e) { call <- conditionCall(e) if (!is.null(call)) { if (identical(call[[1L]], quote(doTryCatch)))
call <- sys.call(-4L) dcall <- deparse(call)[1L] prefix <- paste("Error in", dcall, ": ") LONG <- 75L sm <- strsplit(conditionMes
sage(e), "\n")[[1L]] w <- 14L + nchar(dcall, type = "w") + nchar(sm[1L], type = "w") if (is.na(w)) w <- 14L + nchar(dcall, type = "b
") + nchar(sm[1L], type = "b") if (w > LONG) prefix <- paste0(prefix, "\n ") } else prefix <- "Error : " msg <- p
aste0(prefix, conditionMessage(e), "\n") .Internal(seterrmessage(msg[1L])) if (!silent && isTRUE(getOption("show.error.messages"))) { cat(msg, fi
le = outFile) .Internal(printDeferredWarnings()) } invisible(structure(msg, class = "try-error", condition = e))})
27: try(eval(expr, env), silent = TRUE)
28: sendMaster(try(eval(expr, env), silent = TRUE))
29: (function (expr, name, mc.set.seed = TRUE, silent = FALSE, mc.affinity = NULL, mc.interactive = FALSE, detached = FALSE) { f <- mcfork(detached)
env <- parent.frame() if (isTRUE(mc.set.seed)) mc.advance.stream() if (inherits(f, "masterProcess")) { on.exit(mcexit(1L, structure("fat
al error in wrapper code", class = "try-error"))) if (isTRUE(mc.set.seed)) mc.set.stream() mc.interactive <- as.logical(
mc.interactive) if (isTRUE(mc.interactive)) mcinteractive(TRUE) if (isTRUE(!mc.interactive)) mcinteractive(FALSE)
if (!is.null(mc.affinity)) mcaffinity(mc.affinity) if (isTRUE(silent)) closeStdout(TRUE) if (detached) { on.
exit(mcexit(1L)) eval(expr, env) mcexit(0L) } sendMaster(try(eval(expr, env), silent = TRUE)) mcexit(0L) } if
(!missing(name) && !is.null(name)) f$name <- as.character(name)[1L] class(f) <- c("parallelJob", class(f)) f})({ { ...future.startTim
e <- Sys.time() ...future.oldOptions <- options(future.startup.script = FALSE, future.globals.onMissing = "ignore", future.globals.maxSize
= NULL, future.globals.method = NULL, future.globals.onMissing = "ignore", future.globals.onReference = NULL, future.globals.resolve =
NULL, future.resolve.recursive = NULL, width = 158L) { { { { has_future <- r
equireNamespace("future", quietly = TRUE) version <- if (has_future) packageVersion("future")
else NULL if (!has_future || version < "1.8.0") { info <- c(r_version = gsub("R version ",
"", R.version$version.string), platform = sprintf("%s (%s-bit)", R.version$platform, 8 * .Machine$sizeof.pointer),
os = paste(Sys.info()[c("sysname", "release", "version")], collapse = " "), hostname = Sys.info()[["nodename"]])
info <- sprintf("%s: %s", names(info), info) info <- paste(info, collapse = "; ")
if (!has_future) { msg <- sprintf("Package 'future' is not installed on worker (%s)", info)
} else { msg <- sprintf("Package 'future' on worker (%s) must be of version >= 1.8.0: %s",
info, version) } stop(msg) } } ...future.mc.
cores.old <- getOption("mc.cores") options(mc.cores = 1L) } local({ for (pkg in c("purrr", "fu
ture")) { loadNamespace(pkg) library(pkg, character.only = TRUE) } }) }
future::plan(list(function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE,
local = TRUE, earlySignal = FALSE, label = NULL, ...) { if (substitute) expr <- substitute(expr)
local <- as.logical(local) future <- SequentialFuture(expr = expr, envir = envir, substitute = FALSE, lazy = l
azy, seed = seed, globals = globals, local = local, earlySignal = earlySignal, label = label, ...) if (!fut
ure$lazy) future <- run(future) invisible(future) }), .cleanup = FALSE, .init = FALSE) } } if (is.na(
TRUE)) { } else { if (TRUE) { ...future.stdout <- rawConnection(raw(0L), open = "w") } else { ...future.stdou
t <- file(switch(.Platform$OS.type, windows = "NUL", "/dev/null"), open = "w") } sink(...future.stdout, type = "output", split =
FALSE) on.exit(if (!is.null(...future.stdout)) { sink(type = "output", split = FALSE) close(...future.stdout) }, add = TR
UE) } ...future.frame <- sys.nframe() ...future.conditions <- list() ...future.result <- tryCatch({ withCallingHandlers({ ...fut
ure.value <- withVisible(local({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) {
if (is_bad_rlang_tilde(...future.f.env$`~`)) { ...future.f.env$`~` <- base::`~` } } if (.
progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con)) } ...future.ma
p(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj,
...) if (.progress) update_progress(temp_file_con) .out }) })) futu
re::FutureResult(value = ...future.value$value, visible = ...future.value$visible, started = ...future.startTime, version = "1
.8") }, condition = local({ inherits <- base::inherits invokeRestart <- base::invokeRestart length <- base::length
seq.int <- base::seq.int sys.calls <- base::sys.calls `[[` <- base::`[[` `+` <- base::`+` `<<-` <- base::`
<<-` sysCalls <- function(calls = sys.calls(), from = 1L) { calls[seq.int(from = from + 12L, to = length(calls) -
3L)] } function(cond) { if (inherits(cond, "error")) { ...future.conditions[[length(...future.conditions
) + 1L]] <<- list(condition = cond, calls = c(sysCalls(from = ...future.frame), cond$call), timestamp = base::Sys.time
(), signaled = 0L) signalCondition(cond) } else if (inherits(cond, "condition")) {
signal <- FALSE && inherits(cond, character(0)) ...future.conditions[[length(...future.conditions) + 1L]] <<- li
st(condition = cond, signaled = as.integer(signal)) if (!signal) { muffleCondition <- function (cond)
{ inherits <- base::inherits invokeRestart <- base::invokeRestart muffled <- FALSE
if (inherits(cond, "message")) { invokeRestart("muffleMessage") muffled <- TRUE }
else if (inherits(cond, "warning")) { invokeRestart("muffleWarning") muffled <- TRUE
} else if (inherits(cond, "condition")) { computeRestarts <- base::computeRestarts
grepl <- base::grepl is.null <- base::is.null restarts <- computeRestarts(cond) fo
r (restart in restarts) { name <- restart$name if (is.null(name)) next
if (!grepl("^muffle", name)) next invokeRestart(restart) muffled
<- TRUE break } } invisible(muffled) }
muffleCondition(cond) } } } })) }, error = function(ex) { structure(list(value = NULL, v
isible = NULL, conditions = ...future.conditions, version = "1.8"), class = "FutureResult") }, finally = { { {
options(mc.cores = ...future.mc.cores.old) } future::plan(list(function (expr, envir = parent.frame(), substitute = TRU
E, lazy = FALSE, seed = NULL, globals = TRUE, workers = availableCores(), gc = FALSE, earlySignal = FALSE, label = NULL, ...)
{ if (substitute) expr <- substitute(expr) fun <- if (supportsMulticore(warn = TRUE))
multicore else multisession fun(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed
, globals = globals, workers = workers, gc = gc, earlySignal = earlySignal, label = label, ...) }, function (ex
pr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, local = TRUE, earlySignal = FALSE, l
abel = NULL, ...) { if (substitute) expr <- substitute(expr) local <- as.logical(local)
future <- SequentialFuture(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed, globals = global
s, local = local, earlySignal = earlySignal, label = label, ...) if (!future$lazy) future <- run(future)
invisible(future) }), .cleanup = FALSE, .init = FALSE) } options(...future.oldOptions) }) if (is.na(TRUE)) { }
else { sink(type = "output", split = FALSE) if (TRUE) { ...future.result$stdout <- rawToChar(rawConnectionValue(...future.stdout))
} else { ...future.result["stdout"] <- list(NULL) } close(...future.stdout) ...future.stdout <- NULL } ..
.future.result$conditions <- ...future.conditions ...future.result})
30: do.call(parallel::mcparallel, args = future.args, envir = envir)
31: run.MulticoreFuture(future)
32: run(future)
33: fun(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed, globals = globals, workers = workers, gc = gc, earlySignal = earlySi
gnal, label = label, ...)
34: makeFuture(...)
35: .makeFuture(expr, substitute = FALSE, envir = envir, globals = globals, packages = packages, seed = seed, lazy = lazy, ...)
36: future({ ...future.f.env <- environment(...future.f) if (!is.null(...future.f.env$`~`)) { if (is_bad_rlang_tilde(...future.f.env$`~`)) {
...future.f.env$`~` <- base::`~` } } if (.progress) { temp_file_con <- file(temp_file, "a") on.exit(close(temp_file_con))
} ...future.map(seq_along(...future.x_ii), .f = function(jj) { ...future.x_jj <- ...future.x_ii[[jj]] .out <- ...future.f(...future.x_jj, ..
.) if (.progress) update_progress(temp_file_con) .out })}, envir = envir, lazy = .options$lazy, globals = globals_ii, package
s = packages)
37: future_map_template(purrr::map, "list", .x, .f, ..., .progress = .progress, .options = .options)
38: furrr::future_map(dtas_lst, run_xgb, .progress = TRUE)
An irrecoverable exception occurred. R is aborting now ...
Error: Failed to retrieve the result of MulticoreFuture (<none>) from the forked worker (on localhost; PID 17120). Post-mortem diagnostic: No process exists w
ith this PID, i.e. the forked localhost worker is no longer alive.
In addition: Warning message:
In mccollect(jobs = jobs, wait = TRUE) :
1 parallel job did not deliver a result
Mac session info
> sessionInfo()
R version 3.6.1 (2019-07-05)
Platform: x86_64-apple-darwin18.6.0 (64-bit)
Running under: macOS Mojave 10.14.6
Matrix products: default
BLAS/LAPACK: /usr/local/Cellar/openblas/0.3.7/lib/libopenblasp-r0.3.7.dylib
locale:
[1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] purrr_0.3.2.9000 mlr3learners_0.1.4 mlr3_0.1.4 nvimcom_0.9-83 setwidth_1.0-4 devtools_2.0.1 usethis_1.4.0
[8] magrittr_1.5.0.9000
loaded via a namespace (and not attached):
[1] progress_1.2.2.9000 tidyselect_0.2.5 remotes_2.0.2 listenv_0.7.0 lattice_0.20-38 vctrs_0.2.0.9002
[7] testthat_2.0.1 paradox_0.1.0 rlang_0.4.0.9002 pkgbuild_1.0.3 R.oo_1.22.0 pillar_1.4.2.9001
[13] glue_1.3.1.9000 withr_2.1.2.9000 R.utils_2.8.0 xgboost_0.90.0.2 sessioninfo_1.1.1 uuid_0.1-2
[19] R.methodsS3_1.7.1-9000 future_1.15.0 codetools_0.2-15 memoise_1.1.0 callr_3.3.1.9000 ps_1.3.0
[25] parallel_3.6.1 fansi_0.4.0 furrr_0.1.0 Rcpp_1.0.2 backports_1.1.3 checkmate_1.9.4
[31] desc_1.2.0 pkgload_1.0.2 fs_1.3.1 hms_0.4.2 digest_0.6.20 stringi_1.4.3
[37] processx_3.4.1.9000 dplyr_0.8.3.9000 rprojroot_1.3-2 grid_3.6.1 cli_1.9.9.9000 tools_3.6.1
[43] tibble_2.99.99.9005 mlr3misc_0.1.5 crayon_1.3.4 pkgconfig_2.0.2 zeallot_0.1.0 Matrix_1.2-17
[49] data.table_1.12.3 prettyunits_1.0.2 assertthat_0.2.1 lgr_0.3.3 R6_2.4.0 globals_0.12.4
[55] compiler_3.6.1
Hi. I haven't had time to reproduce/troubleshoot your original problem, but I'm suspect you're hitting issues because forked processes are in place (=multiprocess
-> multicore
) and some of the packages are not fork-proof. On top of that you might hit issues because there's multi-threading going on to that is then forked (e.g. xgboost). I don't have a reference of hand, but I think some of the R code folks (Tomas Kalibera?) have strongly adviced against using forking and multi-threading at the same time.
I would switch to using multisession
. That will most likely reveal that there's code in there that can not be parallelized.
Just to clarify, this is not specific to the future framework. So, if the fork+threading is the cause of your problem, then there is no solution to it. Instead, you need to turn to workarounds, such as launching xgboost on each worker independently of the others.
Oh... and I forgot to say, set
options(future.globals.onReference = "error")
that will help identify cases where there're non-exportable objects in the parallelized code, cf. https://cran.r-project.org/web/packages/future/vignettes/future-4-non-exportable-objects.html
Thank you so much for the feedback @HenrikBengtsson
This is a well-known factor for you then. Should I add xgboost
as mention in your docs?
I wouldn't call it a well-known factor for me. I've only heard anecdotal from various savvy R users, but I don't have a solid example yet (at least not one I recall).
I do not know for sure that xgboost
is to blame here, so I don't really want to add it to the docs without further proof. It would be great to get a minimal reproducible (as far as possible) example showing how it can happen. If one then can force xgboost
to run in single-threaded mode, and it does not fail then, then we have a likely candidate for this and for documenting it. To force single-threaded processing, it could be that some of the comments in https://github.com/HenrikBengtsson/future/issues/255 are helpful.
A lower hanging fruit might be:
options(future.globals.onReference = "error")
If that's the culprit and one can find a reasonable argument for that being the problem, then it could be added to the list of common examples. (Some onReference errors are false positives because the underlying package was designed to handle lost external pointers).
I can reproduce the "stall" on R 3.6.1 on Linux (Ubuntu 18.04):
library(magrittr)
library(mlr3)
library(mlr3learners)
run_xgb <- function(dta, target_column = "Sepal.Length") {
ttsk <- mlr3::TaskRegr$new("ttsk", backend = dta, target = target_column)
learner <-
mlr3::lrn(
"regr.xgboost",
objective = "reg:linear",
eval_metric = "rmse",
nrounds = 100,
verbose = 0
)
learner$train(ttsk)
learner$predict(ttsk)$response
}
dtas_lst <-
list(
dta1 = iris %>% dplyr::select(-Species) %>% utils::head(75),
dta2 = iris %>% dplyr::select(-Species) %>% utils::tail(75)
)
future::plan("multicore", workers = 2L)
## assert all globals can be exported
options(future.globals.onReference = "error") ## just in case
y <- furrr::future_map(dtas_lst, run_xgb, .progress = TRUE)
str(y)
Disabling multi-threading for xgboost using:
RhpcBLASctl::omp_set_num_threads(1L)
seems to fix the problem.
However, for me it also works to set something less that the default eight (8) max threads. Even:
RhpcBLASctl::omp_set_num_threads(7L)
works for me. That confuses me.
There's a related xgboost issue over https://github.com/topepo/caret/issues/1106#issuecomment-570308981.
I am having issues with nested
future
calls.Here I'm using
mlr3
andfurrr
, both usingfuture
package. When using them both together I'm having issues to have the parallelized calls to terminate.I've had cases where the parallel runs do work once during a session, but then not terminating on a rerun.
Sometimes when closing down the session (after manually terminating the parallel run), I get the below message
Upon googling this, I found that I could do the following check.
Does anyone have any solutions to this? Is this perhaps a bug? Related to
future
or perhapsparallel
or other package? I was having the same issue when I was running themlr
package.