HenrikBengtsson / parallelly

R package: parallelly - Enhancing the 'parallel' Package
https://parallelly.futureverse.org

WISH: makeClusterPSOCK(): Different shell quoting for main and worker #25

Closed HenrikBengtsson closed 3 years ago

HenrikBengtsson commented 3 years ago

(Migrated from https://github.com/HenrikBengtsson/parallelly/issues/20#issuecomment-484221340)

@jdnewmil wrote on 2019-04-17:

I encountered problems with shell quoting in the makeNodePSOCK function of future v1.12.0. You have to know which OS you are on in order to quote shell commands properly, but the standard function did not seem to do that correctly. So I made a modified version of the function that accepts a mastershell argument (for the calling computer) and a workershell argument (for the called computer), and it works on Windows 10. Unfortunately, I have had difficulty using this function on the Linux machines, so it isn't a general solution, but for now it allows me to kickstart a plan from Win10 and fall back to the normal worker handling by name for the deeper plan levels. Maybe this will be helpful in fixing Win10 support in the future package. It may be advisable to build the shell type into the plan in future versions of future.
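
To illustrate the quoting difference described here, the following is a minimal base-R sketch (not taken from the gist). It quotes one R expression for a POSIX shell and for Windows cmd.exe using shQuote(). The expression itself is a made-up example. When the type argument is omitted, shQuote() picks "cmd" on Windows and "sh" elsewhere, so the default follows the calling machine rather than the worker.

```r
# The same R expression, quoted for two different shells.
expr <- "cat(\"hello\")"

# POSIX shells (sh, bash): wrap in single quotes, inner double quotes untouched.
quoted_sh <- shQuote(expr, type = "sh")

# Windows cmd.exe: wrap in double quotes, inner double quotes backslash-escaped.
quoted_cmd <- shQuote(expr, type = "cmd")

cat("sh :", quoted_sh, "\n")
cat("cmd:", quoted_cmd, "\n")
```

A string quoted one way is generally not safe to hand to the other shell, which is why the calling side and the worker side may each need their own quoting type.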

The test1.R script runs on my Win10 laptop, uses my patched function to access a cluster main computer using putty, then activates a couple of node computers using DNS node names, and uses forking on the node computers to obtain proof of activation for return to the caller. The future_patch.R script contains my modified-but-slightly-broken makeNodePSOCK function.

https://gist.github.com/jdnewmil/008a15162bebac9d64d46b175b5d9813

@hb wrote on 2019-04-21:

Thanks for this. It's meant to work everywhere, so I'm happy to fix makeNodePSOCK(). Would you mind pointing me to the critical code change, or much better, providing a minimal PR / git diff so I can understand exactly what's missing?

@jdnewmil wrote on 2019-04-21:

Attached is a unified diff with both versions reformatted to minimize whitespace differences. It doesn't work for me when triggered under Linux, and I think the way the function gets triggered by the plan has to be updated to keep track of which shell applies. I doubt that remoting into Windows will be very popular though (and it might be enabled using cygwin/bash anyway), so the immediate focus will probably remain on the Windows CMD -> Linux Bash step that this code does handle.
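
The Windows CMD -> Linux Bash step involves two layers of quoting: the inner command is parsed by the worker's shell, and the whole call is parsed by the shell on the calling machine. A minimal sketch of that idea follows. The helper build_launch_cmd(), the host name "node1", and the bare "plink" command are hypothetical and only serve the illustration; the argument names workershell and mastershell follow the patch.

```r
# Hypothetical helper, not part of future or parallelly.
build_launch_cmd <- function(rshcmd, worker, rscript, code,
                             workershell = "sh", mastershell = "cmd") {
  # Inner layer: this string is parsed by the shell on the worker.
  remote_cmd <- paste(shQuote(rscript, type = workershell), "-e",
                      shQuote(code, type = workershell))
  # Outer layer: this string is parsed by the shell on the calling machine.
  paste(shQuote(rshcmd, type = mastershell), worker,
        shQuote(remote_cmd, type = mastershell))
}

cmd <- build_launch_cmd("plink", "node1", "Rscript",
                        "parallel:::.slaveRSOCK()")
cat(cmd, "\n")
```

With these defaults the inner command is single-quoted for sh and the outer call is double-quoted for cmd.exe, so neither layer has to escape the other's quote character.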

Attachment 1:

```r
function(worker = "localhost", master = NULL, port,
         connectTimeout = getOption("future.makeNodePSOCK.connectTimeout",
           as.numeric(Sys.getenv("R_FUTURE_MAKENODEPSOCK_CONNECTTIMEOUT", 2 * 60))),
         timeout = getOption("future.makeNodePSOCK.timeout",
           as.numeric(Sys.getenv("R_FUTURE_MAKENODEPSOCK_TIMEOUT", 30 * 24 * 60 * 60))),
         rscript = NULL, homogeneous = NULL, rscript_args = NULL,
         methods = TRUE, useXDR = TRUE, outfile = "/dev/null",
         renice = NA_integer_,
         rshcmd = getOption("future.makeNodePSOCK.rshcmd",
           Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSHCMD")),
         user = NULL, revtunnel = TRUE, rshlogfile = NULL,
         rshopts = getOption("future.makeNodePSOCK.rshopts",
           Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSHOPTS")),
         rank = 1L, manual = FALSE, dryrun = FALSE, verbose = FALSE,
         workershell = "sh", mastershell = "cmd") {
  localMachine <- is.element(worker, c("localhost", "127.0.0.1"))
  if (!localMachine) {
    localMachine <- is_localhost(worker)
    if (localMachine) worker <- "localhost"
  }
  attr(worker, "localhost") <- localMachine

  manual <- as.logical(manual)
  stop_if_not(length(manual) == 1L, !is.na(manual))
  dryrun <- as.logical(dryrun)
  stop_if_not(length(dryrun) == 1L, !is.na(dryrun))

  if (identical(rshcmd, "")) rshcmd <- NULL
  if (!is.null(rshcmd)) {
    rshcmd <- as.character(rshcmd)
    stop_if_not(length(rshcmd) >= 1L)
  }
  if (identical(rshopts, "")) rshopts <- NULL
  rshopts <- as.character(rshopts)

  user <- as.character(user)
  stop_if_not(length(user) <= 1L)

  port <- as.integer(port)
  if (is.na(port) || port < 0L || port > 65535L) {
    stop("Invalid port: ", port)
  }

  revtunnel <- as.logical(revtunnel)
  stop_if_not(length(revtunnel) == 1L, !is.na(revtunnel))

  if (!is.null(rshlogfile)) {
    if (is.logical(rshlogfile)) {
      stop_if_not(!is.na(rshlogfile))
      if (rshlogfile) {
        rshlogfile <- tempfile(pattern = "future_makeClusterPSOCK_", fileext = ".log")
      } else {
        rshlogfile <- NULL
      }
    } else {
      rshlogfile <- as.character(rshlogfile)
    }
  }

  if (is.null(master)) {
    if (localMachine || revtunnel) {
      master <- "localhost"
    } else {
      master <- Sys.info()[["nodename"]]
    }
  }
  stop_if_not(!is.null(master))

  timeout <- as.numeric(timeout)
  stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0)

  methods <- as.logical(methods)
  stop_if_not(length(methods) == 1L, !is.na(methods))

  if (is.null(homogeneous)) {
    homogeneous <- {
      localMachine ||
        (!revtunnel && is_localhost(master)) ||
        (!is_ip_number(worker) && !is_fqdn(worker))
    }
  }
  homogeneous <- as.logical(homogeneous)
  stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous))

  if (is.null(rscript)) {
    rscript <- "Rscript"
    if (homogeneous) rscript <- file.path(R.home("bin"), rscript)
  } else {
    rscript <- as.character(rscript)
    stop_if_not(length(rscript) >= 1L)
    bin <- Sys.which(rscript[1])
    if (bin == "") bin <- rscript[1]
    rscript[1] <- bin
  }
  rscript_args <- as.character(rscript_args)

  useXDR <- as.logical(useXDR)
  stop_if_not(length(useXDR) == 1L, !is.na(useXDR))

  stop_if_not(is.null(outfile) || is.character(outfile))

  renice <- as.integer(renice)
  stop_if_not(length(renice) == 1L)

  rank <- as.integer(rank)
  stop_if_not(length(rank) == 1L, !is.na(rank))

  verbose <- as.logical(verbose)
  stop_if_not(length(verbose) == 1L, !is.na(verbose))
  verbose_prefix <- "[local output] "

  if (!any(grepl("parallel:::.slaveRSOCK()", rscript_args, fixed = TRUE))) {
    rscript_args <- c(rscript_args, "-e", shQuote("parallel:::.slaveRSOCK()", workershell))
  }

  pidfile <- NULL
  if (localMachine && !dryrun) {
    autoKill <- isTRUE(getOption("future.makeNodePSOCK.autoKill",
      as.logical(Sys.getenv("R_FUTURE_MAKENODEPSOCK_AUTOKILL", TRUE))))
    if (autoKill) {
      pidfile <- tempfile(pattern = sprintf("future.parent=%d.", Sys.getpid()), fileext = ".pid")
      pidcode <- sprintf("try(cat(Sys.getpid(),file=\"%s\"), silent = TRUE)", pidfile)
      rscript_pid_args <- c("-e", shQuote(pidcode, mastershell))
      test_cmd <- paste(c(rscript, rscript_pid_args, "-e",
        shQuote(sprintf("file.exists(%s)", shQuote(pidfile, "sh")), workershell)),
        collapse = " ")
      if (verbose) {
        message("Testing if worker's PID can be inferred: ", sQuote(test_cmd))
      }
      input <- NULL
      if (any(grepl("singularity", rscript, ignore.case = TRUE))) input <- ""
      res <- system(test_cmd, intern = TRUE, input = input)
      status <- attr(res, "status")
      suppressWarnings(file.remove(pidfile))
      if ((is.null(status) || status == 0L) && any(grepl("TRUE", res))) {
        if (verbose) message("- Possible to infer worker's PID: TRUE")
        rscript_args <- c(rscript_pid_args, rscript_args)
      } else {
        if (verbose) message("- Possible to infer worker's PID: FALSE")
        pidfile <- NULL
      }
    }
  }

  rscript_label <- getOption("future.makeNodePSOCK.rscript_label",
    Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSCRIPT_LABEL"))
  if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) {
    if (isTRUE(as.logical(rscript_label))) {
      script <- grep("[.]R$", commandArgs(), value = TRUE)[1]
      if (is.na(script)) script <- "UNKNOWN"
      rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(),
        Sys.info()[["nodename"]], Sys.info()[["user"]])
    }
    rscript_args <- c("-e", shQuote(paste0("#label=", rscript_label), mastershell), rscript_args)
  }

  if (methods) {
    rscript_args <- c("--default-packages=datasets,utils,grDevices,graphics,stats,methods", rscript_args)
  }

  if (!localMachine && revtunnel) {
    rscript_port <- port + (rank - 1L)
  } else {
    rscript_port <- port
  }

  rscript <- paste(shQuote(rscript, workershell), collapse = " ")
  rscript_args <- paste(rscript_args, collapse = " ")
  envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile,
    " TIMEOUT=", timeout, " XDR=", useXDR)
  cmd <- paste(rscript, rscript_args, envvars)

  if (!is.na(renice) && renice > 0L) {
    cmd <- sprintf("nice --adjustment=%d %s", renice, cmd)
  }

  if (!localMachine) {
    find <- is.null(rshcmd)
    if (find) {
      which <- NULL
      if (verbose) {
        message(sprintf("%sWill search for all 'rshcmd' available\n", verbose_prefix))
      }
    } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) {
      find <- TRUE
      if (verbose) {
        message(sprintf("%sWill search for specified 'rshcmd' types: %s\n",
          verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")))
      }
      which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd)
    }

    if (find) {
      rshcmd <- find_rshcmd(which = which, must_work = !localMachine && !manual && !dryrun)
      if (verbose) {
        s <- unlist(lapply(rshcmd, FUN = function(r) {
          sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "),
            sQuote(attr(r, "type")), sQuote(attr(r, "version")))
        }))
        s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n")
        message(sprintf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s))
      }
      rshcmd <- rshcmd[[1]]
    } else {
      if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- ""
      if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- ""
    }

    s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version")))
    rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s)
    if (verbose) message(sprintf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label))

    if (length(user) == 1L) rshopts <- c("-l", user, rshopts)

    if (revtunnel) {
      rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts)
      if (isTRUE(attr(rshcmd, "OpenSSH_for_Windows"))) {
        ver <- windows_build_version()
        if (!is.null(ver) && ver <= "10.0.17763.253") {
          msg <- sprintf("WARNING: You're running Windows 10 (build %s) where this 'rshcmd' (%s) may not support reverse tunneling (revtunnel = TRUE) resulting in worker failing to launch", ver, paste(sQuote(rshcmd), collapse = ", "), rshcmd_label)
          if (verbose) message(c(verbose_prefix, msg))
        }
      }
    }

    if (is.character(rshlogfile)) {
      rshlogflag <- if (grepl("[Pp][Ll][Ii][Nn][Kk]", rshcmd[1])) {
        "-sshlog"
      } else {
        "-E"
      }
      rshopts <- c(sprintf("%s %s", rshlogflag, shQuote(rshlogfile, mastershell)), rshopts)
    }

    rshopts <- paste(rshopts, collapse = " ")
    rsh_call <- paste(paste(shQuote(rshcmd, mastershell), collapse = " "), rshopts, worker)
    local_cmd <- paste(rsh_call, shQuote(cmd, mastershell))
  } else {
    local_cmd <- cmd
  }
  stop_if_not(length(local_cmd) == 1L)

  is_worker_output_visible <- is.null(outfile)

  if (manual || dryrun) {
    msg <- c("----------------------------------------------------------------------")
    if (localMachine) {
      msg <- c(msg, sprintf("Manually, start worker #%s on local machine %s with:", rank, sQuote(worker)), sprintf("\n %s\n", cmd))
    } else {
      msg <- c(msg, sprintf("Manually, (i) login into external machine %s:", sQuote(worker)), sprintf("\n %s\n", rsh_call))
      msg <- c(msg, sprintf("and (ii) start worker #%s from there:", rank), sprintf("\n %s\n", cmd))
      msg <- c(msg, sprintf("Alternatively, start worker #%s from the local machine by combining both step in a single call:", rank), sprintf("\n %s\n", local_cmd))
    }
    msg <- paste(c(msg, ""), collapse = "\n")
    cat(msg)
    utils::flush.console()
    if (dryrun) return(NULL)
  } else {
    if (verbose) {
      message(sprintf("%sStarting worker #%s on %s: %s", verbose_prefix, rank, sQuote(worker), local_cmd))
    }
    input <- if (.Platform$OS.type == "windows") "" else NULL
    res <- system(local_cmd, wait = FALSE, input = input)
    if (verbose) {
      message(sprintf("%s- Exit code of system() call: %s", verbose_prefix, res))
    }
    if (res != 0) {
      warning(sprintf("system(%s) had a non-zero exit code: %d", local_cmd, res))
    }
  }

  if (verbose) {
    message(sprintf("%sWaiting for worker #%s on %s to connect back", verbose_prefix, rank, sQuote(worker)))
    if (is_worker_output_visible) {
      if (.Platform$OS.type == "windows") {
        message(sprintf("%s- Detected 'outfile=NULL' on Windows: this will make the output from the background worker visible when running R from a terminal, but it will most likely not be visible when using a GUI.", verbose_prefix))
      } else {
        message(sprintf("%s- Detected 'outfile=NULL': this will make the output from the background worker visible", verbose_prefix))
      }
    }
  }

  con <- local({
    setTimeLimit(elapsed = connectTimeout)
    on.exit(setTimeLimit(elapsed = Inf))
    warnings <- list()
    tryCatch({
      withCallingHandlers({
        socketConnection("localhost", port = port, server = TRUE,
          blocking = TRUE, open = "a+b", timeout = timeout)
      }, warning = function(w) {
        if (verbose) {
          message(sprintf("%sDetected a warning from socketConnection(): %s", verbose_prefix, sQuote(conditionMessage(w))))
        }
        warnings <<- c(warnings, list(w))
      })
    }, error = function(ex) {
      setTimeLimit(elapsed = Inf)
      machineType <- if (localMachine) "local" else "remote"
      msg <- sprintf("Failed to launch and connect to R worker on %s machine %s from local machine %s.\n", machineType, sQuote(worker), sQuote(Sys.info()[["nodename"]]))
      cmsg <- conditionMessage(ex)
      if (grepl(gettext("reached elapsed time limit"), cmsg)) {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s (which suggests that the connection timeout of %.0f seconds (argument 'connectTimeout') kicked in)\n", sQuote(cmsg), connectTimeout))
      } else {
        msg <- c(msg, sprintf(" * The error produced by socketConnection() was: %s\n", sQuote(cmsg)))
      }
      if (length(warnings) > 0) {
        msg <- c(msg, sprintf(" * In addition, socketConnection() produced %d warning(s):\n", length(warnings)))
        for (kk in seq_along(warnings)) {
          cmsg <- conditionMessage(warnings[[kk]])
          if (grepl("port [0-9]+ cannot be opened", cmsg)) {
            msg <- c(msg, sprintf(" - Warning #%d: %s (which suggests that this port is either already occupied by another process or blocked by the firewall on your local machine)\n", kk, sQuote(cmsg)))
          } else {
            msg <- c(msg, sprintf(" - Warning #%d: %s\n", kk, sQuote(cmsg)))
          }
        }
      }
      msg <- c(msg, sprintf(" * The localhost socket connection that failed to connect to the R worker used port %d using a communication timeout of %.0f seconds and a connection timeout of %.0f seconds.\n", port, timeout, connectTimeout))
      msg <- c(msg, sprintf(" * Worker launch call: %s.\n", local_cmd))

      pid <- readWorkerPID(pidfile)
      if (!is.null(pid)) {
        if (verbose) message(sprintf("Killing worker process (PID %d) if still alive", pid))
        success <- pid_kill(pid)
        if (verbose) message(sprintf("Worker (PID %d) was successfully killed: %s", pid, success))
        msg <- c(msg, sprintf(" * Worker (PID %d) was successfully killed: %s\n", pid, success))
      } else if (localMachine) {
        msg <- c(msg, sprintf(" * Failed to kill local worker because it's PID is could not be identified.\n"))
      }

      suggestions <- NULL
      if (!verbose) {
        suggestions <- c(suggestions, "Set 'verbose=TRUE' to see more details.")
      }
      if (.Platform$OS.type == "windows") {
        if (is_worker_output_visible) {
          suggestions <- c(suggestions, "On Windows, to see output from worker, set 'outfile=NULL' and run R from a terminal (not a GUI).")
        } else {
          suggestions <- c(suggestions, "On Windows, output from worker when using 'outfile=NULL' is only visible when running R from a terminal (not a GUI).")
        }
      } else {
        if (!is_worker_output_visible) {
          suggestions <- c(suggestions, "Set 'outfile=NULL' to see output from worker.")
        }
      }
      if (is.character(rshlogfile)) {
        smsg <- sprintf("Inspect the content of log file %s for %s.", sQuote(rshlogfile), sQuote(rshcmd))
        lmsg <- tryCatch(readLines(rshlogfile, n = 15L, warn = FALSE), error = function(ex) NULL)
        if (length(lmsg) > 0) {
          lmsg <- sprintf(" %2d: %s", seq_along(lmsg), lmsg)
          smsg <- sprintf("%s The first %d lines are:\n%s", smsg, length(lmsg), paste(lmsg, collapse = "\n"))
        }
        suggestions <- c(suggestions, smsg)
      } else {
        suggestions <- c(suggestions, sprintf("Set 'rshlogfile=TRUE' to enable logging for %s.", sQuote(rshcmd)))
      }
      if (!localMachine && revtunnel && isTRUE(attr(rshcmd, "OpenSSH_for_Windows"))) {
        suggestions <- c(suggestions, sprintf("The 'rshcmd' (%s) used may not support reverse tunneling (revtunnel = TRUE). See ?future::makeClusterPSOCK for alternatives.\n", rshcmd_label))
      }
      if (length(suggestions) > 0) {
        suggestions <- sprintf(" - Suggestion #%d: %s\n", seq_along(suggestions), suggestions)
        msg <- c(msg, " * Troubleshooting suggestions:\n", suggestions)
      }

      msg <- paste(msg, collapse = "")
      ex$message <- msg
      local({
        oopts <- options(warning.length = 2000L)
        on.exit(options(oopts))
        stop(ex)
      })
    })
  })
  setTimeLimit(elapsed = Inf)

  if (verbose) {
    message(sprintf("%sConnection with worker #%s on %s established", verbose_prefix, rank, sQuote(worker)))
  }

  structure(list(con = con, host = worker, rank = rank, rshlogfile = rshlogfile),
    class = if (useXDR) "SOCKnode" else "SOCK0node")
}
```

Attachment 2:

--- makeNodePSOCK.R 2019-04-21 18:13:49.119601300 -0700
+++ mymakenodePSOCK.R   2019-04-21 18:19:34.572029000 -0700
@@ -8,7 +8,8 @@
         Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSHCMD")), user = NULL,
     revtunnel = TRUE, rshlogfile = NULL, rshopts = getOption("future.makeNodePSOCK.rshopts",
         Sys.getenv("R_FUTURE_MAKENODEPSOCK_RSHOPTS")),
-    rank = 1L, manual = FALSE, dryrun = FALSE, verbose = FALSE) {
+    rank = 1L, manual = FALSE, dryrun = FALSE, verbose = FALSE,
+    workershell = "sh", mastershell = "cmd") {
     localMachine <- is.element(worker, c("localhost", "127.0.0.1"))
     if (!localMachine) {
         localMachine <- is_localhost(worker)
@@ -48,7 +49,6 @@
             }
         } else {
             rshlogfile <- as.character(rshlogfile)
-            rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE)
         }
     }
     if (is.null(master)) {
@@ -81,7 +81,7 @@
         stop_if_not(length(rscript) >= 1L)
         bin <- Sys.which(rscript[1])
         if (bin == "")
-            bin <- normalizePath(rscript[1], mustWork = FALSE)
+            bin <- rscript[1]
         rscript[1] <- bin
     }
     rscript_args <- as.character(rscript_args)
@@ -97,7 +97,8 @@
     verbose_prefix <- "[local output] "
     if (!any(grepl("parallel:::.slaveRSOCK()", rscript_args,
         fixed = TRUE))) {
-        rscript_args <- c(rscript_args, "-e", shQuote("parallel:::.slaveRSOCK()"))
+        rscript_args <- c(rscript_args, "-e", shQuote("parallel:::.slaveRSOCK()",
+            workershell))
     }
     pidfile <- NULL
     if (localMachine && !dryrun) {
@@ -107,14 +108,14 @@
         if (autoKill) {
             pidfile <- tempfile(pattern = sprintf("future.parent=%d.",
                 Sys.getpid()), fileext = ".pid")
-            pidfile <- normalizePath(pidfile, winslash = "/",
-                mustWork = FALSE)
             pidcode <- sprintf("try(cat(Sys.getpid(),file=\"%s\"), silent = TRUE)",
                 pidfile)
-            rscript_pid_args <- c("-e", shQuote(pidcode))
+            rscript_pid_args <- c("-e", shQuote(pidcode,
+                mastershell))
             test_cmd <- paste(c(rscript, rscript_pid_args,
                 "-e", shQuote(sprintf("file.exists(%s)",
-                  shQuote(pidfile)))), collapse = " ")
+                  shQuote(pidfile, "sh")), workershell)),
+                collapse = " ")
             if (verbose) {
                 message("Testing if worker's PID can be inferred: ",
                   sQuote(test_cmd))
@@ -150,7 +151,7 @@
                 Sys.info()[["user"]])
         }
         rscript_args <- c("-e", shQuote(paste0("#label=",
-            rscript_label)), rscript_args)
+            rscript_label), mastershell), rscript_args)
     }
     if (methods) {
         rscript_args <- c("--default-packages=datasets,utils,grDevices,graphics,stats,methods",
@@ -161,7 +162,7 @@
     } else {
         rscript_port <- port
     }
-    rscript <- paste(shQuote(rscript), collapse = " ")
+    rscript <- paste(shQuote(rscript, workershell), collapse = " ")
     rscript_args <- paste(rscript_args, collapse = " ")
     envvars <- paste0("MASTER=", master, " PORT=", rscript_port,
         " OUT=", outfile, " TIMEOUT=", timeout, " XDR=",
@@ -233,13 +234,19 @@
             }
         }
         if (is.character(rshlogfile)) {
-            rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)),
-                rshopts)
+            rshlogflag <- if (grepl("[Pp][Ll][Ii][Nn][Kk]",
+                rshcmd[1])) {
+                "-sshlog"
+            } else {
+                "-E"
+            }
+            rshopts <- c(sprintf("%s %s", rshlogflag, shQuote(rshlogfile,
+                mastershell)), rshopts)
         }
         rshopts <- paste(rshopts, collapse = " ")
-        rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "),
-            rshopts, worker)
-        local_cmd <- paste(rsh_call, shQuote(cmd))
+        rsh_call <- paste(paste(shQuote(rshcmd, mastershell),
+            collapse = " "), rshopts, worker)
+        local_cmd <- paste(rsh_call, shQuote(cmd, mastershell))
     } else {
         local_cmd <- cmd
     }

HenrikBengtsson commented 3 years ago

For my own notes, if I understand it correctly, this problem occurs when:

  1. running R on Windows 10, and
  2. trying to connect to a remote Linux machine using PuTTY.

It's not 100% clear to me what the error message is or where the error occurs. For example, is the system call to PuTTY corrupt, resulting in a syntax error on the local Windows machine, or is there a syntax error on the remote Linux machine?

If anyone can reproduce this before me, the output from:

cl <- makeClusterPSOCK("my.remote.machine.org", verbose = TRUE)

would help me fix this.

maksimhorowitz commented 3 years ago

Did this ever get resolved? I am continuing to have issues with this going from Windows 10 to Linux AWS instance.

HenrikBengtsson commented 3 years ago

I don't think so. My last ask (https://github.com/HenrikBengtsson/parallelly/issues/25#issuecomment-725088283) was for a reproducible example with verbose output. I want to understand the problem before attempting to incorporate a fix.

maksimhorowitz commented 3 years ago

Got it. I haven't been able to generate any useful logs. If I do, I will report back.

HenrikBengtsson commented 3 years ago

> I am continuing to have issues with this going from Windows 10 to Linux AWS instance.

Even just getting output from:

cl <- makeClusterPSOCK("<your-Linux-AWS-instance>", verbose = TRUE)

would increase the chances for moving this forward.

maksimhorowitz commented 3 years ago

Not sure if this helps at all, but I am able to connect to the AWS EC2 instance using the ssh package with the following command:

library(ssh)
## myuser, my_instance_ip, and my_open_ssh_key are placeholders
ssh_connect(host = paste(myuser, my_instance_ip, sep = "@"),
            keyfile = my_open_ssh_key,
            verbose = TRUE)

But when using makeClusterPSOCK the script hangs and eventually times out.

HenrikBengtsson commented 3 years ago

Are you saying that you get no output at all when you use makeClusterPSOCK(..., verbose = TRUE)?

maksimhorowitz commented 3 years ago

I do get output. When using PuTTY, nothing is written to the log file, but there is verbose output in the R console. I will paste it here shortly after I re-run.

maksimhorowitz commented 3 years ago

So below is the command I use to try and connect

cl <- makeClusterPSOCK(
  "my_cluster_ip",
  user = "ubuntu",
  rshcmd = "<putty-plink>",
  rshopts = c("-P", 22, "-i", my_ssh_ppk_file_path),
  dryrun = FALSE,
  verbose = TRUE,
  rshlogfile = "mylogfile.txt"
)

Attached is the verbose output. I went through and replaced confidential paths and etc. with generic names.

As mentioned, the "mylogfile.txt" is blank so there isn't any helpful information there.

future_parallel_log.txt


EDIT 2021-01-26 by @HenrikBengtsson:

The essence of the verbose output is:

[local output] Workers: [n = 1] ‘my_cluster_ip’
[local output] Base port: 11751
[local output] Creating node 1 of 1 ...
[local output] - setting up node
[local output] - attempt #1 of 3
[local output] Will search for specified 'rshcmd' types: ‘<putty-plink>’

[local output] Found the following available 'rshcmd':
[local output]  1. ‘"~\PuTTY\plink.exe’, ‘-ssh’ [type=‘putty-plink’, version=‘plink: Release 0.74; Build platform: 64-bit x86 Windows; Compiler: clang 11.0.0 (https://github.com/llvm/llvm-project/ bc15bf66dcca76cc06fe71fca35b74dc4d521021), emulating Visual Studio 2013 (12.0), _MSC_VER=1800; Source commit: 014d4fb151369f255b3debed7d15a154fd9036f5’]
[local output] Using 'rshcmd': ‘"~\PuTTY\plink.exe’, ‘-ssh’ [type=‘putty-plink’, version=‘plink: Release 0.74; Build platform: 64-bit x86 Windows; Compiler: clang 11.0.0 (https://github.com/llvm/llvm-project/ bc15bf66dcca76cc06fe71fca35b74dc4d521021), emulating Visual Studio 2013 (12.0), _MSC_VER=1800; Source commit: 014d4fb151369f255b3debed7d15a154fd9036f5’]
[local output] Starting worker #1 on ‘my_cluster_ip’: "~\PuTTY\plink.exe" "-ssh" -E "~\mylogfile.txt" -R 11751:localhost:11751 -l ubuntu -P 22 -i ~/myscript_key.ppk my_cluster_ip "\"Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"workRSOCK <- tryCatch(parallel:::.slaveRSOCK, error=function(e) parallel:::.workRSOCK); workRSOCK()\" MASTER=localhost PORT=11751 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE"
[local output] - Exit code of system() call: 0
[local output] Waiting for worker #1 on ‘my_cluster_ip’ to connect back
Failed to launch and connect to R worker on remote machine ‘my_cluster_ip’ from local machine ‘NAME_OF_LOCAL_MACHINE’.
 * The error produced by socketConnection() was: ‘reached elapsed time limit’ (which suggests that the connection timeout of 120 seconds (argument 'connectTimeout') kicked in)
 * The localhost socket connection that failed to connect to the R worker used port 11751 using a communication timeout of 2592000 seconds and a connection timeout of 120 seconds.
 * Worker launch call: ""~\PuTTY\plink.exe" "-ssh" -E "~\mylogfile.txt" -R 11751:localhost:11751 -l ubuntu -P 22 -i ~/myscript_key.ppk my_cluster_ip "\"Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"workRSOCK <- tryCatch(parallel:::.slaveRSOCK, error=function(e) parallel:::.workRSOCK); workRSOCK()\" MASTER=localhost PORT=11751 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE".
 * Troubleshooting suggestions:
   - Suggestion #1: On Windows, output from worker when using 'outfile=NULL' is only visible when running R from a terminal (not a GUI).
   - Suggestion #2: Inspect the content of log file ‘~\mylogfile.txt’ for '~\PuTTY\plink.exe’.
   - Suggestion #3: Inspect the content of log file ‘~\mylogfile.txt’ for ‘-ssh’.

[local output] - waiting 15 seconds before trying again
...
[local output]   Failed 3 attempts with 15 seconds delay
Error in socketConnection("localhost", port = port, server = TRUE, blocking = TRUE,  : 
  Failed to launch and connect to R worker on remote machine ‘my_cluster_ip’ from local machine ‘NAME_OF_LOCAL_MACHINE’.
 * The error produced by socketConnection() was: ‘reached elapsed time limit’ (which suggests that the connection timeout of 120 seconds (argument 'connectTimeout') kicked in)
 * The localhost socket connection that failed to connect to the R worker used port 11751 using a communication timeout of 2592000 seconds and a connection timeout of 120 seconds.
 * Worker launch call: "~\PuTTY\plink.exe" "-ssh" -E "~\mylogfile.txt" -R 11751:localhost:11751 -l ubuntu -P 22 -i ~/myscript_key.ppk my_cluster_ip "\"Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"workRSOCK <- tryCatch(parallel:::.slaveRSOCK, error=function(e) parallel:::.workRSOCK); workRSOCK()\" MASTER=localhost PORT=11751 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE".

HenrikBengtsson commented 3 years ago

Thanks. Hopefully I can combine this output with OP's patch to better understand what the problem is. I can't promise anything soon but will try to look at this before the next release.

maksimhorowitz commented 3 years ago

Yes. And for what it's worth, using OpenSSH instead of PuTTY results in the same errors. The "ssh" R package uses OpenSSH to connect to remote servers and, as mentioned, I was able to connect to the AWS instance using the ssh package.

HenrikBengtsson commented 3 years ago

From manual inspection of your verbose output, I don't think it's related to OP's problem. Instead ...

Note that PuTTY does not support option -E, which you get if you specify argument rshlogfile (see the help page). This most likely causes plink to fail. Depending on what your R environment is, you may see:

> cl <- parallelly::makeClusterPSOCK("foo.bar.org", rshlogfile="foo.log")
plink: unknown option "-E"

That example is from running R in the Windows terminal. So, retry without rshlogfile.
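
For reference, OP's patch in Attachment 2 works around this by choosing the logging flag from the name of the SSH client. A minimal sketch of that check follows; the helper name rsh_log_flag() is hypothetical, and the two flags ("-sshlog" for plink, "-E" otherwise) are the ones used in the patch.

```r
# Hypothetical helper mirroring the check in OP's patch:
# plink gets "-sshlog", any other client gets "-E".
rsh_log_flag <- function(rshcmd) {
  if (grepl("plink", rshcmd[1], ignore.case = TRUE)) "-sshlog" else "-E"
}

flag_plink <- rsh_log_flag("C:\\PROGRA~1\\PuTTY\\plink.exe")
flag_ssh   <- rsh_log_flag("ssh")
cat("plink:", flag_plink, "\n")
cat("ssh  :", flag_ssh, "\n")
```

Matching on the executable name is a heuristic; it assumes the PuTTY client has "plink" somewhere in its file name.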

HenrikBengtsson commented 3 years ago

Another attempt to reproduce OP's issue

This is my test environment:

C:\Users\hb>R --vanilla

R version 4.0.3 (2020-10-10) -- "Bunny-Wunnies Freak Out"
Copyright (C) 2020 The R Foundation for Statistical Computing
Platform: x86_64-w64-mingw32/x64 (64-bit)

...

> ## Force PuTTY to be used (in case there are other SSH clients available)
> options(parallelly.makeNodePSOCK.rshcmd = "<putty-plink>")

> ## NB: I use plink's option -pw to specify SSH password. This is only
> ## because I'm lazy and can't be bothered to generate and deploy SSH keys
> ## for this example.  Passing passwords this way is bad security practice
> ## and only supported by PuTTY.
> rshopts <- c("-pw", "MySecretPwd")

> ## Set up one PSOCK worker on the Raspberry Pi
> ## This uses reverse SSH tunneling just as you would with a remote machine
> cl <- parallelly::makeClusterPSOCK("192.168.10.42", user="pi", rshopts = rshopts, verbose = TRUE, outfile = NULL)

[local output] Workers: [n = 1] '192.168.10.42'
[local output] Base port: 11953
[local output] Creating node 1 of 1 ...
[local output] - setting up node
[local output] - attempt #1 of 3
[local output] Will search for specified 'rshcmd' types: '<putty-plink>'

[local output] Found the following available 'rshcmd':
[local output]  1. 'C:\PROGRA~1\PuTTY\plink.exe', '-ssh' [type='putty-plink', version='plink: Release 0.74; Build platform: 64-bit x86 Windows; Compiler: clang 11.0.0 (https://github.com/llvm/llvm-project/ bc15bf66dcca76cc06fe71fca35b74dc4d521021), emulating Visual Studio 2013 (12.0), _MSC_VER=1800; Source commit: 014d4fb151369f255b3debed7d15a154fd9036f5']
[local output] Using 'rshcmd': 'C:\PROGRA~1\PuTTY\plink.exe', '-ssh' [type='putty-plink', version='plink: Release 0.74; Build platform: 64-bit x86 Windows; Compiler: clang 11.0.0 (https://github.com/llvm/llvm-project/ bc15bf66dcca76cc06fe71fca35b74dc4d521021), emulating Visual Studio 2013 (12.0), _MSC_VER=1800; Source commit: 014d4fb151369f255b3debed7d15a154fd9036f5']
[local output] Starting worker #1 on '192.168.10.42': "C:\PROGRA~1\PuTTY\plink.exe" "-ssh" -R 11953:localhost:11953 -l pi -pw MySecretPwd 192.168.10.42 "\"Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"workRSOCK <- tryCatch(parallel:::.slaveRSOCK, error=function(e) parallel:::.workRSOCK); workRSOCK()\" MASTER=localhost PORT=11953 OUT=/dev/null TIMEOUT=2592000 XDR=FALSE"
[local output] - Exit code of system() call: 0
[local output] Waiting for worker #1 on '192.168.10.42' to connect back
[local output] Connection with worker #1 on '192.168.10.42' established
[local output] - collecting session information
[local output] Creating node 1 of 1 ... done

Success. Verifying we indeed have a working cluster node:

> parallel::clusterEvalQ(cl, Sys.info())
[[1]]
                                 sysname
                                 "Linux"
                                 release
                            "5.4.83-v7+"
                                 version
"#1379 SMP Mon Dec 14 13:08:57 GMT 2020"
                                nodename
                               "pi-test"
                                 machine
                                "armv7l"
                                   login
                                    "pi"
                                    user
                                    "pi"
                          effective_user
                                    "pi"

>

Session information

> sessionInfo()
R version 4.0.3 (2020-10-10)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 10 x64 (build 18363)

Matrix products: default

locale:
[1] LC_COLLATE=English_United States.1252
[2] LC_CTYPE=English_United States.1252
[3] LC_MONETARY=English_United States.1252
[4] LC_NUMERIC=C
[5] LC_TIME=English_United States.1252

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base

loaded via a namespace (and not attached):
[1] compiler_4.0.3    parallelly_1.23.0 tools_4.0.3       parallel_4.0.3
>

HenrikBengtsson commented 3 years ago

I've tried my best to reverse engineer what I think was OP's (@jdnewmil) issue on "Win10 laptop, uses my patched function to access a cluster main computer using putty, ..." but I've failed. It could be that I've missed something but I'm gonna close this issue and label it 'invalid' until I get some reproducible example that shows that there is a need for providing arguments (e.g. workershell = "sh", mastershell = "cmd") for controlling the shell type on localhost and worker.
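
If such arguments were ever needed, one way to pick their defaults could look like the sketch below. This is purely illustrative and not something parallelly does: the helper guess_shell_types() is hypothetical, and it assumes that remote workers run a POSIX shell while the local shell follows .Platform$OS.type.

```r
# Hypothetical helper: guess quoting types for the calling machine and the worker.
# Assumption: remote workers use a POSIX shell ("sh"); local workers share the
# calling machine's shell.
guess_shell_types <- function(worker_is_local, os_type = .Platform$OS.type) {
  local_shell <- if (identical(os_type, "windows")) "cmd" else "sh"
  list(
    mastershell = local_shell,
    workershell = if (worker_is_local) local_shell else "sh"
  )
}

# Windows caller, remote worker: the Windows CMD -> Linux case from this thread.
shells <- guess_shell_types(worker_is_local = FALSE, os_type = "windows")
str(shells)
```

The values returned are valid shQuote() types, so they could be passed straight to its type argument.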