futureverse / future.apply

R package: future.apply - Apply Function to Elements in Parallel using Futures
https://future.apply.futureverse.org

Non-blocking, local evaluation of future_lapply()? #44

Open danschrage opened 5 years ago

danschrage commented 5 years ago

Is future_lapply() intended to be non-blocking? I thought it was, but it's not working for me.

Here is a minimal reproducible example (with a simple function that serves no purpose except to take ~10 seconds to evaluate on my machine so I could test blocking and multicore behavior). I'm running Linux, so my understanding is that multiprocess here implies multicore. (Output shown as comments.)

# future_lapply() blocks, even in multiprocess. You can see that resolved()
# does not get evaluated until future_lapply() has finished.
# But it successfully distributes this across 2 cores.
library(future.apply)  # also attaches the future package
plan(multiprocess)
date()
### [1] "Fri May 24 14:29:51 2019"
a <- future_lapply(rep(50000000, 2), function(i) rnorm(i)*rnorm(i))
resolved(futureOf(a))
### Error: Future (‘a’) not found in environment ‘R_GlobalEnv’: ‘a’
date()
### [1] "Fri May 24 14:30:03 2019"
head(a[[1]])
### [1] -1.2233054  0.1918043 -0.4650852  0.5335259 -0.2493615 -0.8267408
date()
### [1] "Fri May 24 14:30:03 2019"

Note that I get an error when calling resolved(futureOf(a)): by the time it runs, a has already been resolved, and no future object exists to look up, because future_lapply() returns a plain value rather than a future. The calls to date() are there to show that future_lapply() blocked for 12 seconds while it evaluated.
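
To illustrate (a minimal sketch, separate from the session above, assuming a multisession plan): future_lapply() returns an ordinary, already-resolved list, not a Future object, so there is nothing for futureOf() to find.

library(future.apply)  # also attaches the future package
plan(multisession)
b <- future_lapply(1:2, function(i) i^2)
class(b)               # "list" -- a plain value, already resolved
inherits(b, "Future")  # FALSE, so futureOf(b) has no future to look up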

Based on your response in #1, I tried assigning the future_lapply() call to an implicit future and using nested multiprocess evaluation (though that answer was intended for someone running SGE). This also blocks, and now future_lapply() is evaluated sequentially rather than across 2 cores. I watched the process allocation, and you can see that it now blocks for twice as long: 24 seconds.

# Try nesting it in an implicit future call. Still blocks. But now this gets
# evaluated sequentially rather than distributed across 2 cores.
plan(list(multiprocess, multiprocess))
date()
### "Fri May 24 14:37:15 2019"
a %<-% future_lapply(rep(50000000, 2), function(i) rnorm(i)*rnorm(i))
resolved(futureOf(a))
### [1] FALSE
date()
### [1] "Fri May 24 14:37:15 2019"
head(a[[1]])
### [1] -0.9747142 -0.1586670 -0.1039924  4.5885303 -0.4779900  0.3339059
date()
### [1] "Fri May 24 14:37:39 2019"

I ran into this issue because I'm trying to switch from mclapply() to future_lapply() (for the great parallel RNG!), and I do get non-blocking behavior when invoking mclapply() inside an implicit future (resolved() and date() both execute immediately after the mclapply() call, without blocking):

# This works as expected: Setting mc.cores explicitly does distribute across
# multiple cores, and it's non-blocking.
library(parallel)
plan(multiprocess)
date()
### [1] "Fri May 24 14:51:31 2019"
a %<-% mclapply(rep(50000000, 2), function(i) rnorm(i)*rnorm(i), mc.cores=2)
resolved(futureOf(a))
### [1] FALSE
date()
### [1] "Fri May 24 14:51:31 2019"
head(a[[1]])
### [1]  0.968440961 -0.015869658  0.321415096 -0.609809739  0.005155251
date()
### [1] "Fri May 24 14:51:44 2019"

Incidentally, if I replace the explicit mc.cores=2 with mc.cores=future::availableCores(), I still get non-blocking behavior, but now mclapply() executes sequentially instead of being distributed across cores. (If I run future::availableCores() in my session, I get 16.) I'm not sure whether this is a bug, and I haven't explored it thoroughly, but it's not what I expected.
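
One plausible explanation (a hedged sketch of my own, not verified against this exact session): when the implicit future is evaluated on a worker, the future framework sets mc.cores = 1 there to guard against nested parallelism, so future::availableCores() evaluates to 1 inside the future even though it returns 16 in the main session.

library(future)
plan(multisession)
availableCores()                      # e.g. 16 in the main session
f <- future(future::availableCores())
value(f)                              # typically 1 on the worker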

Thanks so much for your help and for all your work to bring R into the future!

HenrikBengtsson commented 5 years ago

Is future_lapply() intended to be non-blocking?

By design it mimics lapply() as far as ever possible, including blocking.

See also the Roadmap section in the package vignette or the README here on GitHub.

HenrikBengtsson commented 5 years ago

mclapply() is also blocking by design

danschrage commented 5 years ago

By design it mimics lapply() as far as ever possible, including blocking. See also the Roadmap section in the package vignette or the README here on GitHub.

Thanks for the response! I did read those things, and I think I'm misunderstanding, so perhaps it's worth clarifying in the documentation for others. I read the README and saw that future.apply is meant to "replace any of these [lapply, etc.] in the core with its futurized equivalent." Combining that with the opening of the README in the future package, which says, "The purpose of the future package is to provide a very simple and uniform way of evaluating R expressions asynchronously using various resources available to the user," I thought the futurized equivalent of lapply() would be asynchronous, and in this context I took that to mean non-blocking.

mclapply() is also blocking by design

I do expect mclapply() to block, and that's why I invoke it as an implicit future, which makes it non-blocking. This meets my basic use case, but I'd like to take advantage of the reproducibility of the parallel RNG offered by future_lapply(). Is there a way to do this with the existing future_lapply(), or is this something that falls under category 3 in the roadmap?
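
For reference, this is the feature I mean; a short sketch based on the documented future.seed argument. With future.seed = TRUE, future_lapply() uses parallel-safe L'Ecuyer-CMRG streams, so results are reproducible regardless of how elements are scheduled across workers.

library(future.apply)  # also attaches the future package
plan(multisession, workers = 2)
set.seed(42)
x1 <- future_lapply(1:4, function(i) rnorm(1), future.seed = TRUE)
set.seed(42)
x2 <- future_lapply(1:4, function(i) rnorm(1), future.seed = TRUE)
identical(x1, x2)  # TRUE -- same RNG streams on both runs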

Again, thanks for your help and all your work on this! I've been using futures primarily to get asynchronous evaluation, but maybe I've been missing the point (the parallel package works well enough for my parallel-processing needs, so maybe I'm not the target audience).

danschrage commented 5 years ago

I guess this is ultimately a feature request, since it's not the intended behavior of future_lapply(). I'll leave this issue open, but if you don't think it makes sense to add to the long-term roadmap, you're welcome to close it as a sort of "won't fix." Thanks again!

jfbourdon commented 3 years ago

Before I used future_lapply(), I too was sure that, being a future function, it would be non-blocking. At first I was disappointed, but using nested futures works when tweaking the planned strategies a bit:

library(future.apply)
library(future)

max_cores <- availableCores()

# Declare a two-level topology: the outer level gets 2 workers (the
# wrapper future below occupies one of them), and the inner level is
# explicitly allowed to use all available cores via tweak().
plan(list(tweak(multisession, workers = 2),
          tweak(multisession, workers = max_cores)))

# Wrap the blocking future_lapply() in an explicit future so the main
# session stays free while the map runs in the background.
a <- future({
  future_lapply(1:max_cores, function(i) Sys.getpid())
})

# Poll without blocking the main session.
while (!resolved(a)) {
  message("Waiting...")
  Sys.sleep(1)
}

plan(sequential)
message(length(unique(unlist(value(a)))),
        " unique PIDs used for future_lapply() for ",
        max_cores,
        " available cores")

HenrikBengtsson commented 3 years ago

Correct, there's a built-in protection against nested parallelism to avoid wreaking havoc on machines. This is explained in this vignette: https://future.futureverse.org/articles/future-3-topologies.html
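
For contrast with the tweak()-ed plan above, here is a minimal sketch of that protection (my own example, assuming a fresh session): with a plain nested plan, the inner level sees only one available core, so the inner future_lapply() ends up running in a single process.

library(future)
library(future.apply)
plan(list(multisession, multisession))  # inner level is implicitly capped
f <- future(future_lapply(1:4, function(i) Sys.getpid()))
length(unique(unlist(value(f))))  # 1 -- the built-in protection at work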

jfbourdon commented 3 years ago

Thanks, the reason is well explained in the vignette. I guess the use case described in this issue is an exception, because the first two workers spawned use almost no resources. I ended up here looking for a way to do the same thing as parallel::parLapply() but without blocking. I was about to fall back to running a dedicated Rscript via system2(); even though that would have worked, it would have been more hassle.

HenrikBengtsson commented 3 years ago

Yeah, non-blocking map-reduce functions will probably be implemented in a separate package, if and when that happens. This package tries to mimic the behavior of the base R apply functions as far as ever possible; that way the API is much clearer and easier to understand and predict.