mschubert / clustermq

R package to send function calls as jobs on LSF, SGE, Slurm, PBS/Torque, or each via SSH
https://mschubert.github.io/clustermq/
Apache License 2.0

Job port is set to NA after binding failure #270

Closed statquant closed 1 year ago

statquant commented 2 years ago

Hello, for reasons that elude me and that I cannot reproduce (yeah, that's not much to go on), sometimes I cannot send jobs to my Slurm grid. What happens is I see

Submitting 50 worker jobs (ID: cmqNA) # notice the NA here

and then R just gets stuck until I send an interrupt in the terminal. Then I have to wait a really long time to get an error and the terminal back (which might be a bug in itself?). Given this NA, I was wondering whether something can be done. Note that when I rerun the same command later on, all works well.

Many thanks for the package, it's great and sorry for this unhelpful issue.

mschubert commented 2 years ago

Is this from CRAN or Github?

It looks like a port binding failure that returns NA instead of retrying/raising an error. I'll need to look at the exact implementation (i.e., version) to see how this is possible.

aruaud commented 2 years ago

I have the same issue with an SGE scheduler, although I do not have to wait long for the session to stop. I usually resend the job again and again until it works. Like @statquant, I can't reproduce it, but it tends to occur when I am sending big data.

Also thanks for the great package :)

luwidmer commented 2 years ago

I also just ran into this with LSF, R 4.1.0 and clustermq_0.8.95.1 off CRAN - might be a good idea to check if the port is NA and fail if it is?

luwidmer commented 1 year ago

Ran into this recently again... pretty hard to reproduce. @mschubert since the NA comes from casting the port to integer (as.integer(sub(".*:", "", private$master))), would it be possible to add something along the lines of

if (is.na(private$port))
{
  stop("Port is NA, aborting, address was: ", private$master)
}

after line 33 in qsys.r on master (https://github.com/mschubert/clustermq/blob/master/R/qsys.r#L33) or after line 21 on develop (https://github.com/mschubert/clustermq/blob/develop/R/qsys.r#L20). This would prevent R from hanging and hopefully give us an error message that gets closer to the root cause.
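For illustration only, here is the same fail-fast idea sketched in C++17 rather than R. The helper names parse_port and require_port are hypothetical and not part of clustermq; std::nullopt plays the role of R's NA. One simplification: an address without a colon is treated as having no port.

```cpp
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical helper: extract the port after the last ':' of an endpoint
// such as "tcp://node:6000". Returns std::nullopt where the R cast yields NA.
std::optional<int> parse_port(const std::string& endpoint) {
    const auto pos = endpoint.rfind(':');
    if (pos == std::string::npos || pos + 1 >= endpoint.size())
        return std::nullopt;  // no colon, or nothing after it
    const std::string digits = endpoint.substr(pos + 1);
    if (digits.find_first_not_of("0123456789") != std::string::npos)
        return std::nullopt;  // not a plain number
    if (digits.size() > 5)
        return std::nullopt;  // too long to be a valid port; also keeps stoi in range
    return std::stoi(digits);
}

// Fail immediately with the offending address instead of carrying on
// with a missing port.
int require_port(const std::string& endpoint) {
    const auto port = parse_port(endpoint);
    if (!port)
        throw std::runtime_error("Port is NA, aborting, address was: " + endpoint);
    return *port;
}
```

With an empty address, require_port throws right away rather than letting a missing port propagate into the job ID.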

Regarding the port selection: what are your thoughts on making this configurable (at the moment the range of 6000:9999 is hard-coded in util.r in master at https://github.com/mschubert/clustermq/blob/master/R/util.r#L35)? Sampling some currently free ports might be more robust (e.g. using parallelly::freePort, see https://parallelly.futureverse.org/reference/freePort.html) than sampling 100 ports from a fixed port range without checking whether these ports are free?

luwidmer commented 1 year ago

I can confirm that port NA is generated when all sampled ports are in use (e.g. test by overriding the host method in the package):

library(clustermq)
cmq <- asNamespace("clustermq")
unlockBinding("host", cmq)
cmq$host <- function(
    node=getOption("clustermq.host", Sys.info()["nodename"]),
    ports=32781, # This port is in use 
    n=1
) {
  utils::head(sample(sprintf("tcp://%s:%i", node, ports)), n)
}

fx = function(x) {
  tibble(x = x)
}

Q(fx, x=1:3, n_jobs=3, pkgs = c("tidyverse") )

with the modified error handling yields

> Q(fx, x=1:3, n_jobs=3, pkgs = c("tidyverse") )
Error in super$initialize(..., template = template) : Port is NA, aborting, address was: 

This apparently also happens when only the first port in the list is in use; the others are not checked (!):

cmq$host <- function(
    node=getOption("clustermq.host", Sys.info()["nodename"]),
    ports=32781:38000, # The first port (32781) is in use
    n=1
) {
  sprintf("tcp://%s:%i", node, ports)
}

> Q(fx, x=1:3, n_jobs=3, pkgs = c("tidyverse") )
Error in super$initialize(..., template = template) : Port is NA, aborting, address was: 

luwidmer commented 1 year ago

@mschubert a suggestion for host() could be:

host <- function(
  node=getOption("clustermq.host", Sys.info()["nodename"]),
  ports=getOption("clustermq.portRange", 1024:65535),
  n=20
) {
  # Collect up to n distinct ports that are free at the time of the check
  free_ports <- rep(NA_integer_, n)

  for (i in seq_len(n)) {
    # freePort() returns `default` (here NA) if no free port is found
    free_ports[i] <- parallelly::freePort(ports, default = NA)
    # Exclude the port just found so that it is not returned twice
    ports <- setdiff(ports, free_ports[i])
  }

  if (anyNA(free_ports)) {
    stop("Free ports must not be NA")
  }

  sprintf("tcp://%s:%i", node, free_ports)
}

luwidmer commented 1 year ago

Is it possible that https://github.com/mschubert/clustermq/blob/master/src/CMQMaster.cpp#L19 has a bug:

    std::string listen(Rcpp::CharacterVector addrs) {
        int i;
        for (i=0; i<addrs.length(); i++) {
            auto addr = Rcpp::as<std::string>(addrs[i]);
            try {
                sock.bind(addr);
            } catch(zmq::error_t const &e) {
                if (errno != EADDRINUSE)
                    Rf_error(e.what());
            }
            return sock.get(zmq::sockopt::last_endpoint);
        }
        Rf_error("Could not bind port after ", i, " tries");
    }

Shouldn't this read as follows (note the return statement location):

    std::string listen(Rcpp::CharacterVector addrs) {
        int i;
        for (i=0; i<addrs.length(); i++) {
            auto addr = Rcpp::as<std::string>(addrs[i]);
            try {
                sock.bind(addr);
                return sock.get(zmq::sockopt::last_endpoint);
            } catch(zmq::error_t const &e) {
                if (errno != EADDRINUSE)
                    Rf_error(e.what());
            }
        }
        Rf_error("Could not bind port after ", i, " tries");
    }
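
The effect of the return placement can be reproduced without ZeroMQ or Rcpp. The following is a minimal sketch, not the package's code: BindFn is a hypothetical stand-in for sock.bind that throws when an address is taken, and an empty string stands in for "no endpoint bound", which would be consistent with the empty address in the error message above.

```cpp
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a bind call: throws if the address is taken.
using BindFn = std::function<void(const std::string&)>;

// Mirrors the reported control flow: the return sits after the try/catch,
// so the loop exits on the first iteration even when bind failed.
std::string listen_buggy(const std::vector<std::string>& addrs, const BindFn& bind) {
    std::string last_endpoint;  // stays empty until a bind succeeds
    for (const auto& addr : addrs) {
        try {
            bind(addr);
            last_endpoint = addr;
        } catch (const std::runtime_error&) {
            // address in use: swallowed, as in the snippet discussed above
        }
        return last_endpoint;  // reached even after a failed bind
    }
    throw std::runtime_error("no addresses supplied");
}

// Return only from inside the try block, so a failed bind falls through
// to the next candidate address.
std::string listen_fixed(const std::vector<std::string>& addrs, const BindFn& bind) {
    for (const auto& addr : addrs) {
        try {
            bind(addr);
            return addr;  // bind succeeded
        } catch (const std::runtime_error&) {
            // address in use: try the next candidate
        }
    }
    throw std::runtime_error("Could not bind port after " +
                             std::to_string(addrs.size()) + " tries");
}
```

With a candidate list whose first address is taken, listen_buggy gives up with an empty endpoint, while listen_fixed moves on to the second address.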

mschubert commented 1 year ago

Great catch @luwidmer, that return statement indeed looks off!

Note that it's fixed in develop, but I'm happy to merge a PR if you don't want to wait for that.

luwidmer commented 1 year ago

@mschubert awesome, thanks! I patched this in the CRAN version for me, I'd be happy to wait for develop / the next version to hit CRAN. I use clustermq a lot 👍

What do you think of using parallelly in host to pre-populate the list with some ports that (should) be free (barring a race condition where something else is grabbing a bunch of ports between the R part and the C++ call), and making the port range a package option as in https://github.com/mschubert/clustermq/issues/270#issuecomment-1480935856 ?

mschubert commented 1 year ago

Making the port range configurable via an option makes sense, but I'm not sure I see the advantage of using parallelly::freePort?

luwidmer commented 1 year ago

Indeed... I suppose one could also just pass the entire port range into the C++ without pre-scanning in R! Thanks!