Closed schloerke closed 3 years ago
It would be helpful to offload some future worker logic if future had an "all workers are busy" method. See https://github.com/HenrikBengtsson/future/issues/264
There exists a time between scheduling a future and evaluating the future.
What if the globals contain an environment and the environment is changed while the future object waits to execute?
... What happens in the third promise?
library(future)
library(promises)
future::plan(future::multisession(workers = 2))
source("future_promise.r") # local exploration file
env <- new.env()
env$a <- 1
fps <- lapply(1:3, function(i) {
  future_promise({
    Sys.sleep(10)
    env$a
  })
})
env$a <- 2
Map(
  fps,
  seq_along(fps),
  f = function(fp, i) {
    print(i)
    fps[[i]] %...>%
      {
        print(list(i = i, val = .))
      }
  }
)
Currently, the third promise returns 2, which is incorrect behavior.
[1] 1
[1] 2
[1] 3
[[1]]
<Promise [pending]>
[[2]]
<Promise [pending]>
[[3]]
<Promise [pending]>
>
$i
[1] 1
$val
[1] 1
$i
[1] 2
$val
[1] 1
$i
[1] 3
$val
[1] 2
cc @dipterix
I was under the impression that future::getGlobalsAndPackages(expr) returned unique information. It does not. It only returns environment pointers. This is usually safe because the main session is blocked and the globals are serialized immediately when they are sent to the future process. However, with promises, the environment values can be altered before the future is executed.
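A minimal base-R sketch of why a captured environment pointer is not a snapshot. It does not call future at all; the `captured` list is a hypothetical stand-in for a globals list that holds `env` by reference and `val` by value.

```r
# Environments have reference semantics; plain values are copied.
env <- new.env()
env$a <- 1

# Hypothetical stand-in for a captured globals list.
captured <- list(env = env, val = env$a)

# Mutate the environment after capture, before any deferred work runs.
env$a <- 2

print(captured$env$a)  # sees the later write
#> [1] 2
print(captured$val)    # keeps the value from capture time
#> [1] 1
```

The same thing happens to any deferred expression that reaches into `env`: it reads whatever the environment holds when the expression finally runs.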
What's returned from future::getGlobalsAndPackages():
val <- 1
info <- future::getGlobalsAndPackages(
substitute(env$a + val),
envir = parent.frame()
)
> info
#> $expr
#> env$a + val
#>
#> $globals
#> $env
#> <environment: 0x7fb2e5e5eda8>
#>
#> $val
#> [1] 1
#>
#> attr(,"where")
#> attr(,"where")$env
#> <environment: R_GlobalEnv>
#>
#> attr(,"where")$val
#> <environment: R_GlobalEnv>
#>
#> attr(,"class")
#> [1] "FutureGlobals" "Globals" "list"
#> attr(,"resolved")
#> [1] FALSE
#> attr(,"total_size")
#> [1] 112
#>
#> $packages
#> character(0)
We could try to capture/duplicate the full environment information (rlang::env_clone())... but knowing where to stop would be the challenge (e.g., circular environment references).
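To illustrate the "where to stop" problem, here is a hand-rolled one-level copy in base R. It is not rlang::env_clone() itself; `shallow_clone()` is a hypothetical helper for this sketch. Top-level bindings are isolated, but a nested environment is still shared with the original.

```r
# Hypothetical helper: copy the bindings of `e` into a fresh environment.
shallow_clone <- function(e) {
  list2env(as.list(e, all.names = TRUE), parent = parent.env(e))
}

outer <- new.env()
outer$a <- 1
outer$inner <- new.env()
outer$inner$b <- 10

snap <- shallow_clone(outer)

# Change both levels after the copy was taken.
outer$a <- 2
outer$inner$b <- 20

print(snap$a)        # top-level binding was copied
#> [1] 1
print(snap$inner$b)  # nested environment is still the same object
#> [1] 20
```

Going deeper means recursing into every binding that is itself an environment, and tracking already-visited environments so that circular references terminate.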
Calling future::resolve(info$globals) does not provide serialized information:
env
<environment: 0x7fb2e5e5eda8>
resolve(info$globals)
$env
<environment: 0x7fb2e5e5eda8>
$val
[1] 1
attr(,"where")
attr(,"where")$env
<environment: R_GlobalEnv>
attr(,"where")$val
<environment: R_GlobalEnv>
attr(,"class")
[1] "FutureGlobals" "Globals" "list"
attr(,"resolved")
[1] TRUE
attr(,"total_size")
[1] NA
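For comparison, a base-R sketch of what an eager snapshot could look like: a serialize()/unserialize() round trip at capture time. This is an assumption about one possible approach, not something future_promise() is stated to do, and it copies everything reachable from the environment, which may be expensive.

```r
env <- new.env()
env$a <- 1
env$self <- env  # circular reference, to show the round trip terminates

# Take a deep copy now instead of keeping a pointer.
snap <- unserialize(serialize(env, connection = NULL))

# Later changes to the original no longer affect the snapshot.
env$a <- 2

print(snap$a)
#> [1] 1
print(identical(snap$self, snap))  # the cycle is preserved inside the copy
#> [1] TRUE
print(identical(snap, env))        # but the copy is a different environment
#> [1] FALSE
```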
Might have to go with the recommendation that is usually used to isolate information when using just promises/later... use force() and save it to a scoped variable. Obviously not optimal, but it works and only the necessary work is done (by the expr author).
env <- new.env()
env$a <- 1
fps <- lapply(1:3, function(i) {
  val <- force(env$a)
  future_promise({
    Sys.sleep(10)
    val
  })
})
env$a <- 2
Map(
  fps,
  seq_along(fps),
  f = function(fp, i) {
    print(i)
    fps[[i]] %...>%
      {
        print(list(i = i, val = .))
      }
  }
)
[1] 1
[1] 2
[1] 3
[[1]]
<Promise [pending]>
[[2]]
<Promise [pending]>
[[3]]
<Promise [pending]>
$i
[1] 1
$val
[1] 1
$i
[1] 2
$val
[1] 1
$i
[1] 3
$val
[1] 1
I guess what you can do is implement a new future class that wraps regular future expressions. The new future class always leaves one session open so future:::requestCore or future:::requestNode won't block.
See the blocking code:
You only need to keep used < total. What you can do is check in your run.xxx
...
futures <- FutureRegistry(reg, action = "list", earlySignal = FALSE)
if (length(futures[!resolved(futures)]) >= workers) {
  # transfer globals first
  # queue the future
}
...
Before adding the future to the queue, we immediately transfer the globals to the idle node so the data in the future sessions are "as-is"; only the evaluation is deferred.
Uh, It's complicated...
PS: I'm doing something different here, but it shares something in common. Hope it could help.
So host the queue in a single, non-main R process? This would solve the changeable-values problem but would probably dramatically increase "between-process communication".
Uh, It's complicated...
😞 yes. To gain one feature, it will cost another.
After sleeping on it, I think the proposed solution is no worse than what happens with promises in general. So it is still something to pursue.
Example showing existing "unexpected" behavior.
library(promises)
env <- new.env()
env$a <- 1
promise_resolve(TRUE) %...>%
{ Sys.sleep(1); env$a } %...>%
{ print(.) }
env$a <- 2
print("done")
#> [1] "done"
>
#> [1] 2
When all future workers are busy, the main process is blocked until the worker queue is underutilized. In the case of plumber and shiny, this will block the main process from functioning.
In plumber's case (and if many future-based routes are hit excessively):
- future routes are blocked from being started
- future routes can not be started
- future routes that have been started can not respond
... until the blocking queue is no longer blocking.
Proposal:
- future objects could be stored in a promise
- promises::promise() object is always returned
Approach:
- future_promise()
- globals and packages information from the expr, using future::getGlobalsAndPackages() [aka future_promise() info]
- future_promise() info to a FIFO queue.
- future_promise <- future_promise_gen()
- future_promise() info using future::future()
- future::future() value completes, resolve() the returned promise object using this value.
Assume routes are submitted matching the number in the route. /slow routes take ~10s to compute. /fast routes take ~0s to compute.
Using future::future(expr) with workers = 2. Total wait time for /fast/7: ~20s. Total wait time: ~100s
Using a proposed promises::future_promise(expr) with future workers = 2. Total wait time for /fast/7: ~0s. Total wait time: ~60s (expected / minimal wait time)