mihaiconstantin / parabar

An `R` package for parallelizing tasks, tracking their progress, and displaying accurate progress bars.
https://parabar.mihaiconstantin.com
Other
18 stars 1 forks source link

Allow user interrupts to stop the progress tracking and the computation when using `ProgressTrackingContext` #64

Open mihaiconstantin opened 2 weeks ago

mihaiconstantin commented 2 weeks ago

Note. None of the things discussed below seem to be the case on Windows.

Below, I write some code, in an attempt to understand some behavior that initially may appear puzzling. This is more just to explain to myself what I think is happening.

# Create a new `R` background session.
session <- callr::r_session$new()

# Create a cluster with two worker processes and store their PIDs.
session$run(function() {
    # Create a cluster.
    cluster <<- parallel::makeCluster(spec = 2, type = "PSOCK")

    # Store the workers PIDs.
    worker_pids <<- unlist(parallel::clusterCall(cluster, Sys.getpid))
})

# Run a task in the background that keeps the cluster busy indefinitely.
session$call(function() {
    # Send the task to the cluster workers.
    parallel::parSapply(cluster, 1:10, function(x) {
        # An infinite loop with sleep to adhere to cooperative multitasking.
        while (TRUE) Sys.sleep(0.0001)
    })
})

# Interrupt the session.
session$interrupt()

# Read the session interruption condition.
session$read()

# Confirm the session is `idle`.
session$get_state()

Next, we'll send additional tasks to the cluster while the workers are still busy running the infinite loop. We'll then interrupt the session to attempt to "cancel" these new tasks before they are picked up by the workers. However, even though we interrupt the session, the tasks remain in the event queue. This means that despite our expectation that they would be discarded, the tasks will eventually run when the workers become available. We'll demonstrate this behavior below.

# Send the first call request to the cluster.

# Send the call.
session$call(function() {
    parallel::clusterCall(cluster, print, "Call 1.")
})

Since the workers are still busy with the infinite loop, the session will be waiting for the workers to become available and pick up the call.

# Interrupt the session (i.e., "cancel" the cluster call).
session$interrupt()

# Read the session interruption condition.
session$read()

# Send the second call request to the cluster.

# Send the call.
session$call(function() {
    parallel::clusterCall(cluster, print, "Call 2.")
})

# Interrupt the session (i.e., "cancel" the cluster call).
session$interrupt()

# Read the session interruption condition.
session$read()

Now, we manually interrupt the worker processes to stop the infinite loop by sending a SIGINT signal to each worker.

# Send the interrupt signal to the workers.
session$run(function() {
    # Send the `SIGINT` signal.
    tools::pskill(worker_pids, signal = 2)
})

At this point, the cluster should get rather busy catching up with all the tasks that were added to the event loop. Sending a new call will not yield the expected output, instead, it will return the output of the previously scheduled calls.

# Send a new task "Call 3" to the cluster, which will return "Call 1" instead.
session$run(function() {
    parallel::clusterCall(cluster, print, "Call 3.")
})

# Send a new task "Call 4" to the cluster, which will return "Call 2" instead.
session$run(function() {
    parallel::clusterCall(cluster, print, "Call 4.")
})

Indeed, we can verify that there are "unread" results in the workers' output by using the low-level snow::recvResult function, or by manually deserializing the socket connections using the base::unserialize function. For example, running twice

# Wait for results from the worker nodes.
session$run(function() {
    # Use the built-in `snow` function to receive results from the workers.
    lapply(cluster, snow::recvResult)
})

or

# Wait for results from the worker nodes.
session$run(function() {
    # Use the `unserialize` function if the nodes are `SOCKET` connections.
    lapply(cluster, function(node) unserialize(node$con))
})

will produce first "Call 3." and then "Call 4.". Running snow::recvResult subsequent times will block the session waiting for the workers to produce more output.

Note. It's seems better to use the snow::recvResult method because it will automatically dispatch the correct deserialization function based on the connection type.

It is becoming clear that the cluster is no longer in a consistent state. The tasks we thought were canceled are executed later when the workers become available. This raises the following question:

Is there a way to check if the workers are busy before issuing a cluster call?

Could we maybe infer this from checking if the workers have pending outputs that need to be sent back to the master process (i.e., by executing snow::recvResult with a timeout)?

# Close the session.
session$close()

The behavior discussed above doesn't occur in an interactive session because a cluster call is always blocking. We cannot interact with the cluster object (e.g., to send another call) while it's busy. Instead, we must wait for the workers to complete their computations or interrupt them (e.g., by pressing CTRL+C). In an interactive session, interrupting the main process propagates the signal to the workers, stopping their computations.

However, when using a background session, sending an interrupt signal stops the current call in the session (e.g., the parallel::parSapply call). But the workers continue running their tasks because the interrupt signal isn't propagated to them unless we manually send a SIGINT to the workers. This allows us to interact with the cluster object in ways it wasn't designed to handle, such as sending multiple calls while the workers are still busy. Perhaps unsurprisingly, these calls appear to be queued and are executed in order when the workers become free. However, this can cause confusion because the output we receive may not correspond to the most recent call we made (i.e., as seen in the examples above). It seems that these queued calls already been executed at the next available moment, and that the their outputs are just delayed.

Moreover, It looks like sending an interrupt signal to the background session causes the worker nodes to linger around even after the session is closed and the cluster object is garbage collected.

# Create a new `R` background session.
session <- callr::r_session$new()

# Create a cluster with two worker processes and store their PIDs.
worker_pids <- session$run(function() {
    # Create a cluster.
    cluster <<- parallel::makeCluster(spec = 2, type = "PSOCK")

    # Store the workers PIDs.
    worker_pids <<- unlist(parallel::clusterCall(cluster, Sys.getpid))

    # Return the process IDs.
    worker_pids
})

# Run a task in the background that keeps the cluster busy indefinitely.
session$call(function() {
    # Send the task to the cluster workers.
    parallel::parSapply(cluster, 1:10, function(x) {
        while(TRUE) Sys.sleep(0.0001)
    })
})

# Interrupt the session.
session$interrupt()

# Read the interrupt condition.
session$read()

# Close the session.
session$close()

# Check that the worker processes are still alive.
lapply(worker_pids, function(pid) {
    ps::ps_is_running(
        ps::ps_handle(pid)
    )
})

At this point the worker processes are still alive. However, the processes will be terminated should the interrupt signal be propagated and the workers are not busy. We can verify this below, but first let us close the lingering processes.

# Send a `SIGTERM` signal to the worker processes.
tools::pskill(worker_pids, signal = 5)

# Check that the worker processes were terminated
lapply(worker_pids, function(pid) {
    ps::ps_is_running(
        ps::ps_handle(pid)
    )
})

Start a new session and verify that the worker processes are terminated along side the session when the interrupt signal is propagated and the workers are not busy.

# Create a new `R` background session.
session <- callr::r_session$new()

# Create a cluster with two worker processes and store their PIDs.
worker_pids <- session$run(function() {
    # Create a cluster.
    cluster <<- parallel::makeCluster(spec = 2, type = "PSOCK")

    # Store the workers PIDs.
    worker_pids <<- unlist(parallel::clusterCall(cluster, Sys.getpid))

    # Return the process IDs.
    worker_pids
})

# Run a task in the background that keeps the cluster busy indefinitely.
session$call(function() {
    # Send the task to the cluster workers.
    parallel::parSapply(cluster, 1:10, function(x) {
        while (TRUE) Sys.sleep(0.0001)
    })
})

# Interrupt the session.
session$interrupt()

# Read the interrupt condition.
session$read()

# Propagate the `SIGINT` to the worker processes.
tools::pskill(worker_pids, signal = 2)

# Close the session.
session$close()

# Check that the worker processes are not alive.
lapply(worker_pids, function(pid) {
    ps::ps_is_running(
        ps::ps_handle(pid)
    )
})

My intuition is that we should ensure consistency between sending calls to the cluster and the state of the workers. In other words, ensure that the session holding the cluster object is blocked as long as the worker nodes are busy computing.

Note. In the last code block above, if we replace while(TRUE) Sys.sleep(0.0001) with while(TRUE) { }, the SIGINT signal will not be propagated to the worker processes. I suspect this is the case due to there being no chance for the worker processes to check for the interrupt signal (i.e., the infinite loop does not contain any interruptible operations, violating the paradigm of cooperative multitasking that R is perhaps based on).

mihaiconstantin commented 1 week ago

Perhaps something like this?

# Create a session.
session <- callr::r_session$new()

# Create a cluster and store its worker PIDs.
session$run(function() {
    cluster <<- parallel::makeCluster(spec = 2, type = "PSOCK")
    worker_pids <<- unlist(parallel::clusterCall(cluster, Sys.getpid))
})

# Run a task in the background.
session$call(function() {
    # Run the task with handlers.
    tryCatch(
        # The expression to evaluate.
        expr = {
            # Send the task to the cluster workers.
            parallel::parSapply(cluster, 1:10, function(x) {
                # An interruptible task.
                while (TRUE) Sys.sleep(0.0001)
            })
        },
        # Catch the interrupt signal.
        interrupt = function(condition) {
            # Propagate the `SIGINT` to the worker processes.
            tools::pskill(worker_pids, signal = 2)

            # Throw.
            stop("Task interrupted.")
        }
    )
})

# Send the interrupt signal to the session.
session$interrupt()

# Wait for the interruption to complete (i.e., expect immediate return).
session$poll_process(-1)

# Expect `busy` session state.
session$get_state()

# Read the manual interrupt condition.
session$read()

# See that the session can perform other tasks.
session$run(function() {
    "Dummy run (session)."
})

# Verify that the worker processes are free (i.e., it will return immediately).
session$run(function() {
    parallel::clusterCall(cluster, print, "Dummy call (cluster).")
})

# Close the session.
session$close()