HenrikBengtsson opened this issue 8 years ago
Thanks for the detailed answer!
@HenrikBengtsson Are there any platform-specific workarounds for this until it gets implemented? I develop on Windows, but hosting is done on Linux servers, so even a Linux-only approach would be useful. Is it as simple as getting the process PID and killing it?
On Windows, if I force-close the background processes ("R for Windows front end"), R gets very unhappy: the next time I try to run a function using future, I get the error "error writing to connection". (I'm using plan(multiprocess), if that makes a difference.) This makes me think it might be more complicated than killing the PID, but maybe things would be different on Linux.
P.S. I can't find in the documentation how to get the PID of a future. You have lots of examples with Sys.getpid() calls inside the future, but how do I get access to this PID in the main thread while the future is still running?
@HenrikBengtsson Are there any platform-specific workarounds for this until it gets implemented? I develop on Windows, but hosting is done on Linux servers, so even a Linux-only approach would be useful. Is it as simple as getting the process PID and killing it?
Not really, unless you're using forked R processes, i.e. plan(multicore). Even then, as explained below, it should not be treated as an officially supported approach, simply because the Future API does not yet have a specification for this.
On Windows, if I force-close the background processes ("R for Windows front end"), R gets very unhappy: the next time I try to run a function using future, I get the error "error writing to connection". (I'm using plan(multiprocess), if that makes a difference.)
Just to clarify: on Windows, multiprocess means multisession, whereas on Unix-like systems (Linux, macOS) multiprocess means multicore. So, to reproduce the Windows behavior everywhere, one can use plan(multisession).
This makes me think it might be more complicated than killing the pid, but maybe things would be different on Linux.
So, multisession futures use workers that are part of a local "cluster", which is basically the same as setting up a cluster yourself using cl <- parallel::makePSOCKcluster(); plan(cluster, workers = cl). Each worker is a regular R session running in the background. These workers are created once and then serve multiple futures. In other words, if you kill one of these background R workers, it won't be available to serve other futures. The error message you get is because the future package assumes the cluster worker is still around; there is currently no exception handling in the future package to deal with this case.
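The persistence of these workers can be observed with the base parallel package alone. The sketch below is an illustration under that assumption (it calls parallel::makePSOCKcluster() directly rather than going through future): it starts two background workers, asks each for its PID twice, and finds the same processes answering both times. That is why a killed worker leaves a hole in the cluster instead of affecting only one task.

```r
library(parallel)

# Start two background R sessions, similar to plan(multisession, workers = 2)
cl <- makePSOCKcluster(2)

# Ask every worker for its process ID, twice
pids_first  <- unlist(clusterCall(cl, Sys.getpid))
pids_second <- unlist(clusterCall(cl, Sys.getpid))

# The same worker processes serve both rounds of calls
print(identical(pids_first, pids_second))

# The workers are separate processes from the main R session
print(Sys.getpid() %in% pids_first)

stopCluster(cl)
```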
This is in contrast to plan(multicore), which creates a new fork for each future. Thus, killing a fork doesn't harm future futures. So, killing a multicore worker works, but killing a multisession worker does not. However, that it works is just a side effect and not by design, so it should not be relied upon right now; one could imagine multicore workers with a longer lifespan, just like multisession workers, but that's a different story.
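On Unix-like systems, the fork-per-task behavior can be illustrated with the base functions parallel::mcparallel() and parallel::mccollect(). This is an analogy for how multicore futures behave, not the future package's own code, and it does not run on Windows. Terminating one forked job with tools::pskill() leaves later jobs unaffected, because each of them gets a fresh process.

```r
library(parallel)  # mcparallel()/mccollect() need fork(), so not on Windows

# A long-running job in its own forked process
slow <- mcparallel({ Sys.sleep(60); "slow job finished" })

# Terminate it; tools::pskill() returns TRUE if the signal was sent (R >= 3.5.0)
sent <- tools::pskill(slow$pid, signal = tools::SIGTERM)

# Collect the terminated job so parallel can clean up after the child process
invisible(mccollect(slow, wait = TRUE))

# A job started afterwards runs in a brand-new fork and is unaffected
fresh <- mcparallel(Sys.getpid())
fresh_pid <- mccollect(fresh)[[1]]

print(sent)
print(fresh_pid != slow$pid)
```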
P.S. I can't actually find how to get the pid from a future in the documentation. You have lots of examples which have Sys.getpid() calls inside the future, but how do I get access to this pid in the main thread while the future is still running?
It's not possible. However, you could achieve it by setting up a cluster manually, e.g.
library("future")
cl <- makeClusterPSOCK(availableCores())
and grab the PIDs with the following hack:
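for (kk in seq_along(cl)) {
  # Ask worker 'kk' for its PID (sendCall()/recvResult() are internal to 'parallel')
  parallel:::sendCall(cl[[kk]], fun = Sys.getpid, args = list())
  pid <- parallel:::recvResult(cl[[kk]])
  # Store the PID as an attribute on the worker's 'host' field
  attr(cl[[kk]]$host, "pid") <- pid
}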
That will also annotate each worker in 'cl' with its PID. (I've thought about adding this as an automatic feature of future::makeClusterPSOCK(); it would then also serve as an additional validation that the master-worker setup works as expected.)
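The same annotation can likely be done without the internal parallel:::sendCall() and parallel:::recvResult() by using the exported parallel::clusterCall(), which evaluates a function on every worker and returns the results in worker order. A sketch, using a base parallel::makePSOCKcluster() cluster so it runs without extra packages (with future, cl would come from makeClusterPSOCK() as above); the attribute name "pid" simply mirrors the hack above.

```r
library(parallel)

cl <- makePSOCKcluster(2)

# Ask each worker for its PID via the exported API (results come back in node order)
pids <- unlist(clusterCall(cl, Sys.getpid))

# Annotate each node's 'host' field with its PID, as in the hack above
for (kk in seq_along(cl)) {
  attr(cl[[kk]]$host, "pid") <- pids[kk]
}

# Look up the PID of, say, the second worker
pid2 <- attr(cl[[2]]$host, "pid")
print(pid2)

stopCluster(cl)
```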
To use this cluster with futures, you do:
plan(cluster, workers = cl)
With this, you can grab the worker information for a particular future like this:
f <- future(Sys.sleep(300))
host <- f$workers[[f$node]]$host
print(host)
## [1] "localhost"
## attr(,"pid")
## [1] 7599
pid <- attr(host, "pid")
That gives you the PID of the process for a particular future. Note, however, that f$workers[[f$node]] is not part of the official API and might change at any time. So, if you use it, make sure you're aware it's a hack. For instance, it's not unlikely that I'll change this to f$worker at some point.
Then you can send various system signals (e.g. kill -INT) to this process using system calls or using one of the various R packages that wrap such features in an R API (I don't remember them off the top of my head).
However, to get this working properly, you probably also need to make your future expression/future code interrupt aware using withCallingHandlers() etc. It's also unknown to me what happens if you signal too many interrupts in a row; you might manage to interrupt the main R loop of the worker, which will then cause the R worker to terminate. That results in a missing R worker, and you've got the problem you mention at the beginning.
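As a rough illustration of what "interrupt aware" could mean, the sketch below wraps a long-running computation in tryCatch() with an interrupt handler (withCallingHandlers() could be used similarly to run cleanup code before the interrupt propagates). To stay self-contained, it sends SIGINT to its own R process via tools::pskill(); in the scenario above, the signal would instead come from the main session and target the worker's PID. This assumes a Unix-like system: on Windows, tools::pskill() terminates the process rather than interrupting it. The function name interruptible_task() is hypothetical.

```r
interruptible_task <- function(self_interrupt = FALSE) {
  tryCatch({
    if (self_interrupt) {
      # Stand-in for the main session sending 'kill -INT <pid>' to this worker
      tools::pskill(Sys.getpid(), signal = tools::SIGINT)
    }
    Sys.sleep(5)  # placeholder for real work; R checks for pending interrupts here
    "completed"
  }, interrupt = function(cond) {
    # Return a sentinel value instead of letting the interrupt unwind further
    "interrupted"
  }, finally = {
    # Cleanup (closing files, connections, ...) would go here
    invisible(NULL)
  })
}

res <- interruptible_task(self_interrupt = TRUE)
print(res)
```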
As you see from the above, there are lots of things that need to be put in place in order to be able to interrupt a future, and the above is just for running on your local machine. Doing this remotely is even harder. Having said this, it should be possible to add bits and pieces to the future package that move this in the right direction.
Forgot to say: if possible, you could of course always write the future code to peek at a shared file once in a while for instructions from the master. For instance, if a file named "interrupt" exists, then the future should return as soon as it becomes aware of it. The master can then create that file as a way to signal an "interrupt" to the future. A poor man's signalling scheme.
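The shared-file idea can be sketched in a few lines of base R. The worker does its work in chunks and checks for a flag file between chunks; the main session requests a stop by creating that file. The temporary file location, the chunked loop, and the name run_until_flagged() are assumptions made for illustration; for workers on other machines, the file would have to live on a shared file system.

```r
run_until_flagged <- function(flag_file, n_chunks = 10) {
  done <- 0L
  for (i in seq_len(n_chunks)) {
    # Check for a stop request from the main session before each chunk
    if (file.exists(flag_file)) {
      return(list(status = "interrupted", chunks_done = done))
    }
    Sys.sleep(0.01)  # placeholder for one chunk of real work
    done <- done + 1L
  }
  list(status = "completed", chunks_done = done)
}

flag <- tempfile("interrupt-")

# No flag present: the task runs to completion
res1 <- run_until_flagged(flag)

# The main session "signals" by creating the flag file
file.create(flag)
res2 <- run_until_flagged(flag)
unlink(flag)

str(res1)
str(res2)
```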
@HenrikBengtsson Thanks for the very detailed response. Your additional comment is not helpful to me, as I'm running a function from an external package in the future, so I can't really make it check a file.
The rest makes sense, and I can see now that it really is a lot of work. I might have a play on a Linux machine if I have time, but this probably won't happen anytime soon.
The reason I'm asking is that I have a "Stop" button in an R Shiny app, which currently makes the main R process just entirely forget about the future'd function. However, locally on Windows, if I try to get more than 3 or 4 things going at a time (I have 4 cores), everything locks up as there are no more cores. Potentially, the external function could take hours if the user puts in bad inputs, so it'd be useful to properly interrupt the process. The "Stop" button doesn't stop the function (which is running in a future); it lets it continue and just forgets about it. Thus, if a single user starts something that takes ages, everything is pretty much dead until one of their jobs finishes, if that makes sense.
I'll happily help contribute towards a solution to this in any way possible, but wouldn't really know how to start I'm afraid...
If a solution to this just isn't going to happen for a long time then fair enough. I understand that this is currently a limitation of how things are done. Thanks for your time!
I do feel like this is an important issue that shouldn't be ignored, though! It would be super nice to have some kind of endFuture function attached to a future object. Something like
a <- future(...)
futureObj <- futureOf(a)
killProcess(futureObj)
would be ideal, and I feel like it's applicable to a wide range of situations.
It sounds like the hardest case out of the ones you mentioned is the "process on another machine" case. To me, it seems like the simplest solution to that problem is to start and maintain a control process on each remote machine, which just sits idle until instructed to kill something by the master process. That should solve the running out of resources problem since the control process gets to claim those resources before any workers do.
Thanks for spending time thinking about this. Yes, for SOCK-like clusters, having a janitor process on each machine seems to be one option. This can be done by launching one janitor per machine. However, this would require a new backend or a heavily modified version of parallel's makeCluster().
An alternative approach would be to launch, for each worker (future), a monitor/janitor process, which then launches the actual worker process. This can be done with the existing SOCK framework of parallel.
A third alternative is to have the main R session simply log in to the machine when needed and kill the given set of workers.
However, these type of features may better be suited for a separate backend package (think parallel, snow, processx, sys, batchtools, ..., new_backend_pkg).
Another option is to make cancellation support optional for future backends, and have the cancellation function return a logical vector indicating which futures were actually cancelled (and probably issue a warning if it can't cancel something). The worst case for a future that can't be cancelled is that it finishes running and then the result is discarded, which is fairly benign in many cases. Presumably you'd also want a function to assert that the current backend supports cancellation so you can check that before starting a future that you can't cancel.
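The proposed semantics can be sketched with hypothetical names (cancel_futures(), supports_cancel(), and a cancel element on each task); none of this is part of the Future API. Each backend either knows how to cancel a task or not; the function returns a logical vector saying which tasks were actually cancelled and warns about the rest, whose results would simply be discarded when they finish.

```r
# Hypothetical task objects: only some backends provide a 'cancel' function
make_task <- function(id, cancellable) {
  list(id = id, cancel = if (cancellable) function() TRUE else NULL)
}

# Can this task be cancelled by its backend?
supports_cancel <- function(task) is.function(task$cancel)

# Try to cancel all tasks; return which ones were actually cancelled
cancel_futures <- function(tasks) {
  cancelled <- vapply(tasks, function(task) {
    supports_cancel(task) && isTRUE(task$cancel())
  }, logical(1))
  if (!all(cancelled)) {
    ids <- vapply(tasks[!cancelled], function(task) task$id, character(1))
    warning("Could not cancel task(s) ", paste(ids, collapse = ", "),
            "; their results will be discarded when they finish")
  }
  cancelled
}

tasks <- list(make_task("a", TRUE), make_task("b", FALSE), make_task("c", TRUE))

# Check up front which tasks the backend could cancel
print(vapply(tasks, supports_cancel, logical(1)))

# Cancel everything; this warns about task "b"
res <- cancel_futures(tasks)
print(res)
```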
I was using this function
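stopFuture <- function(x) {
  # Ask the process to terminate (SIGTERM), then force it (SIGKILL); x$job$pid is an internal field
  tools::pskill(x$job$pid, signal = tools::SIGTERM)
  tools::pskill(x$job$pid, signal = tools::SIGKILL)
}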
Here is an example (derived from the original examples):
require(future)
# normal
plan(multiprocess)
a %<-% {
cat("Resolving 'a' ...")
Sys.sleep(2)
cat("done\n")
Sys.getpid()
}
cat("Waiting for 'a' to be resolved ...\n")
f <- futureOf(a)
count <- 1
while (!resolved(f)) {
cat(count, "\n")
Sys.sleep(0.2)
count <- count + 1
}
cat("Waiting for 'a' to be resolved ... DONE\n")
a
# terminated
plan(multiprocess)
a %<-% {
cat("Resolving 'a' ...")
Sys.sleep(2)
cat("done\n")
Sys.getpid()
}
cat("Waiting for 'a' to be resolved ...\n")
f <- futureOf(a)
stopFuture(f)
count <- 1
while (!resolved(f)) {
cat(count, "\n")
Sys.sleep(0.2)
count <- count + 1
}
cat("Waiting for 'a' to be resolved ... DONE\n")
a
Just an FYI: related to this, I just filed bug report PR17395 with a patch for tools::pskill(), because it does not return the correct value. It still sends the signal, but due to the bug (actually two bugs) it is currently impossible to know whether the signal was successfully sent or not.
UPDATE: The tools::pskill() bug was fixed in R-devel r74426 (to become R 3.5.0 in April 2018), cf. https://github.com/wch/r-source/commit/a84374c6750712bf91ab486cf8e6bc71f6afb1e5
Hello,
A tad hacky, and it obviously needs some logic before kill can be invoked, but this could be helpful to some of us until a clean solution is supported by the package.
The logic to identify which processes to kill could live in the parent (if it makes sense for it to be there) or in a separate async R sweeper process or cron job that checks which PIDs need to be terminated.
promise = future(myFunctionWichDoesAsyncStuff())
async_pid = promise$job$pid
system(paste("kill -9", async_pid))
If you use the promises package, see https://github.com/HenrikBengtsson/future/issues/222 if you would like to kill the async process after retrieving its return value into the parent process.
Finally, if you need a "fire and forget" call (the value returned by the async process is never retrieved into the parent process), there is always the option of a system call to an R script, which terminates on its own when the processing is done, even if the parent process was terminated before the async process completed:
Rcommand = paste("Rscript /path/to/my/script.R", arguments)
out = system(Rcommand, wait = FALSE)
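With system(..., wait = FALSE), R does not report the PID of the background process. On Unix-like systems, one workaround is to let the shell start the process in the background and echo its PID via $!; that PID can later be passed to tools::pskill(). A sketch under those assumptions: the inline Rscript -e 'Sys.sleep(30)' stands in for Rscript /path/to/my/script.R, and output is redirected to /dev/null so that system(intern = TRUE) does not wait for the background process.

```r
rscript <- file.path(R.home("bin"), "Rscript")

# Start a background R process; '$!' is the PID of the last background job
cmd <- paste(shQuote(rscript), "-e", shQuote("Sys.sleep(30)"),
             "> /dev/null 2>&1 & echo $!")
pid <- as.integer(system(cmd, intern = TRUE))
print(pid)

# Later, e.g. when the user presses "Stop", terminate it
ok <- tools::pskill(pid, signal = tools::SIGTERM)
print(ok)
```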
I am in a similar situation, trying to stop an async process. The promise from future() is piped to the file_rows() reactive value.
Based on the tip by @raphaelvannson, I tried the following way to stop the future.
library(shiny)
library(shinyjs)
library(shinydashboard)
library(ipc)
library(promises)
library(future)
library(V8)
plan(multiprocess)
server <- function(input, output, session) {
file_rows <- reactiveVal()
observeEvent(input$rec_run, {
prog <- Progress$new(session)
prog$set(message = "Analysis in progress",
detail = "This may take a while...",
value = NULL)
fut = future({system(paste(
"orthofinder", ###external software
file_input, ##reactive input
"|",
paste0("head -", file_nrows) , ##reactive input
">",
"out.txt"
))
head_rows <- read.delim("out.txt")
head_rows
}) %...>%
file_rows() %>%
finally(~prog$close())
})
observeEvent(input$rec_cancel, {
async_pid <- fut$job$pid
system(paste("kill -15", async_pid))
})
However, it returns the error: "Warning: Error in observeEventHandler: object 'fut' not found"
When the promise is instead assigned with fut <<- future() in the above code, it returns the following error:
sh: 1: kill: Usage: kill [-s sigspec | -signum | -sigspec] [pid | job]... or kill -l [exitstatus]
Could someone hint what is wrong here?
The usage message is telling you that you are calling kill incorrectly, probably because of the lack of a space between "kill -15" and async_pid. I recommend you look into the glue package as a more robust way to interpolate variables into strings. Instead of paste, you would use glue("kill -15 {async_pid}").
observeEvent(input$rec_cancel, {
async_pid <- fut$job$pid
glue("kill -15 {async_pid}")
})
This is not doing anything and the log file is also empty.
It does not look like the issue is with paste. It was tested using Sys.getpid():
observeEvent(input$rec_cancel, {
#async_pid <- fut$job$pid
async_pid <- Sys.getpid()
system(paste("kill -15", async_pid))
})
The log file has the below message:
Terminated R --no-save --slave -f \/opt\/shiny-server\/R\/SockJSAdapter.R
It is terminating the process running "/opt/shiny-server/R/SockJSAdapter.R", but not the process running the system() command inside the future().
It looks like the problem could be in fetching the PID of future() or the syntax of using future/promises in the above code.
Oh, you're right, I forgot paste defaults to using a space in between. Still, the error message you got from kill in your previous comment normally indicates incorrect usage, so I'm not sure what's going on.
Sys.getpid() correctly fetches the PID value, and therefore the kill command works by killing the corresponding process. I tried printing the PIDs to the log file to see whether they return any value:
cat(fut1$job$pid)
cat(Sys.getpid())
It only prints the PID given by Sys.getpid(), while fut1$job$pid is empty. This suggests that kill is not getting any value, hence the usage error in the case of fut1$job$pid. Since the future is run using plan(multiprocess), the PID of the process running the async future is different from Sys.getpid(), as described at https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html, and the core problem here is fetching the PID of the async task. It would be helpful if someone could show how to fetch the PID of the process running the async task with future() in the above code.
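One likely reason fut$job$pid comes back empty is that, in the code above, fut is assigned the result of the whole pipe (the final promise), not the Future object, because = binds more loosely than %...>% and %>%; it is also local to the first observer, hence "object 'fut' not found". A promise has no job field, so async_pid is NULL and kill most likely receives no PID at all. The general pattern is to store the handle of the background task itself somewhere both handlers can reach, and to attach follow-up processing separately. A base-R sketch of that pattern, with parallel::mcparallel() standing in for future() (Unix-like only) and hypothetical start_job()/cancel_job() functions standing in for the two observeEvent() handlers; in Shiny, the shared place could equally be a reactiveVal(). As noted elsewhere in this thread, PID fields inside Future objects are internal and not part of the API.

```r
library(parallel)  # mcparallel() needs fork(), so Unix-like systems only

# Shared state visible to both "handlers" (in Shiny this could be a reactiveVal)
jobs <- new.env()

start_job <- function() {
  # Keep the handle of the background task itself ...
  jobs$current <- mcparallel({ Sys.sleep(60); "analysis finished" })
  # ... and attach any follow-up processing separately, not in the same assignment
  invisible(jobs$current)
}

cancel_job <- function() {
  job <- jobs$current
  if (is.null(job)) return(FALSE)
  sent <- tools::pskill(job$pid, signal = tools::SIGTERM)
  mccollect(job, wait = TRUE)  # let parallel clean up after the terminated child
  jobs$current <- NULL
  isTRUE(sent)
}

start_job()
cancelled <- cancel_job()
print(cancelled)
```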
In the meantime, the above question from @mehar-GIT was also posted on Stack Overflow; please see the "Edit" part of my answer there. I was able to give an example that gets the PID and kills the process, but this only works a couple of times before R stops with an error:
Unhandled promise error: callr failed, could not start R, exited with non-zero status, has crashed or was killed
Warning: Error in : callr failed, could not start R, exited with non-zero status, has crashed or was killed
95: <Anonymous>
I guess this is the case because the future isn't interrupt aware, as mentioned by @HenrikBengtsson earlier in this thread. Maybe someone can point me in the right direction for implementing this with withCallingHandlers(), as suggested by @HenrikBengtsson.
I don't have a solution and have very little time to look into this, but note that when the promises package is involved, things get much more complicated, because you are now also dealing with a background, asynchronous event loop (from the later package). That is rather complicated to troubleshoot, and you are likely to run into issues that are hard to replicate (e.g. "works a few times but then fails").
I guess this is the case because the future isn't interrupt aware ...
Correct, there is no support for terminating/interrupting futures in the core Future API. Also, the different future backends haven't really been implemented to be resilient to R workers failing/terminating, although some attempt a post-mortem analysis, e.g. detecting when a background R process is gone.
@HenrikBengtsson
I was using this function
stopFuture <- function(x){
tools::pskill(x$job$pid,signal = tools::SIGTERM)
tools::pskill(x$job$pid,signal = tools::SIGKILL)
}
Thanks! I found that using
stopFuture(ft)
try(value(ft))
cleans up parallel:::children(), which otherwise stays cluttered with the IDs of stopped processes:
library(parallel)
parallel:::children()
## list()
The PID is buried a bit deeper in the Future object now. Also, to trigger the tryCatch interrupt handler on Windows as well as Linux, use ps::ps_interrupt() to send SIGINT.
future::plan(future::multisession)
x <- future::future(tryCatch(
expr = { Sys.sleep(15); "Done" },
interrupt = function (i) { "Interrupted" }
))
pid <- x$workers[x$node][[1]]$session_info$process$pid
ps::ps_interrupt(ps::ps_handle(pid))
future::value(x)
The pid is buried a bit deeper in the Future object now. ...
That is an internal structure that stems from both future and the parallelly packages. There are zero guarantees that it will be available in future releases. So, don't rely on it other than for prototyping ideas.
FYI, there's killNode() in parallelly, which works on cluster nodes created by parallelly::makeClusterPSOCK(), which is how future sets up multisession workers.
The suggestion of @dansmith01 somewhat works, but eventually no nodes are left and future::future hangs, which is not so cool.
When I try what you suggest, @HenrikBengtsson, I get the following error:
Warning: Error in : MultisessionFuture (<none>) failed to call grmall() on cluster RichSOCKnode #1 (PID 80773 on localhost
‘localhost’). The reason reported was ‘error writing to connection’. Post-mortem diagnostic: No process exists with this PID,
i.e. the localhost worker is no longer alive. Detected a non-exportable reference (‘externalptr’) in one of the globals (‘d’ of
class ‘spec_tbl_df’) used in the future expression. The total size of the 33 globals exported is 445.20 KiB. The three largest
globals are ‘d’ (84.41 KiB of class ‘list’), ‘stri_opts_collator’ (57.23 KiB of class ‘function’) and ‘stri_sub’ (43.57 KiB of class
‘function’)
PS: OK, if I use tools::SIGINT it does not crash. But I still eventually get stuck with new futures hanging. Can you be a bit more precise as to how one is supposed to handle the interrupts gracefully?
@HenrikBengtsson Here are some interesting minimal "non-working" examples:
Without sleep:
> future::plan(future::multisession, workers = 2)
> f <- future::plan()(tryCatch({Sys.sleep(100)}, interrupt = \(cond) stop("interrupted")))
> tools::pskill(f$workers[[f$node]]$session_info$process$pid, signal = tools::SIGINT)
> future::value(f)
Error in unserialize(node$con) :
MultisessionFuture (<none>) failed to receive results from cluster RichSOCKnode #1 (PID 48254 on localhost ‘localhost’). The reason reported was ‘error reading from connection’. Post-mortem diagnostic: No process exists with this PID, i.e. the localhost worker is no longer alive
Sleeping before setting up the future to leave a bit more time for the cluster to set up:
> future::plan(future::multisession, workers = 2)
> Sys.sleep(2)
> f <- future::plan()(tryCatch({Sys.sleep(3)}, interrupt = \(cond) stop("interrupted")))
> tools::pskill(f$workers[[f$node]]$session_info$process$pid, signal = tools::SIGINT)
> future::value(f)
... (stuck forever)
With a sleep just before pskill (the only one that is almost OK):
> future::plan(future::multisession, workers = 2)
> f <- future::plan()(tryCatch({Sys.sleep(100)}, interrupt = \(cond) stop("interrupted")))
> Sys.sleep(1) # This should not be necessary
> tools::pskill(f$workers[[f$node]]$session_info$process$pid, signal = tools::SIGINT)
> future::value(f)
Error in value[[3L]](cond) : interrupted
With a single worker (why is it stuck?):
> future::plan(future::multisession, workers = 1)
> f <- future::plan()(tryCatch({Sys.sleep(100)}, interrupt = \(cond) stop("interrupted")))
> Sys.sleep(1) # This should not be necessary
> tools::pskill(f$workers[[f$node]]$session_info$process$pid, signal = tools::SIGINT)
> future::value(f)
... (stuck forever)
^C
Error in value[[3L]](cond) : interrupted
It seems that sending SIGINTs to nodes is not harmful while they are idle; the only problematic time interval seems to be after a job has been scheduled on them but before it enters the user-provided tryCatch that swallows interrupts:
> future::plan(future::multisession, workers = 2)
> for (i in 1:100) {
> f <- future::plan()(tryCatch({Sys.sleep(1); "ALIVE AND WELL"}, interrupt = \(cond) stop("interrupted")))
> print(future::value(f))
> tools::pskill(f$workers[[f$node]]$session_info$process$pid, signal = tools::SIGINT)
> }
[1] "ALIVE AND WELL"
[1] "ALIVE AND WELL"
[1] "ALIVE AND WELL"
[1] "ALIVE AND WELL"
... (ad nauseam)
^C
(NOTHING)
It gets interesting again with workers = 1:
> future::plan(future::multisession, workers = 1)
> for (i in 1:100) {
> f <- future::plan()(tryCatch({Sys.sleep(1); "ALIVE AND WELL"}, interrupt = \(cond) stop("interrupted")))
> print(future::value(f))
> tools::pskill(f$workers[[f$node]]$session_info$process$pid, signal = tools::SIGINT)
> }
[1] "ALIVE AND WELL"
[1] "ALIVE AND WELL"
[1] "ALIVE AND WELL"
[1] "ALIVE AND WELL"
... (ad nauseam)
^C (timed correctly I guess)
Error in value[[3L]](cond) : interrupted
PS: OK, workers = 1 is a bit of an edge case, since it just falls back to sequential/lazy evaluation.
This one seems like a step in the right direction:
> future::plan(future::multisession, workers = 2);
> f <- future::plan()(tryCatch({Sys.sleep(1); "ALIVE AND WELL"}, interrupt = \(cond) stop("interrupted")));
> later::later(\() {tools::pskill(f$workers[[f$node]]$session_info$process$pid, signal = tools::SIGINT); print(future::value(f))})
later: exception occurred while executing callback:
Evaluation error: interrupted.
In a real-world scenario, I still manage to make it crash with later. Is it possible that https://github.com/HenrikBengtsson/future/issues/438 is what I am looking for?
Note that using future.callr as the backend makes it really easy to reliably interrupt futures. You can even suspend and resume them.
It's true that other backends would require a more complicated implementation. Having this work reliably would be very useful for me to make reactive Shiny apps that do not choke on heavy computations at the single-user level (data management and plotting). Maybe I can help? I am especially interested in making this work for future::multisession, as future.callr::callr (or, I guess, the non-cross-platform future::multicore) is quite slow in that scenario (process spawn overhead), even though I can interrupt/terminate.
Perhaps a first step would be for future::multisession to recover from worker crashes, as it would then emulate future.callr::callr if you use SIGTERM. Sadly, SIGINT with the wrong timing seems to stall communication forever rather than triggering a "worker down" event. Crashes can happen; they do not necessarily mean bad programming (you cannot cover all scenarios at finite cost, and it's easier to clean up and start over). It would be wise for future::multisession to take that into account.
Regarding interrupts without terminating/restarting workers, my current understanding is that SIGINT is too weak for this. You cannot tell who sent the signal and, even then, you cannot tell their intent. I think this requires a proper communication channel. The simplest idea that comes to mind is to use the existing communication channel for that, provided it can run on a separate thread from the future job (if that is not already the case). The future job can easily be cancelled if it runs in a separate thread.
Side note: terminate, interrupt, and abort make more sense than cancel, because cancel can have a rollback meaning, which is an orthogonal concern.
Continuing on https://github.com/HenrikBengtsson/future/issues/93#issuecomment-1655523337: it may actually be that the cause of the error is much simpler. If I try to get the job node before calling future::value, I run into trouble:
> future::plan(future::multisession, workers = 2)
> f <- future::future({3})
> f$workers[[f$node]]
Error in sprintf(gettext(fmt, domain = domain, trim = trim), ...) :
unsupported type
> future::value(f)
Error: Unexpected result (of class ‘NULL’ != ‘FutureResult’) retrieved for MultisessionFuture future (label = ‘<none>’, expression = ‘{; 3; }’): This suggests that the communication with ‘RichSOCKnode’ #1 on host ‘localhost’ (R version 4.2.2 (2022-10-31), platform x86_64-apple-darwin17.0) is out of sync.
whereas
> future::plan(future::multisession, workers = 2)
> f <- future::future({3})
> f$workers[f$node]
Socket cluster with 1 nodes where 1 node is on host ‘localhost’ (R version 4.2.2 (2022-10-31), platform x86_64-apple-darwin17.0)
> future::value(f)
[1] 3
or
> future::plan(future::multisession, workers = 2)
> f <- future::future({3})
> future::value(f)
[1] 3
> f$workers[[f$node]]
node of a socket cluster on host ‘localhost’ with pid 98384
Is there another way of retrieving the node's PID than f$workers[[f$node]]$session_info$process$pid? That one seems to involve some out-of-sync cluster communication while a future is running.
Mmh, never mind. It's only a problem if you try to print the node:
> future::plan(future::multisession, workers = 2)
> f <- future::future({3})
> f$workers[[f$node]]$session_info$process$pid
[1] 1194
> future::value(f)
[1] 3
Background / Question
@gdevailly, sorry I didn't see your June 16 question on Twitter until now.
The simple answer is that this is not possible.
CLARIFICATION 2018-03-15: but I hope we can add something that can be used (e.g. although not part of the official Future API, there could be functions that the end user can use to manually kill workers).
Ideas / Thoughts
Technically, it should be possible to interrupt and even terminate background R processes that evaluate a future, at least if they run on the local machine.
For instance, for multicore futures we already have an internal handle for the process ID (PID) of the forked process. For multisession futures, we could retrieve the PID of each cluster worker before launching the actual future. With this PID, we should be able to send an interrupt signal, which should signal an interrupt condition within R for that process. What complicates this is that we need to come up with a platform-independent method for terminating/signalling processes. We most likely need to reach out to a system() call for this. This should be doable, but it increases the risk of not working everywhere.
Signalling a process running on another machine is a bit more complicated. It would basically require being able to launch a separate signalling process on that same machine. Not impossible, but also not guaranteed to work, e.g. maybe the future process already running occupies the last available port/socket on that machine.
For futures running on a cluster via a job scheduler (as created by the future.BatchJobs package), it should also be possible to terminate such futures/jobs using the killJobs() functionality already provided by the BatchJobs package.
That's just some of my thoughts for now.