HenrikBengtsson / future

R package: future: Unified Parallel and Distributed Processing in R for Everyone

More efficient chunking #512

Open arunsrinivasan opened 5 years ago

arunsrinivasan commented 5 years ago
library(future)        # plan(), cluster
library(future.apply)  # future_lapply()

nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
plan(cluster, workers=cl, persistent=TRUE)

# Elements 5:8 are slow (3 s each); all other elements return immediately
foo <- function(i) {
  if (i %in% 5:8) Sys.sleep(3L)
}

x <- 1:20
system.time(ans <- future_lapply(x, function(i) {
  msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
  cat(msg, sep="\n")
  ans <- foo(i)
}))
# [2019-04-18 13:17:08] [17620] ans= 1
# [2019-04-18 13:17:08] [17620] ans= 2
# [2019-04-18 13:17:08] [17620] ans= 3
# [2019-04-18 13:17:08] [17620] ans= 4
# [2019-04-18 13:17:08] [16016] ans= 5 # <~~ predefined chunk node = wait until previous one completes
# [2019-04-18 13:17:11] [16016] ans= 6 # <~~ predefined chunk node = wait until previous one completes
# [2019-04-18 13:17:14] [16016] ans= 7 # <~~ predefined chunk node = wait until previous one completes
# [2019-04-18 13:17:17] [16016] ans= 8 # <~~ predefined chunk node = wait until previous one completes
# [2019-04-18 13:17:08] [ 3992] ans= 9
# [2019-04-18 13:17:08] [ 3992] ans=10
# [2019-04-18 13:17:08] [ 3992] ans=11
# [2019-04-18 13:17:08] [ 3992] ans=12
# [2019-04-18 13:17:08] [ 3836] ans=13
# [2019-04-18 13:17:08] [ 3836] ans=14
# [2019-04-18 13:17:08] [ 3836] ans=15
# [2019-04-18 13:17:08] [ 3836] ans=16
# [2019-04-18 13:17:08] [ 8856] ans=17
# [2019-04-18 13:17:08] [ 8856] ans=18
# [2019-04-18 13:17:08] [ 8856] ans=19
# [2019-04-18 13:17:08] [ 8856] ans=20
#    user  system elapsed 
#    0.03    0.02   12.14  # <~~ 4x more time

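library(doSNOW)     # attaches foreach; doSNOW is the backend named later in this thread
registerDoSNOW(cl)  # use the same PSOCK cluster for %dopar%
system.time(ans <- foreach(i=x) %dopar% {
  msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
  foo_i <- foo(i)
})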
# [2019-04-18 13:17:20] [17620] ans= 1
# [2019-04-18 13:17:20] [16016] ans= 2
# [2019-04-18 13:17:20] [ 3992] ans= 3
# [2019-04-18 13:17:20] [ 3836] ans= 4
# [2019-04-18 13:17:20] [ 8856] ans= 5 # <~~ runs on next available free node
# [2019-04-18 13:17:20] [17620] ans= 6 # <~~
# [2019-04-18 13:17:20] [16016] ans= 7 # <~~
# [2019-04-18 13:17:20] [ 3992] ans= 8 # <~~
# [2019-04-18 13:17:20] [ 3836] ans= 9
# [2019-04-18 13:17:20] [ 3836] ans=10
# [2019-04-18 13:17:20] [ 3836] ans=11
# [2019-04-18 13:17:20] [ 3836] ans=12
# [2019-04-18 13:17:20] [ 3836] ans=13
# [2019-04-18 13:17:20] [ 3836] ans=14
# [2019-04-18 13:17:20] [ 3836] ans=15
# [2019-04-18 13:17:20] [ 3836] ans=16
# [2019-04-18 13:17:20] [ 3836] ans=17
# [2019-04-18 13:17:20] [ 3836] ans=18
# [2019-04-18 13:17:20] [ 3836] ans=19
# [2019-04-18 13:17:20] [ 3836] ans=20
#    user  system elapsed 
#    0.01    0.00    3.07  # <~~ 4x shorter runtime

The point is, even if things are random (I understand there's ordering="random"), there can be chunks that get stuck due to a big job, when other nodes are potentially free. I think it's much more efficient to look for free nodes and assign jobs on the fly than to determine chunk sizes / entries upfront.

What do you think?

HenrikBengtsson commented 5 years ago

Did you forget a registerDoFuture()? At its inner core, the design of cluster futures is to use the first available (=free) worker.

HenrikBengtsson commented 5 years ago

Scratch that. My brain is tired. I was just over at the doFuture repository and my brain stayed there. Anyway, the part:

At its inner core, the design of cluster futures is to use first available (=free) worker.

is true. So, if what you're claiming is true, then there's a bug. I will add it to the todo list to go into the details on this and try to reproduce.

arunsrinivasan commented 5 years ago

At this point, the chunk ii is run entirely on the same node, so the entries within the chunk never get to check whether other nodes are free. You can look at the pid values to see that all elements with the 3s sleep run on the same node. This is because they are assigned to the same chunk.

HenrikBengtsson commented 5 years ago

Ah... now I think I understand what you're getting at (disclaimer: I'm busy with other things so haven't had time to focus on this). The default chunking strategy is to split the elements to be processed into equal-sized chunks upfront, where the number of chunks equals the number of workers (=nbrOfWorkers()). This causes each worker to process one and only one chunk (=future).
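
A minimal base-R sketch of this default strategy, assuming the per-element costs from the example at the top of the thread (3 s for elements 5:8, negligible otherwise). It only models the chunk assignment and does not call the future API; the variable names are illustrative.

```r
# Per-element cost in seconds: elements 5:8 take 3 s, the rest are treated as free
cost <- ifelse(1:20 %in% 5:8, 3, 0)
workers <- 5L

# Split the indices into `workers` contiguous, equal-sized chunks
chunk_id <- rep(seq_len(workers), each = length(cost) / workers)
chunks <- split(seq_along(cost), chunk_id)

# Each worker processes its single chunk sequentially, so the elapsed time
# is the total cost of the most expensive chunk
chunk_time <- vapply(chunks, function(idx) sum(cost[idx]), numeric(1))
elapsed_static <- max(chunk_time)
print(chunk_time)
print(elapsed_static)
```

All four slow elements fall into the second chunk, so in this model one worker carries the entire 12 s while the other four sit idle.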

You can tell it to use smaller chunks such that each worker processes more than one chunk (=future). The easiest way is to set the argument future.scheduling to a number larger than the default 1.0. If you use +Inf, you'll get one element per chunk (=future). An alternative argument is future.chunk.size, which specifies the (average) number of elements per chunk directly. Do those do what you want?
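
For comparison, here is a sketch of the ideal outcome with one element per chunk, where every element goes to whichever worker becomes free first. This is a simplified model in base R that ignores all scheduling overhead; `simulate_greedy` is a hypothetical helper, not part of future or future.apply.

```r
# Simplified model: assign each element, in order, to the worker that is
# free the earliest, and report when the last worker finishes
simulate_greedy <- function(cost, workers) {
  free_at <- numeric(workers)  # time at which each worker becomes free
  for (ct in cost) {
    w <- which.min(free_at)    # first available worker
    free_at[w] <- free_at[w] + ct
  }
  max(free_at)
}

# Same costs as in the example: 3 s for elements 5:8, negligible otherwise
cost <- ifelse(1:20 %in% 5:8, 3, 0)
elapsed_greedy <- simulate_greedy(cost, workers = 5L)
print(elapsed_greedy)
```

In this model the four slow elements land on four different workers, so the elapsed time drops to the cost of a single slow element.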

arunsrinivasan commented 5 years ago

The problem is that as you increase the future.scheduling value, the overhead seems to increase by a huge amount as well. In the same example as above, using x <- 1:100 instead of x <- 1:20 and adding future.scheduling=Inf takes:

nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
plan(cluster, workers=cl, persistent=TRUE)

foo <- function(i) {
  if (i %in% 5:8) Sys.sleep(3L)
}

x <- 1:100

system.time(ans <- future_lapply(x, function(i) {
  msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
  cat(msg, sep="\n")
  ans <- foo(i)
}, future.scheduling=Inf))
# Takes 21 seconds

Changing to future.scheduling=3.0 takes 9.14 seconds, while foreach still takes 3.08s.

Let me put it this way. With x <- 1:100, how can I run future_lapply(...) in a way that it runs in 3.1s, assuming I do not know beforehand that i=5:8 will have a 3s sleep?

HenrikBengtsson commented 5 years ago

Got it. So, now we narrowed down the problem to a performance difference between future.apply::future_lapply() with plan(cluster, workers = cl) and foreach::foreach() with doSNOW::registerDoSNOW(cl) where both are using cl <- future::makeClusterPSOCK(nodes).

I can investigate further because there's no reason there should be a major difference given the same amount of chunking. One way to rule out some differences is to use the same foreach::foreach() call comparing doSNOW::registerDoSNOW(cl) to doFuture::registerDoFuture() with plan(cluster, workers = cl).

arunsrinivasan commented 5 years ago

(I noticed that I didn't have doFuture installed. Just installed CRAN version)

Aha.. good catch. The one with doFuture::registerDoFuture() takes 12s.


library(doSNOW)  # attaches foreach

nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
registerDoSNOW(cl)  # foreach backend: doSNOW on the PSOCK cluster

foo <- function(i) {
  if (i %in% 5:8) Sys.sleep(3L)
}

x <- 1:100
system.time(ans <- foreach(i=x) %dopar% {
  msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
  foo_i <- foo(i)
})
#    user  system elapsed 
#    0.05    0.00    3.05 

After restarting the session (I also checked within the same session, and the timings are identical):


library(doFuture)  # attaches foreach and future

nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)

foo <- function(i) {
  if (i %in% 5:8) Sys.sleep(3L)
}

x <- 1:100
registerDoFuture()  # foreach backend: futures, as resolved by plan()
plan(cluster, workers=cl, persistent=TRUE)
system.time(ans <- foreach(i=x) %dopar% {
  msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
  foo_i <- foo(i)
})
#    user  system elapsed 
#    0.04    0.00   12.15 

HenrikBengtsson commented 5 years ago

Thanks a bunch for doing this. Now I have enough to investigate - the difference is surprising but there might be (=probably is) an easy explanation.

PS. The underlying code for future_lapply and doFuture are similar (cut'n'paste-ish history) so this is in one sense what I hoped for.

arunsrinivasan commented 5 years ago

@HenrikBengtsson is there a timeline on when this would be done?

The problem is this: the chunks are pre-assigned to the nodes they will run on.

Imagine there are about 1000 files (corresponding to 1000 dates) to be updated. With foreach, each file is handed to the next available node, so the files are updated in the order provided. Because you assign the file chunks to nodes upfront, one node may finish its tasks earlier than another, and the files end up being updated in a non-sequential order. If something breaks, it is then even harder to tell which files have been updated, since they no longer form a continuous sequence. With foreach, if date x broke, we know to update from x until the most recent date.
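
To illustrate the recovery problem, here is a small base-R sketch with made-up data: 12 hypothetical dates split into three pre-assigned chunks (1:4, 5:8, 9:12), each of which got partway through before a failure. The dates and completion pattern are assumptions for illustration only.

```r
# Hypothetical data: 12 consecutive dates to be updated
dates <- seq(as.Date("2019-04-01"), by = "day", length.out = 12)

# Completion state at the time of the failure: each of the three
# pre-assigned chunks progressed independently, leaving gaps
done <- rep(FALSE, length(dates))
done[c(1:3, 5:6, 9)] <- TRUE

# Only the dates before the first gap form a continuous, trusted sequence
first_gap <- which(!done)[1]
resume_from <- dates[first_gap]

# Dates finished beyond the gap cannot be inferred from a single
# "last updated" date and have to be tracked or redone
out_of_order <- dates[done & seq_along(dates) > first_gap]
print(resume_from)
print(out_of_order)
```

With in-order dispatch, `out_of_order` would stay small (at most the elements in flight when the failure occurred), which is what makes resuming from a single date practical.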

arunsrinivasan commented 5 years ago

Just setting options(future.wait.interval=0L) reduces the runtime from 21s to 3.8s (approximately the same as that of foreach):


options(future.wait.interval=0L) ## <~~~ newly added line

nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
plan(cluster, workers=cl, persistent=TRUE)

foo <- function(i) {
  if (i %in% 5:8) Sys.sleep(3L)
}

x <- 1:100

system.time(ans <- future_lapply(x, function(i) {
  msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
  cat(msg, sep="\n")
  ans <- foo(i)
}, future.scheduling=Inf))
# 3.8s

I think this more or less solves the issue I reported. Will write back if I encounter issues.

HenrikBengtsson commented 5 years ago

Yes, it clearly seems that the polling wait time (option future.wait.interval) is the culprit. It's quite obvious - sorry for not realizing/mentioning this earlier.

The reason for future.wait.interval not defaulting to zero is that we don't want to induce too frequent polling/querying on other backends, e.g. job schedulers, but also because it induces some extra CPU load. Having said this, this should not be too big of a deal for parallelization on the local machine.

I'll add it to the todo list to be able to have different defaults on future.wait.interval for different types of future backends. Ideally, it should also be possible to control this via the plan() setup, e.g.

plan(multisession, workers = 4L, wait.interval = 0.01)

There are other types of settings that we also want to be backend specific, so this one falls nicely into that plan (no pun intended).

HenrikBengtsson commented 2 years ago

FWIW, the default value for option future.wait.interval was reduced by a factor of 20 - from 0.2 to 0.01 seconds - in future 1.25.0.
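
As a rough plausibility check of the timings reported in this thread, the sketch below models the polling overhead under the assumption that every future beyond the first `workers` ones costs about one wait interval before a free worker is detected. This is a simplification for illustration, not the package's actual scheduling code, and `polling_overhead` is a hypothetical helper.

```r
# Assumed model: the first `workers` futures launch immediately; each later
# future waits roughly one polling interval for a worker to be seen as free
polling_overhead <- function(n_futures, workers, wait_interval) {
  waits <- max(n_futures - workers, 0)
  waits * wait_interval
}

# 100 single-element futures on 5 workers, as in future.scheduling=Inf above
old_default <- polling_overhead(100, 5, wait_interval = 0.2)   # default before future 1.25.0
new_default <- polling_overhead(100, 5, wait_interval = 0.01)  # default since future 1.25.0
no_wait     <- polling_overhead(100, 5, wait_interval = 0)     # options(future.wait.interval=0L)
print(c(old_default, new_default, no_wait))
```

Under that assumption, the old default accounts for about 19 s of waiting, which is of the same order as the 21 s reported above, while the new default brings the modeled overhead below 1 s.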