r-lib / ps

R package to query, list, manipulate system processes
https://ps.r-lib.org/

The `callr::r_session` process seems to hang on `Windows` after using `ps::ps_interrupt` #187

Open mihaiconstantin opened 1 day ago

mihaiconstantin commented 1 day ago

Brief Description

My sincere apologies if this issue doesn’t fit here, but I have run out of ideas of things to try.

In short:

I would greatly appreciate your help in trying to understand what is happening...

Contents of example.R:

# Start a permanent session.
session <- callr::r_session$new()

# Create a cluster in the session.
invisible(session$run(function() {
    cluster <<- parallel::makeCluster(2, type = "PSOCK")
}))

# Get the worker PIDs.
worker_pids <- session$run(function() {
    parallel::clusterCall(cluster, Sys.getpid)
})

# Get handles to the worker processes.
worker_handles <- lapply(worker_pids, function(pid) {
    return(ps::ps_handle(pid))
})

# Keep the session busy (but not the workers).
session$call(function() {
    while (TRUE) { Sys.sleep(0.1) }
})

# Allow some time for the call to kick in.
Sys.sleep(0.25)

# Get the state (i.e., expect `busy`).
cat(paste0("Session state before interrupt: ", session$get_state()), "\n\n")

# Interrupt the session.
session$interrupt()

# Get the state (i.e., expect `busy`).
cat(paste0("\nSession state after interrupt: ", session$get_state()), "\n\n")

# Wait for the session to be interrupted.
if (session$get_state() == "busy" && session$poll_process(0) == "timeout") {
    # Wait.
    session$poll_process(-1)
}

# Read the interrupt result (i.e., error).
cat("\n", rep("-", 25), "\n")
cat(paste0("Session result after session interrupt:\n"))
session$read()
cat(rep("-", 25), "\n\n")

# Get the state (i.e., expect `idle`).
cat(paste0("Session state after reading the interrupt result: ", session$get_state()), "\n\n")

# Manually propagate the interrupt to the cluster workers.
lapply(worker_handles, function(handle) {
    tryCatch(
        expr = {
            # Interrupt the process.
            ps::ps_interrupt(p = handle, ctrl_c = TRUE)

            # Return some informative message.
            return(paste0("Interrupted worker `", ps::ps_pid(handle), "`."))
        },
        error = function(e) {
            # Return some informative message.
            return(paste0("Failed to interrupt worker `", ps::ps_pid(handle), "`."))
        }
    )
})

# Get the state (i.e., expect `idle`).
cat(paste0("Session state after interrupting the workers: ", session$get_state()), "\n\n")

# Sys.sleep(0.1)

# Run something in the background session.
session$run(function() {
    print("Session `run` output.")
})

cat("\n")

# Verify that the workers are still alive.
session$run(function() {
    parallel::clusterEvalQ(cluster, {
        print(paste0("Worker `", Sys.getpid(), "` is alive."))
    })
})

# Close the session later.
session$close()

Observed Output

On macOS (i.e., as expected):

Session state before interrupt: busy

[1] TRUE

Session state after interrupt: busy

[1] "ready"

 - - - - - - - - - - - - - - - - - - - - - - - - -
Session result after session interrupt:
$code
[1] 200

$message
[1] "done callr-rs-result-2583f17261a"

$result
NULL

$stdout
[1] ""

$stderr
[1] ""

$error
<callr_timeout_error/callr_error/rlib_error_3_0/rlib_error/error>
Error:
! callr subprocess interrupted
Caused by error:
! interrupt

attr(,"class")
[1] "callr_session_result"
- - - - - - - - - - - - - - - - - - - - - - - - -

Session state after reading the interrupt result: idle

[[1]]
[1] "Interrupted worker `627`."

[[2]]
[1] "Interrupted worker `630`."

Session state after interrupting the workers: idle

[1] "Session `run` output."

[[1]]
[1] "Worker `627` is alive."

[[2]]
[1] "Worker `630` is alive."

On Windows (i.e., hanging):

Session state before interrupt: busy

[1] TRUE

Session state after interrupt: busy

[1] "ready"

 - - - - - - - - - - - - - - - - - - - - - - - - -
Session result after session interrupt:
$code
[1] 200

$message
[1] "done callr-rs-result-9964509e49b3"

$result
NULL

$stdout
[1] ""

$stderr
[1] ""

$error
<callr_timeout_error/callr_error/rlib_error_3_0/rlib_error/error>
Error:
! callr subprocess interrupted
Caused by error:
! interrupt

attr(,"class")
[1] "callr_session_result"
- - - - - - - - - - - - - - - - - - - - - - - - -

Session state after reading the interrupt result: idle

[[1]]
[1] "Interrupted worker `36216`."

[[2]]
[1] "Interrupted worker `12316`."

Session state after interrupting the workers: idle

NULL

gaborcsardi commented 1 day ago

AFAIR, this is the best pattern for interrupting a session:

session$interrupt()
session$poll_io(2000)
session$read()

You could use a different timeout in the poll, and you should also check that the poll_io() returned the expected result, because R sessions are not always interruptible.
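
A minimal sketch of that check, not taken from the callr documentation: `interrupt_and_read()` is a hypothetical helper name, and it assumes that `poll_io()` returns a named character vector whose `process` element is `"ready"` once a result can be read. If the session did not respond within the timeout, it returns `NULL` instead of calling `read()`.

```r
# Hypothetical helper: interrupt an r_session-like object and read the
# result only if the poll reports that one is ready.
# Assumption: `poll_io()` returns a named character vector with a
# `process` element (e.g., "ready" or "timeout").
interrupt_and_read <- function(session, timeout = 2000) {
    # Ask the session to stop whatever it is running.
    session$interrupt()

    # Wait (up to `timeout` milliseconds) for the session to respond.
    status <- session$poll_io(timeout)

    # R sessions are not always interruptible, so check before reading.
    if (!identical(status[["process"]], "ready")) {
        return(NULL)
    }

    session$read()
}
```

A `NULL` return value then signals that the interrupt did not take effect in time, and the caller can decide whether to poll again or close the session.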

As for your specific issue, I suspect that the problem is that parallel's subprocesses use the same console as the callr subprocess, so when you call ps_interrupt() on them, all of them are interrupted. At least, this almost works for me, except that (I think) the callr subprocess receives some extra interrupts:


# Start a permanent session.
session <- callr::r_session$new()

# Create a cluster in the session.
invisible(session$run(function() {
    cluster <<- parallel::makeCluster(2, type = "PSOCK")
}))

# Get the worker PIDs.
worker_pids <- session$run(function() {
    parallel::clusterCall(cluster, Sys.getpid)
})

# Get handles to the worker processes.
worker_handles <- lapply(worker_pids, function(pid) {
    return(ps::ps_handle(pid))
})

# Keep the session busy (but not the workers).
session$call(function() {
    while (TRUE) { Sys.sleep(0.1) }
})

# Allow some time for the call to kick in.
Sys.sleep(0.25)

# Get the state (i.e., expect `busy`).
cat(paste0("Session state before interrupt: ", session$get_state()), "\n\n")

# Interrupt the session.
session$interrupt()
print(session$poll_io(10000))

# Get the state (i.e., expect `busy`).
cat(paste0("\nSession state after interrupt: ", session$get_state()), "\n\n")

# Read the interrupt result (i.e., error).
cat("\n", rep("-", 25), "\n")
cat(paste0("Session result after session interrupt:\n"))
session$read()
cat(rep("-", 25), "\n\n")

# Get the state (i.e., expect `idle`).
cat(paste0("Session state after reading the interrupt result: ", session$get_state()), "\n\n")

# Manually propagate the interrupt to the cluster workers.
lapply(worker_handles, function(handle) {
    tryCatch(
        expr = {
            # Interrupt the process.
            ps::ps_interrupt(p = handle, ctrl_c = TRUE)

            # Return some informative message.
            return(paste0("Interrupted worker `", ps::ps_pid(handle), "`."))
        },
        error = function(e) {
            # Return some informative message.
            return(paste0("Failed to interrupt worker `", ps::ps_pid(handle), "`."))
        }
    )
})

# Get the state (i.e., expect `idle`).
cat(paste0("Session state after interrupting the workers: ", session$get_state()), "\n\n")

# Sys.sleep(0.1)

# Run something in the background session.
message("running")
session$run(function() {
    print("Session `run` output.")
})

cat("\n")

# Verify that the workers are still alive.
session$run(function() {
    parallel::clusterEvalQ(cluster, {
        print(paste0("Worker `", Sys.getpid(), "` is alive."))
    })
})

# Close the session later.
session$close()

mihaiconstantin commented 12 hours ago

Thanks a lot for your answer!

> You could use a different timeout in the poll, and you should also check that the poll_io() returned the expected result, because R sessions are not always interruptible.

This is very helpful to know.

> At least, this almost works for me, except that (I think) the callr subprocess receives some extra interrupts:

I was still not able to get it to work on Windows, however your other sentence gave me an idea.

> As for your specific issue, I suspect that the problem is that parallel's subprocesses use the same console as the callr subprocess, so when you call ps_interrupt() on them, all of them are interrupted.

This made me wonder whether the order in which the interrupts are sent matters. I am not entirely sure why, but if I interrupt the parallel worker processes (i.e., ps::ps_interrupt(p = handle, ctrl_c = TRUE)) before interrupting the session (i.e., session$interrupt()), the script seems to produce the expected output.
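
The reordering described above could be sketched roughly as follows. This is only an illustration of the order of operations, not a confirmed fix: `interrupt_workers_first()` and its `interrupt_worker` argument are hypothetical names introduced here, while the calls to `ps::ps_interrupt()`, `session$interrupt()`, `session$poll_process()`, and `session$read()` are the ones already used in the script.

```r
# Hypothetical helper: send the interrupt to the cluster workers first,
# and only afterwards to the session that owns them.
interrupt_workers_first <- function(
    session,
    worker_handles,
    timeout = 2000,
    # Assumption: the default mirrors the call used in the original script.
    interrupt_worker = function(handle) ps::ps_interrupt(p = handle, ctrl_c = TRUE)
) {
    # 1. Interrupt each worker, ignoring workers that cannot be interrupted.
    for (handle in worker_handles) {
        tryCatch(interrupt_worker(handle), error = function(e) NULL)
    }

    # 2. Interrupt the session itself.
    session$interrupt()

    # 3. Wait for the session to respond, then read the interrupt result.
    status <- session$poll_process(timeout)
    if (identical(status, "ready")) session$read() else NULL
}
```

With the objects from the script, this would be called as `interrupt_workers_first(session, worker_handles)` while the session is busy.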