HenrikBengtsson / future

:rocket: R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

Use withTimeout when resolving remote future may corrupt its state #266

Open renkun-ken opened 5 years ago

renkun-ken commented 5 years ago

I need to execute some machine learning algorithms on a remote GPU server. It is likely that some heavy training is already running there and that all CPU and GPU resources are occupied. In that case, both starting a remote process and communicating with it are likely to hang. I therefore need an external timeout (rather than a remote call to withTimeout as suggested in #169) to limit how long this takes, so that other measures can be taken if a timeout occurs.

library(future)

v <- R.utils::withTimeout({
  p <- remote({
    Sys.sleep(10)
    1
  }, workers = "<remote-ip>", user = "<remote-user>", persistent = FALSE, earlySignal = TRUE)

  value(p)
}, timeout = 1)

However, the behavior can be quite random as I run it repeatedly. In some cases, it works as expected, but sometimes it directly returns 1 without a sleep, and in other cases, it ends up in the following error:

Error: Unexpected result (of class ‘NULL’ != ‘FutureResult’) retrieved for ClusterFuture future (label = ‘<none>’, expression = ‘{; Sys.sleep(10); 1; }’): 

which implies that the internal state of the future seems corrupted somehow.

HenrikBengtsson commented 5 years ago

My first reaction reading this is that, instead of playing wild wild west on that machine, you would really benefit from running an HPC scheduler (e.g. Slurm or SGE) even if there's only a single user on it. Schedulers are designed to balance workloads and avoid overloading machines. It takes some effort to install a scheduler, but it's certainly not impossible and definitely worth the investment.

To your question:

However, the behavior can be quite random as I run it repeatedly. In some cases, it works as expected, but sometimes it directly returns 1 without a sleep, and in other cases, it ends up in the following error:

Error: Unexpected result (of class ‘NULL’ != ‘FutureResult’) retrieved for ClusterFuture future (label = ‘<none>’, expression = ‘{; Sys.sleep(10); 1; }’): 

It looks like you're running into issues with R's connections becoming corrupt, cf. Issue #261 and https://github.com/HenrikBengtsson/Wishlist-for-R/issues/81. If you returned a unique value for each future instead of the constant 1, I think you'd see that you actually get a "1" from a previous future rather than from the one you expect.
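One way to try this is to have every future return its own tag alongside its value and compare the tag on retrieval. The following is only a minimal sketch: check_tag() and run_tagged() are hypothetical helper names, not part of the future package, and whether a stale result would actually surface this way depends on the version of future in use.

```r
# Hypothetical helper: verify that a result carries the tag we expect.
check_tag <- function(res, expected_id) {
  if (!identical(res$id, expected_id)) {
    stop(sprintf("Stale result: expected tag '%s' but got '%s'",
                 expected_id, toString(res$id)))
  }
  res$value
}

# Hypothetical helper: evaluate fun() as a future that returns its tag
# together with its value, then check the tag when collecting the result.
run_tagged <- function(fun, id) {
  f <- future::future(list(id = id, value = fun()))
  check_tag(future::value(f), id)
}
```

For example, run_tagged(function() { Sys.sleep(10); 1 }, id = "job-1") could replace the remote() and value() calls inside withTimeout(), using a different id for each attempt.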

Retry with the develop version:

remotes::install_github("HenrikBengtsson/future@develop")

It should detect corrupt R connections and give a much more informative error message about what is going on.

Let me know if this is the case.

HenrikBengtsson commented 5 years ago

After troubleshooting a somewhat related StackOverflow question, I think I now have a better answer to what's going on.

What I think is key to your problem is that:

launching of a future on a cluster/remote worker is not atomic

When a cluster/remote future is launched, there are several steps of back-and-forth communication with the worker. If you interrupt R, either manually (Ctrl-C) or via R.utils::withTimeout(), while this communication is going on, then the communication is likely to get out of sync. I've updated the develop version of future to give slightly more informative error messages when this happens:

Error: Unexpected result (of class ‘character’ != ‘FutureResult’) retrieved for 
  ClusterFuture future (label = ‘future-plan-test’, expression = ‘NA’): future-grmall.
  This suggests that the communication with ClusterFuture worker (‘SOCKnode’ #1) is
  out of sync.

I recommend that you install the develop version of the future package to get that more informative error message to rule out other things:

remotes::install_github("HenrikBengtsson/future@develop")

Note, you would only get this kind of error if you call your code more than once, e.g.

library(future)

R.utils::withTimeout({
  f1 <- remote({
   Sys.sleep(10); 1
  }, workers = "<remote-ip>", user = "<remote-user>", persistent = FALSE, earlySignal = TRUE)
  v1 <- value(f1)
}, timeout = 1.0)

R.utils::withTimeout({
  f2 <- remote({
   Sys.sleep(10); 2 
  }, workers = "<remote-ip>", user = "<remote-user>", persistent = FALSE, earlySignal = TRUE)
  v2 <- value(f2)
}, timeout = 1.0)

As I mentioned in my previous comment, I'm pretty sure what happens is this: if you interrupt the first future f1 while it's communicating with the remote worker, v1 will never be created. Then, when you move on to create f2 on the same remote worker, the communication between f2 and the worker gets tangled up with messages still queued between f1 and that same worker. Launching f2 will throw an error (like the one above) when it receives an unexpected response from the worker.
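To illustrate how a queued message can end up being read by the wrong future, here is a toy model in base R. It is not how future or its socket connections are implemented; the "connection" is just a first-in, first-out list of unread replies, and new_connection(), worker_handle() and client_read() are made-up names for this sketch.

```r
# Toy model: the "connection" holds replies that the worker has written
# but the client has not yet read, in first-in, first-out order.
new_connection <- function() {
  con <- new.env()
  con$replies <- list()
  con
}

# The worker evaluates a request and queues its reply on the connection.
worker_handle <- function(con, request) {
  con$replies[[length(con$replies) + 1L]] <- request()
  invisible(con)
}

# The client reads (and removes) the oldest unread reply.
client_read <- function(con) {
  reply <- con$replies[[1L]]
  con$replies <- con$replies[-1L]
  reply
}

con <- new_connection()

# Future 1: the request is processed, but the client is interrupted by
# the timeout before it reads the reply, so the reply stays queued.
worker_handle(con, function() 1)

# Future 2: launched on the same connection; the client now reads the
# oldest queued reply, which belongs to future 1.
worker_handle(con, function() 2)
v2 <- client_read(con)
print(v2)  # [1] 1
```

In this model the second read returns 1 rather than 2, and the reply from the second request is left behind to confuse whatever reads next.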

renkun-ken commented 5 years ago

Thanks for the informative update.

First try:

[2019-01-09 10:57:51] TimeoutException: Failed to retrieve the value of ClusterFuture (<none>) from cluster SOCKnode #1 (PID 10330 on ‘***.***.***.***’). The reason reported was ‘reached elapsed time limit’ [cpu=1s, elapsed=1s]

  at #08. Exception(...)
          - Exception() is in environment 'R.oo'

  at #07. extend(Exception(...), "TimeoutException", cpu = cpu, elapsed = elapsed)
          - extend() is in environment 'R.oo'

  at #06. TimeoutException(msg, cpu = cpu, elapsed = elapsed)
          - TimeoutException() is in environment 'R.utils'

  at #05. value[[3L]](cond)
          - value[[3L]]() is local of the calling function

  at #04. tryCatchOne(expr, names, parentenv, handlers[[1L]])
          - tryCatchOne() is local of the calling function

  at #03. tryCatchList(expr, classes, parentenv, handlers)
          - tryCatchList() is local of the calling function

  at #02. tryCatch({
              eval(expr, envir = envir)
          }, error = function(ex) {
              msg <- ex$message
              pattern <- gettext("reached elapsed time limit", "reached CPU time limit", 
                  domain = "R")
              pattern <- paste(pattern, collapse = "|")
              if (regexpr(pattern, msg) != -1L) {
                  ex <- TimeoutException(msg, cpu = cpu, elapsed = elapsed)
                  if (onTimeout == "error") {
                      throw(ex)
                  }
                  else if (onTimeout == "warning") {
                      warning(getMessage(ex))
                  }
                  else if (onTimeout == "silent") {
                  }
              }
              else {
                  throw(ex)
              }
          })
          - tryCatch() is in environment 'base'

  at #01. R.utils::withTimeout({
              p <- remote({
                  Sys.sleep(10)
                  1
              }, workers = "***.***.***.***", user = "ken", persistent = FALSE, earlySignal = TRUE)
              value(p)
          }, timeout = 1)
          - R.utils::withTimeout() is in environment 'R.utils'

Error: Failed to retrieve the value of ClusterFuture (<none>) from cluster SOCKnode #1 (PID 10330 on ‘***.***.***.***’). The reason reported was ‘reached elapsed time limit’ [cpu=1s, elapsed=1s]

Then try again:

List of 2
 $ node_idx: int 1
 $ node    :List of 5
  ..$ con         : 'sockconn' int 3
  .. ..- attr(*, "conn_id")=<externalptr> 
  ..$ host        : chr "***.***.***.***"
  .. ..- attr(*, "localhost")= logi FALSE
  ..$ rank        : int 1
  ..$ logfile     : NULL
  ..$ session_info:List of 3
  .. ..$ r      :List of 15
  .. .. ..$ platform      : chr "x86_64-pc-linux-gnu"
  .. .. ..$ arch          : chr "x86_64"
  .. .. ..$ os            : chr "linux-gnu"
  .. .. ..$ system        : chr "x86_64, linux-gnu"
  .. .. ..$ status        : chr ""
  .. .. ..$ major         : chr "3"
  .. .. ..$ minor         : chr "5.2"
  .. .. ..$ year          : chr "2018"
  .. .. ..$ month         : chr "12"
  .. .. ..$ day           : chr "20"
  .. .. ..$ svn rev       : chr "75870"
  .. .. ..$ language      : chr "R"
  .. .. ..$ version.string: chr "R version 3.5.2 (2018-12-20)"
  .. .. ..$ nickname      : chr "Eggshell Igloo"
  .. .. ..$ os.type       : chr "unix"
  .. ..$ system :List of 8
  .. .. ..$ sysname       : chr "Linux"
  .. .. ..$ release       : chr "4.15.0-43-generic"
  .. .. ..$ version       : chr "#46~16.04.1-Ubuntu SMP Fri Dec 7 13:31:08 UTC 2018"
  .. .. ..$ nodename      : chr "***.***.***.***"
  .. .. ..$ machine       : chr "x86_64"
  .. .. ..$ login         : chr "ken"
  .. .. ..$ user          : chr "ken"
  .. .. ..$ effective_user: chr "ken"
  .. ..$ process:List of 1
  .. .. ..$ pid: int 10330
  ..- attr(*, "class")= chr "SOCKnode"
Error: Unexpected result (of class ‘character’ != ‘FutureResult’) retrieved for ClusterFuture future (label = ‘<none>’, expression = ‘{; Sys.sleep(10); 1; }’): future-grmall. This suggests that the communication with ClusterFuture worker (‘SOCKnode’ #1) is out of sync.

HenrikBengtsson commented 5 years ago

Yup, that

Error: Unexpected result (of class ‘character’ != ‘FutureResult’) retrieved for
ClusterFuture future (label = ‘<none>’, expression = ‘{; Sys.sleep(10); 1; }’): 
future-grmall. This suggests that the communication with ClusterFuture worker
(‘SOCKnode’ #1) is out of sync.

confirms that the communication with the worker is out of sync and will stay broken until the worker is restarted, i.e. until a new worker is created.

I don't know of an obvious solution/workaround, but you need to figure out how to kill that remote worker first. If you set up the worker "manually" first by:

cl <- future::makeClusterPSOCK(remote_hostname, user = remote_user)

and then use remote(..., workers = cl) you will have a better handle on the remote cluster worker. More precisely, you actually have info on the process ID of the remote R workers, e.g.

> remote_pid <- cl[[1]]$session_info$process$pid
> remote_pid
[1] 30575

(Note that this is a non-official feature for now, although I have no plans to change it.)

Then you can do something like:

system2("ssh", args = c("-l", remote_user, remote_hostname, "kill", "-9", remote_pid))

to kill the remote process so you can create a new one on the same host again (as above).

While thinking about it, you can actually kill the existing process at the same time as you launch the new worker by doing:

> kill_cmd <- sprintf("tools::pskill(%d)", cl[[1]]$session_info$process$pid)
> kill_cmd  ## [1] "tools::pskill(31452)"
> cl <- future::makeClusterPSOCK(remote_hostname, rscript_args = c("-e", shQuote(kill_cmd)))

FYI, a method for terminating futures (local or remote) is a frequently requested feature, e.g. Issue #93. The above approaches are a bit of a hack.
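Putting the pieces together, the kill-and-recreate workaround could be wrapped roughly as follows. This is an untested sketch under several assumptions: restart_worker() and value_with_timeout() are hypothetical helpers, the worker is reachable over SSH as above, it relies on the non-official session_info field, and it sets the future plan as a side effect.

```r
# Hypothetical helper: kill the R process behind the first worker of `cl`
# over SSH, then launch a fresh worker on the same host.
restart_worker <- function(cl, hostname, user) {
  pid <- cl[[1]]$session_info$process$pid  # non-official field
  system2("ssh", args = c("-l", user, hostname, "kill", "-9", pid))
  future::makeClusterPSOCK(hostname, user = user)
}

# Hypothetical helper: evaluate fun() on the worker under an external
# timeout. Returns list(value, cl); on timeout, `value` is NULL and `cl`
# is a newly created cluster that replaces the out-of-sync one.
value_with_timeout <- function(fun, cl, hostname, user, timeout = 1) {
  future::plan(future::cluster, workers = cl)  # side effect: sets the plan
  tryCatch({
    v <- R.utils::withTimeout({
      f <- future::future(fun())
      future::value(f)
    }, timeout = timeout)
    list(value = v, cl = cl)
  }, TimeoutException = function(ex) {
    list(value = NULL, cl = restart_worker(cl, hostname, user))
  })
}
```

The caller would keep using the returned cl for the next attempt, so that a worker whose communication is out of sync is never reused.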