billdenney / crew.ssh

'crew' Launcher Plugin for Running on a Remote System with 'ssh'
GNU General Public License v3.0
0 stars 0 forks source link

Testing code #1

Open billdenney opened 7 months ago

billdenney commented 7 months ago

This code may be helpful to test the package:

library(ssh)
library(crew)

# Open an SSH session to the remote host used for manual testing.
session <- ssh::ssh_connect("bill@example.com")
# Capture the remote output with ssh_exec_internal(): its result has a raw
# `stdout` element that rawToChar() can decode.  ssh_exec_wait() streams the
# output and returns only the exit status, so `out$stdout` fails on its result.
out <- ssh::ssh_exec_internal(session, command = "ls")
rawToChar(out$stdout)

#' @title SSH-mirai launcher class
#' @description `R6` class that inherits from [crew::crew_class_launcher] and
#'   additionally stores the settings passed to [ssh::ssh_connect()].
crew_class_launcher_ssh_mirai <- R6::R6Class(
  classname = "ssh_mirai_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    #' @field ssh_host passed to [ssh::ssh_connect()]
    ssh_host = NULL,
    #' @field ssh_keyfile passed to [ssh::ssh_connect()]
    ssh_keyfile = NULL,
    #' @field ssh_passwd passed to [ssh::ssh_connect()]
    ssh_passwd = NULL,
    #' @field ssh_verbose passed to [ssh::ssh_connect()]
    ssh_verbose = NULL,

    #' @description
    #' SSH-mirai launcher constructor
    #'
    #' @param name,seconds_interval,seconds_launch,seconds_idle,seconds_wall,seconds_exit,tasks_max,tasks_timers,reset_globals,reset_packages,reset_options,garbage_collection
    #'   Forwarded unchanged to the parent class initializer.
    #' @param ssh_host,ssh_keyfile,ssh_passwd,ssh_verbose Stored in the
    #'   fields of the same name.
    #' @return An SSH-mirai launcher object
    initialize = function(
      name = NULL,
      seconds_interval = NULL,
      seconds_launch = NULL,
      seconds_idle = NULL,
      seconds_wall = NULL,
      seconds_exit = NULL,
      tasks_max = NULL,
      tasks_timers = NULL,
      reset_globals = NULL,
      reset_packages = NULL,
      reset_options = NULL,
      garbage_collection = NULL,
      ssh_host = NULL,
      ssh_keyfile = NULL,
      ssh_passwd = NULL,
      ssh_verbose = NULL
    ) {
      super$initialize(
        name = name,
        seconds_interval = seconds_interval,
        seconds_launch = seconds_launch,
        seconds_idle = seconds_idle,
        seconds_wall = seconds_wall,
        seconds_exit = seconds_exit,
        tasks_max = tasks_max,
        tasks_timers = tasks_timers,
        reset_globals = reset_globals,
        reset_packages = reset_packages,
        reset_options = reset_options,
        garbage_collection = garbage_collection
      )
      self$ssh_host <- ssh_host
      self$ssh_keyfile <- ssh_keyfile
      self$ssh_passwd <- ssh_passwd
      self$ssh_verbose <- ssh_verbose
    },

    #' @description
    #' Exploratory stub: prints the launch call, pauses in the debugger, and
    #' parses the call text.  It does not start a worker yet.
    #'
    #' @param call Character string holding the R code for the worker.
    #' @param name,launcher,worker,instance Not used by this stub.
    #' @return Invisibly, the language object parsed from `call`.
    launch_worker = function(call, name, launcher, worker, instance) {
      cat("call:", call, "\n")
      # NOTE(review): debugging breakpoint; remove once the launch logic exists.
      browser()
      call_lang <- str2lang(call)
    },

    #' @description
    #' Disconnect the SSH session given as the worker handle.
    #'
    #' @param handle Worker handle; only acted on when it is an SSH session.
    terminate_worker = function(handle) {
      # NOTE(review): debugging breakpoint; remove before unattended use.
      browser()
      # Check the handle that is about to be disconnected rather than an
      # unrelated `session` object from the calling environment.  The class
      # guard matters because launch_worker() above does not return a session.
      if (inherits(handle, "ssh_session") && ssh::ssh_info(handle)$connected) {
        # TODO: Kill the job before disconnecting
        ssh::ssh_disconnect(handle)
      }
      # If not connected, should we reconnect and try to kill the job?
    }
  )
)

#' Create a controller that uses the SSH-mirai launcher
#'
#' Builds a `crew` client and a `crew_class_launcher_ssh_mirai` launcher,
#' combines them with `crew::crew_controller()`, validates the result, and
#' returns it.
#'
#' @param name,workers,host,port,seconds_interval,seconds_timeout Passed to
#'   `crew::crew_client()` (`name` and `seconds_interval` also go to the
#'   launcher).
#' @param seconds_launch,seconds_idle,seconds_wall,seconds_exit,tasks_max,tasks_timers,reset_globals,reset_packages,reset_options,garbage_collection
#'   Passed to the launcher constructor.
#' @param ssh_host,ssh_keyfile,ssh_passwd,ssh_verbose SSH connection settings
#'   stored on the launcher.  The default for `ssh_passwd` calls
#'   `askpass::askpass()`; it is evaluated when the launcher stores the value.
#' @return The validated controller.
crew_launcher_ssh_mirai <- function(
    name = "ssh mirai",
    workers = 1L,
    host = NULL,
    port = NULL,
    seconds_interval = 0.5,
    seconds_timeout = 10,
    seconds_launch = 30,
    seconds_idle = Inf,
    seconds_wall = Inf,
    seconds_exit = 1,
    tasks_max = Inf,
    tasks_timers = 0L,
    reset_globals = TRUE,
    reset_packages = FALSE,
    reset_options = FALSE,
    garbage_collection = FALSE,
    # ssh-specific options
    ssh_host = NULL,
    ssh_keyfile = NULL,
    ssh_passwd = askpass::askpass(),
    ssh_verbose = FALSE
) {
  client <- crew::crew_client(
    name = name,
    workers = workers,
    host = host,
    port = port,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout
  )
  launcher <- crew_class_launcher_ssh_mirai$new(
    name = name,
    seconds_interval = seconds_interval,
    seconds_launch = seconds_launch,
    seconds_idle = seconds_idle,
    seconds_wall = seconds_wall,
    seconds_exit = seconds_exit,
    tasks_max = tasks_max,
    tasks_timers = tasks_timers,
    reset_globals = reset_globals,
    reset_packages = reset_packages,
    reset_options = reset_options,
    garbage_collection = garbage_collection,
    # Forward the SSH settings; without this the launcher keeps its NULL
    # defaults and the ssh_* arguments of this function have no effect.
    ssh_host = ssh_host,
    ssh_keyfile = ssh_keyfile,
    ssh_passwd = ssh_passwd,
    ssh_verbose = ssh_verbose
  )
  controller <- crew::crew_controller(client = client, launcher = launcher)
  controller$validate()
  controller
}

# The SSH target belongs in `ssh_host`.  `host` is forwarded to
# crew::crew_client(), so it must not carry the remote login string.
controller <- crew_launcher_ssh_mirai(
  ssh_host = "bill@picasso.humanpredictions.local"
)
controller$start()
# Submit one task that reports the worker's local IP address and process ID.
controller$push(
  name = "get worker IP address and process ID",
  command = paste(getip::getip(type = "local"), ps::ps_pid())
)
controller$wait()
billdenney commented 7 months ago

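Or this version, which opens the SSH connection inside `launch_worker()` and returns the remote PID and start time as the worker handle: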

library(ssh)
# Connect to the remote host.
ssh_session <- ssh_connect("bill@example.com")
# Run `ls` on the remote host; whatever ssh_exec_wait() returns is kept in `out`.
out <- ssh_exec_wait(ssh_session, command = "ls")
# Print a deparsed representation of the session object for inspection.
dput(ssh_session)

#' Create an ssh job launcher object
#'
#' `R6` launcher that opens one SSH session per worker, starts the worker's R
#' process in the background on the remote host, and identifies that process
#' by its PID and start time.
#'
#' @export
#' @inheritParams crew::crew_class_launcher
#' @param ssh_host,ssh_keyfile,ssh_passwd,ssh_verbose Passed to
#'   `ssh::ssh_connect()`
ssh_launcher_class <- R6::R6Class(
  classname = "ssh_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    ssh_host = NA_character_,
    ssh_keyfile = NULL,
    ssh_passwd = "",
    ssh_verbose = FALSE,

    # Store the SSH settings; everything in `...` goes to the parent class.
    initialize = function(
      ...,
      ssh_host,
      ssh_keyfile = NULL,
      ssh_passwd = "",
      ssh_verbose = FALSE
    ) {
      super$initialize(...)
      self$ssh_host <- ssh_host
      self$ssh_keyfile <- ssh_keyfile
      self$ssh_passwd <- ssh_passwd
      self$ssh_verbose <- ssh_verbose
    },

    # Start `call` in a background R process on the remote host.
    #
    # `name`, `launcher`, `worker` and `instance` are accepted but not used.
    # Returns a named character vector c(pid = , start_time = ) that
    # terminate_worker() later receives as `handle`.
    launch_worker = function(call, name, launcher, worker, instance) {
      session <-
        ssh::ssh_connect(
          host = self$ssh_host,
          keyfile = self$ssh_keyfile,
          passwd = self$ssh_passwd,
          verbose = self$ssh_verbose
        )
      # Keep the most recent session in the original field as well.
      private$session <- session
      # Track the PID and PID start time to ensure that only the correct job is
      # killed.  Solution based on
      # https://stackoverflow.com/questions/5731234/how-to-get-the-start-time-of-a-long-running-linux-process
      ps_start_time <- 'date --date="$(ps -p $! -o lstart=)" "+%s"'
      # The background R process gets its own stdio so that its output is not
      # mixed into the "pid start_time" line and the exec channel can finish
      # once `echo` has run.  The command is parsed by the remote POSIX shell
      # (it relies on `$!`, `ps`, `date`), hence the explicit sh quoting.
      command <- sprintf(
        "R -e %s > /dev/null 2>&1 < /dev/null & echo $! $(%s)",
        shQuote(call, type = "sh"),
        ps_start_time
      )
      pid_lstart <- ssh::ssh_exec_internal(session = session, command = command)
      # trimws() drops the trailing newline that would otherwise end up inside
      # the start_time value.
      fields <- strsplit(
        x = trimws(rawToChar(pid_lstart$stdout)),
        split = "[[:space:]]+"
      )[[1]]
      if (length(fields) != 2L) {
        try(ssh::ssh_disconnect(session), silent = TRUE)
        stop(
          "Could not determine the PID and start time of the remote worker.",
          call. = FALSE
        )
      }
      handle <- setNames(fields, c("pid", "start_time"))
      # Remember which session belongs to this worker; a single field would be
      # overwritten by the next launch and leak every earlier connection.
      private$sessions[[private$session_key(handle)]] <- session
      handle
    },

    # Disconnect the SSH session that belongs to the worker `handle`.
    terminate_worker = function(handle) {
      # Need to clean up the processes on the other end of the connection first,
      # if possible.
      # TODO: Try to kill the process on the server; verify PID and start time, first
      key <- private$session_key(handle)
      session <- if (is.null(key)) NULL else private$sessions[[key]]
      if (is.null(session)) {
        # Unknown handle: fall back to the most recently opened session.
        session <- private$session
      }
      # Best effort: the session may already be closed.
      try({
        ssh::ssh_disconnect(session)
      }, silent = TRUE)
      if (!is.null(key)) {
        private$sessions[[key]] <- NULL
      }
      invisible(NULL)
    }
  ),
  private = list(
    # Most recently opened session.
    session = NULL,
    # Open sessions, keyed by session_key() of the worker handle.
    sessions = list(),
    # Build the lookup key for a handle; NULL when the handle is not a
    # c(pid = , start_time = ) character vector.
    session_key = function(handle) {
      if (!is.character(handle) ||
            !all(c("pid", "start_time") %in% names(handle))) {
        return(NULL)
      }
      paste(handle[["pid"]], handle[["start_time"]], sep = "@")
    }
  )
)

#' @title Create a controller with the ssh launcher.
#' @export
#' @description Assemble a `crew` client and an `ssh_launcher_class` launcher
#'   from the supplied settings, join them into a controller, validate it,
#'   and return it.
#' @inheritParams crew::crew_controller_local
#' @param ssh_host,ssh_keyfile,ssh_passwd,ssh_verbose Handed to the
#'   `ssh_launcher_class` constructor.
#' @return The validated controller built by `crew::crew_controller()`.
crew_controller_ssh <- function(
    name = "ssh controller name",
    ssh_host,
    ssh_keyfile = NULL,
    ssh_passwd = "",
    ssh_verbose = FALSE,
    workers = 1L,
    host = NULL,
    port = NULL,
    tls_enable = FALSE,
    tls_config = NULL,
    seconds_interval = 0.5,
    seconds_timeout = 10,
    seconds_launch = 30,
    seconds_idle = Inf,
    seconds_wall = Inf,
    seconds_exit = 1,
    tasks_max = Inf,
    tasks_timers = 0L,
    reset_globals = TRUE,
    reset_packages = FALSE,
    reset_options = FALSE,
    garbage_collection = FALSE
) {
  # Client side: where tasks are sent from and results come back to.
  ssh_client <- crew::crew_client(
    name = name,
    workers = workers,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout,
    host = host,
    port = port,
    tls_enable = tls_enable,
    tls_config = tls_config
  )

  # Launcher side: worker lifecycle settings plus the SSH connection details.
  ssh_launcher <- ssh_launcher_class$new(
    name = name,
    seconds_interval = seconds_interval,
    seconds_launch = seconds_launch,
    seconds_idle = seconds_idle,
    seconds_wall = seconds_wall,
    seconds_exit = seconds_exit,
    tasks_max = tasks_max,
    tasks_timers = tasks_timers,
    reset_globals = reset_globals,
    reset_packages = reset_packages,
    reset_options = reset_options,
    garbage_collection = garbage_collection,
    ssh_host = ssh_host,
    ssh_keyfile = ssh_keyfile,
    ssh_passwd = ssh_passwd,
    ssh_verbose = ssh_verbose
  )

  ssh_controller <- crew::crew_controller(
    client = ssh_client,
    launcher = ssh_launcher
  )
  ssh_controller$validate()
  ssh_controller
}

library(crew)
# Build a controller with two workers for the example SSH host and start it.
controller <- crew_controller_ssh(workers = 2, ssh_host = "bill@example.com")
controller$start()

# Informal testing ####

# Submit a single task; its command pastes together the values of
# getip::getip(type = "local") and ps::ps_pid().
controller$push(
  name = "get worker IP address and process ID",
  command = paste(getip::getip(type = "local"), ps::ps_pid())
)
# Wait on the controller, pop one result, and show the first element of its
# `result` column.
controller$wait()
result <- controller$pop()
result$result[[1]]

# Show the client summary, then shut the controller down.
controller$client$summary()
controller$terminate()

# Load Testing ####

# Two workers that exit after two idle seconds, so that each batch below has
# to launch workers again.
controller <- crew_controller_ssh(
  seconds_idle = 2L,
  workers = 2L,
  ssh_host = "bill@example.com"
)
controller$start()
# Push 100 tasks
for (index in seq_len(100L)) {
  name <- paste0("task_", index)
  controller$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}
# Wait for the tasks to complete.
controller$wait()
# Wait for the workers to idle out and exit on their own.
crew::crew_retry(
  ~all(controller$client$summary()$online == FALSE),
  seconds_interval = 1,
  seconds_timeout = 60
)
# Do the same for 100 more tasks.
for (index in (seq_len(100L) + 100L)) {
  name <- paste0("task_", index)
  controller$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}
controller$wait()
crew::crew_retry(
  ~all(controller$client$summary()$online == FALSE),
  seconds_interval = 1,
  seconds_timeout = 60
)
# Collect the results.  Pop until nothing is left, gather the pieces in a
# list, and bind them once instead of re-binding inside the loop.
popped <- list()
while (!is.null(result <- controller$pop(scale = FALSE))) {
  popped[[length(popped) + 1L]] <- result
}
results <- dplyr::bind_rows(popped)
# Check the results
all(sort(unlist(results$result)) == seq_len(200L))
# View worker and task summaries.
controller$summary()
controller$schedule$summary()
controller$launcher$summary()
# NOTE(review): this line repeated the schedule summary; the client summary
# (as used in the informal test) looks like what was intended. Confirm.
controller$client$summary()
# Terminate the controller.
controller$terminate()
# Now outside crew, verify that the mirai dispatcher
# and crew workers successfully terminated.