wlandau / crew

A distributed worker launcher
https://wlandau.github.io/crew/

Hanging tasks in a `targets` pipeline #75

Closed: wlandau closed this issue 1 year ago

wlandau commented 1 year ago

Tasks hang when running this example. The mirais show as "unresolved".

controller <- crew::crew_controller_local(
  seconds_launch = 120,
  workers = 20L,
  seconds_idle = 5,
  seconds_exit = 5
)
controller$start()
for (i in seq_len(6000)) {
  print(i)
  controller$push(
    command = targets:::target_run(target, globalenv(), "_targets"),
    data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"))), 
  )
  tmp <- controller$pop()
}
while (!controller$empty()) {
  out <- controller$pop()
  if (!is.null(out)) {
    print(out$name)
  }
}
controller$terminate()

And if I wait long enough, I see

[1] "162217f067283e9c493c253ef92d77872c032373"
[1] "f840548352495f2ffb71b1b08e05db14ae022a05"
[1] "af1267432fb1f3bba52b9d28472c21c8a50768b2"
[1] "7e03d62698d4684c59e5e3a532a3e98624970413"
...

Error in `attributes<-`(res, list(dim = c(length(envir[["urls"]]), 4L),  : 
  dims [product 80] do not match the length of object [65]
In addition: Warning messages:
1: In is.character(r) && nzchar(r) :
  'length(x) = 49 > 1' in coercion to 'logical(1)'
2: In envir[["urls"]][i] <- r :
  number of items to replace is not a multiple of replacement length
3: In self$workers$socket[index] <- socket :
  number of items to replace is not a multiple of replacement length
4: In recv(sock, mode = mode, block = timeout) :
  received data could not be converted to integer
wlandau commented 1 year ago

This is related to https://github.com/shikokuchuo/mirai/issues/58 and https://github.com/shikokuchuo/mirai/issues/53. Elusive intermittent problems like this have been persisting for weeks, and I have no idea how to solve them. Every time I try to simplify a reproducible example and peel back layers, everything starts working.

The only thing I can think of to do at this point is to fork mirai and add a bunch of logging to server() and see what prints out.

wlandau commented 1 year ago

So far, this only happens when there are a lot of targets tasks that run quickly. I'm really stuck, and I need more data to solve this, so I may just release and announce this stuff so more edge cases will pop up.

shikokuchuo commented 1 year ago

The only thing I can think of to do at this point is to fork mirai and add a bunch of logging to server() and see what prints out.

I don't think that would be fruitful, as we've just seen that the problem isn't likely to be within mirai itself, but possibly in how the commands interact with other parts of crew.

I wonder if it worked for you when you removed tmp <- controller$pop()? Just want to check that we're starting from the same point, in case there is some other corruption going on.

When it stops for me (it does after about 10 or so IDs), there is no error message. But my laptop fan turns on, and if I check htop I see activity on all cores, even after quite some time, and it stays like this. This seems to suggest something is stuck in a (moderate) spin cycle somewhere. I say moderate because the CPUs aren't exactly maxing out, but they are busy enough for the fan to be constantly on.

shikokuchuo commented 1 year ago
> sessionInfo()
R version 4.3.0 (2023-04-21)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 22.04.2 LTS

Matrix products: default
BLAS/LAPACK: /opt/intel/oneapi/mkl/2023.1.0/lib/intel64/libmkl_rt.so.2;  LAPACK version 3.10.1

locale:
 [1] LC_CTYPE=en_GB.UTF-8       LC_NUMERIC=C               LC_TIME=en_GB.UTF-8        LC_COLLATE=en_GB.UTF-8    
 [5] LC_MONETARY=en_GB.UTF-8    LC_MESSAGES=en_GB.UTF-8    LC_PAPER=en_GB.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=en_GB.UTF-8 LC_IDENTIFICATION=C       

time zone: Europe/London
tzcode source: system (glibc)

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] crew_0.1.1.9006

loaded via a namespace (and not attached):
 [1] vctrs_0.6.2         cli_3.6.1           knitr_1.42          rlang_1.1.1         xfun_0.39          
 [6] processx_3.8.1      targets_1.0.0.9001  data.table_1.14.8   glue_1.6.2          nanonext_0.8.3.9001
[11] mirai_0.8.7.9001    backports_1.4.1     ps_1.7.5            fansi_1.0.4         tibble_3.2.1       
[16] base64url_1.4       yaml_2.3.7          lifecycle_1.0.3     compiler_4.3.0      codetools_0.2-19   
[21] igraph_1.4.2        pkgconfig_2.0.3     rstudioapi_0.14     getip_0.1-3         digest_0.6.31      
[26] R6_2.5.1            tidyselect_1.2.0    utf8_1.2.3          pillar_1.9.0        callr_3.7.3        
[31] magrittr_2.0.3      tools_4.3.0   
shikokuchuo commented 1 year ago

How I might approach it is: (i) current crew_controller_local() arguments do not work, i.e.

  seconds_launch = 120,
  workers = 20L,
  seconds_idle = 5,
  seconds_exit = 5

but (ii) presumably some set of arguments do.

Start from the working example and change a single thing at a time until it breaks.

shikokuchuo commented 1 year ago

Following my own advice, I removed all arguments except workers = 20L and it completes without error.

However, oddly, exactly 19 IDs get printed. Yet if I check controller$log, it shows all 6,000 tasks popped, and controller$queue is an empty list.

So to check if all the results are actually returned, I tried the following to collect the names:

library(crew)
controller <- crew::crew_controller_local(
  workers = 20L,
)
controller$start()
for (i in seq_len(6000)) {
  print(i)
  controller$push(
    command = targets:::target_run(target, globalenv(), "_targets"),
    data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"))), 
  )
  tmp <- controller$pop()
}
names <- list()
while (!controller$empty()) {
  out <- controller$pop()
  if (!is.null(out)) {
    names[[length(names) + 1L]] <- out$name
  }
}
controller$terminate()

The really strange thing is that when I inspect the list names at the end, I have 42 results, yet controller$log shows:

> controller$log
# A tibble: 20 × 5
   popped_tasks popped_seconds popped_errors popped_warnings controller                              
          <int>          <dbl>         <int>           <int> <chr>                                   
 1         5730          6.63              0               0 2b1d327c845473edc574c83593ec49e174102dad
 2           97          0.146             0               0 2b1d327c845473edc574c83593ec49e174102dad
 3            8          0.025             0               0 2b1d327c845473edc574c83593ec49e174102dad
 4            4          0.042             0               0 2b1d327c845473edc574c83593ec49e174102dad
 5            7          0.043             0               0 2b1d327c845473edc574c83593ec49e174102dad
 6            6          0.025             0               0 2b1d327c845473edc574c83593ec49e174102dad
 7            8          0.019             0               0 2b1d327c845473edc574c83593ec49e174102dad
 8            4          0.014             0               0 2b1d327c845473edc574c83593ec49e174102dad
 9            6          0.02              0               0 2b1d327c845473edc574c83593ec49e174102dad
10           25          0.101             0               0 2b1d327c845473edc574c83593ec49e174102dad
11            9          0.025             0               0 2b1d327c845473edc574c83593ec49e174102dad
12            5          0.015             0               0 2b1d327c845473edc574c83593ec49e174102dad
13            5          0.019             0               0 2b1d327c845473edc574c83593ec49e174102dad
14           10          0.02              0               0 2b1d327c845473edc574c83593ec49e174102dad
15           11          0.026             0               0 2b1d327c845473edc574c83593ec49e174102dad
16           26          0.071             0               0 2b1d327c845473edc574c83593ec49e174102dad
17           13          0.028             0               0 2b1d327c845473edc574c83593ec49e174102dad
18            5          0.016             0               0 2b1d327c845473edc574c83593ec49e174102dad
19           10          0.026             0               0 2b1d327c845473edc574c83593ec49e174102dad
20           11          0.021             0               0 2b1d327c845473edc574c83593ec49e174102dad
shikokuchuo commented 1 year ago

This was again a bug in the testing script, and not so strange after all.

The log does not lie: the 'missing' names were actually retrieved when tmp <- controller$pop() was called. The script below simply stores those instances in a list:

library(crew)
controller <- crew::crew_controller_local(
  workers = 20L,
)
controller$start()
tmp <- list()
for (i in seq_len(6000)) {
  print(i)
  controller$push(
    command = targets:::target_run(target, globalenv(), "_targets"),
    data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"))), 
  )
  tmp[[length(tmp) + 1L]] <- controller$pop()
}
names <- list()
while (!controller$empty()) {
  out <- controller$pop()
  if (!is.null(out)) {
    names[[length(names) + 1L]] <- out$name
  }
}
controller$terminate()

The lists tmp and names together give the full 6,000 results.
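
As a quick sanity check, the tally can be confirmed from those two lists (a sketch building on the script above; entries of tmp are NULL whenever pop() returned nothing):

# Count results collected during the loop (non-NULL pops) plus those drained afterwards.
popped_in_loop <- Filter(Negate(is.null), tmp)
length(popped_in_loop) + length(names)  # expected to equal 6000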

wlandau commented 1 year ago

I wonder if it worked for you when you removed tmp <- controller$pop()? Just want to check that we're starting from the same point, in case there is some other corruption going on.

When I remove tmp <- controller$pop() from the example at https://github.com/wlandau/crew/issues/75#issue-1709009643, I get the "Error in attributes<-" error and the warnings listed. Digging further, I see the urls field of the compute profile looks off:

> as.list(environment(mirai::daemons)$..)[[controller$router$name]]$urls
 [1] "ws://10.0.0.100:44833/1/bd04f62038ee5a427854380633132a1e341d8df1" 
 [2] "ws://10.0.0.100:44833/2/e75a4d4cdf22a90aa1376f0cffb56c322826e148" 
 [3] "ws://10.0.0.100:44833/3/bcb54a27487e29c4937a2eaa4cf50c6d57fb9961" 
 [4] "\001"                                                             
 [5] "ws://10.0.0.100:44833/5/765cd0485c20f6134a91754857302fcbeeace267" 
 [6] "ws://10.0.0.100:44833/6/2a1edd40efb40728153fafc187dcb13b87638b8e" 
 [7] "ws://10.0.0.100:44833/7/54018f30cb88bc887166c3b20295e4a891d2d814" 
 [8] "ws://10.0.0.100:44833/8/091fade50440a56d139b3568d52bcbb8289ced4d" 
 [9] "ws://10.0.0.100:44833/9/c87e60f05e3847b018a9b4bae8e710badf8f620a" 
[10] "ws://10.0.0.100:44833/10/51558bf422b88aee4c46823e49f6d4271eba9ea5"
[11] "ws://10.0.0.100:44833/11/d298952ac3a432543e735c864743376df0e8411a"
[12] "ws://10.0.0.100:44833/12/69893838d7da504a154f2150c24d2c53ecadb89f"
[13] "ws://10.0.0.100:44833/13/cb6cf87635ce5a765d8034c95343e9015a2a8195"
[14] "ws://10.0.0.100:44833/14/eb948ce064ec9b3fed3f8bed1835631045856e7a"
[15] "ws://10.0.0.100:44833/15/f3ffd4ee05eaa8fcd68f1806b2320e9112266ba2"
[16] "ws://10.0.0.100:44833/16/e6c0159466116083760aca3dfdd1792d2563b504"
[17] "ws://10.0.0.100:44833/17/4403df7d7352f29718fdd4a0ad09054c73fdb576"
[18] "ws://10.0.0.100:44833/18/c952bb8aceff7e5b751b0b816edc2195cd9cadf3"
[19] "ws://10.0.0.100:44833/19/1fff8ebd2478e698f0a53a0068109f1b68f5df46"
[20] "ws://10.0.0.100:44833/20/941b05cf3cf3b6c5802ec278a8110ebc6cb05298"
wlandau commented 1 year ago

When it stops for me (it does after about 10 or so IDs), there is no error message. But my laptop fan turns on, and if I check htop I see activity on all cores, even after quite some time, and it stays like this. This seems to suggest something is stuck in a (moderate) spin cycle somewhere. I say moderate because the CPUs aren't exactly maxing out, but they are busy enough for the fan to be constantly on.

That's quite early. On my end, it's usually around the 5900th task or so.

wlandau commented 1 year ago

How I might approach it is: (i) current crew_controller_local() arguments do not work, but (ii) presumably some set of arguments do.

Circumstances where I am confident:

  1. It's correlated with something about how crew is using mirai.
  2. The hanging only happens if the servers auto-scale down: for example, with idletime or maxtasks. If the servers always stay running, there is no hanging (see the sketch below).
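
To make point 2 concrete, here is a sketch of the two configurations being contrasted, based on the reprexes earlier in this thread (controller setup only; the task loop is unchanged):

# Hanging configuration: workers can auto-scale down via idle/exit timeouts.
controller_hangs <- crew::crew_controller_local(
  workers = 20L,
  seconds_idle = 5,
  seconds_exit = 5
)
# Non-hanging configuration: no idle/exit/task limits, so workers stay up all session.
controller_ok <- crew::crew_controller_local(workers = 20L)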
wlandau commented 1 year ago

I finally created a reproducible example that does not use targets at all. Each task is simple and just returns TRUE (inside a crew monad). Below, there is hanging on my Ubuntu machine if the tasks_max = 100 line is in effect, and the hanging goes away if I comment it out so the servers keep running. The code is quite a bit longer because I use a custom launcher plugin that uses mirai::launch_server() and not processx. This was helpful in eliminating some possibilities.

mirai_launcher_class <- R6::R6Class(
  classname = "mirai_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    launch_worker = function(call, launcher, worker, instance) {
      settings <- eval(parse(text = call)[[1L]]$settings)
      do.call(what = mirai::launch_server, args = settings)
    }
  )
)

crew_controller_mirai <- function(
  name = "mirai",
  workers = 1L,
  host = NULL,
  port = NULL,
  seconds_launch = 30,
  seconds_interval = 0.01,
  seconds_timeout = 5,
  seconds_idle = Inf,
  seconds_wall = Inf,
  seconds_exit = 1,
  tasks_max = Inf,
  tasks_timers = 0L,
  reset_globals = TRUE,
  reset_packages = FALSE,
  reset_options = FALSE,
  garbage_collection = FALSE,
  auto_scale = "demand"
) {
  router <- crew::crew_router(
    name = name,
    workers = workers,
    host = host,
    port = port,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout
  )
  launcher <- mirai_launcher_class$new(
    name = name,
    seconds_launch = seconds_launch,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout,
    seconds_idle = seconds_idle,
    seconds_wall = seconds_wall,
    seconds_exit = seconds_exit,
    tasks_max = tasks_max,
    tasks_timers = tasks_timers,
    reset_globals = reset_globals,
    reset_packages = reset_packages,
    reset_options = reset_options,
    garbage_collection = garbage_collection
  )
  controller <- crew::crew_controller(
    router = router,
    launcher = launcher,
    auto_scale = auto_scale
  )
  controller$validate()
  controller
}

library(crew)

controller <- crew_controller_mirai(
  workers = 20L,
  tasks_max = 100
)
controller$start()
names <- character(0L)
index <- 0L
n_tasks <- 6000L
while (index < n_tasks || !(controller$empty())) {
  if (index < n_tasks) {
    index <- index + 1L
    cat("submit", index, "\n")
    controller$push(
      name = as.character(index),
      command = TRUE
    )
  }
  out <- controller$pop()
  if (!is.null(out)) {
    cat("collect", out$name, "\n")
    names[[length(names) + 1L]] <- out$name
  }
}

# unresolved tasks
lapply(controller$queue, function(x) x$handle[[1L]]$data)

controller$terminate()
wlandau commented 1 year ago

As I mentioned, crew polls daemons() frequently for auto-scaling purposes. Because of some trouble I ran into a few weeks ago, it has a bit of code to handle the case when daemons() does not return a valid integer matrix:

https://github.com/wlandau/crew/blob/d696d433fb0ec77ab9fb4defd3ca016f3dd4315c/R/crew_router.R#L219-L225

https://github.com/wlandau/crew/blob/d696d433fb0ec77ab9fb4defd3ca016f3dd4315c/R/crew_router.R#L303-L305
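
For context, a minimal sketch of the kind of check those links refer to (hypothetical; the real daemons_valid() lives in the linked crew files and may differ). A healthy daemons()$daemons value is a numeric matrix with one row per worker, whereas an 'errorValue' integer is not:

daemons_valid <- function(daemons) {
  is.matrix(daemons) && is.numeric(daemons) && nrow(daemons) > 0L
}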

Just now, when I ran https://github.com/wlandau/crew/issues/75#issuecomment-1547866076 with the following change to poll():

    poll = function() {
      out <- mirai::daemons(.compute = self$name)$daemons
      if (!daemons_valid(out)) {
        print(out)
        stop("invalid daemons")
      }
      self$daemons <- out
      invisible()
    }

I see:

'errorValue' int 5 | Timed out
Error in self$router$poll() : invalid daemons

I will have to see if this happens in the motivating targets pipeline from earlier.

shikokuchuo commented 1 year ago

I wonder if it worked for you when you removed tmp <- controller$pop()? Just want to check that we're starting from the same point, in case there is some other corruption going on.

When I remove tmp <- controller$pop() from the example at #75 (comment), I get the "Error in attributes<-" error and the warnings listed. Digging further, I see the urls field of the compute profile looks off:

I've run on multiple machines and don't get this error. Have you compared the versions against the sessionInfo() I posted? Or tested on another machine? This is not normal - it seems the package has been corrupted.

shikokuchuo commented 1 year ago

Alternatively, do you get all 6000 results as I described using the code I posted earlier: https://github.com/wlandau/crew/issues/75#issuecomment-1546967283

shikokuchuo commented 1 year ago

I see:

'errorValue' int 5 | Timed out
Error in self$router$poll() : invalid daemons

I will have to see if this happens in the motivating targets pipeline from earlier.

It has always been the case that a daemons() call can time out (after 3 seconds), and this is usually indicative of dispatcher having died. If indeed the URL is somehow corrupted then dispatcher will error when it tries to listen to it! I would look at eliminating this error first - it's likely causing everything else.

wlandau commented 1 year ago

This is tough to untangle because the corrupted URL seems to only explain the "Error in attributes<-" error. And I only get 'errorValue' int 5 | Timed out some of the time. In the motivating targets pipeline where I first noticed this issue outside GitHub Actions, the URLs look good, the dispatcher is running, and yet the tasks still hang. Each of these reprexes might be demonstrating a different problem.

shikokuchuo commented 1 year ago

I am most concerned about this one: "Error in attributes<-". As I said, this is not normal. I'm afraid I can't really say we're looking at the same code or talking about the same issues until we clear this one.

shikokuchuo commented 1 year ago

I just ran on Posit Cloud (free tier) to check that it's reproducible and that there's nothing special about my setup. I installed nanonext and mirai from r-universe, and crew and targets from CRAN.

The following code succeeds without error and prints out all 6000 IDs. There are only 2 workers as otherwise it runs out of memory:

library(crew)
controller <- crew::crew_controller_local(
  seconds_launch = 120,
  workers = 2L,
  seconds_idle = 5,
  seconds_exit = 5
)
controller$start()
for (i in seq_len(6000)) {
  print(i)
  controller$push(
    command = targets:::target_run(target, globalenv(), "_targets"),
    data = list(target = targets::tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"))), 
  )
  #tmp <- controller$pop()
}
while (!controller$empty()) {
  out <- controller$pop()
  if (!is.null(out)) {
    print(out$name)
  }
}
controller$terminate()
wlandau commented 1 year ago

Thanks for testing. I forgot to mention: except for GitHub Actions, these issues only seem to appear when the number of servers/workers is high, higher than probably necessary for the task load. With your version of the example from https://github.com/wlandau/crew/issues/75#issuecomment-1547996652 which uses only 2 workers, everything seems to work.

After https://github.com/wlandau/crew/commit/57c817470d19536529052b9f571bdd959fecc6ea, which implements https://github.com/wlandau/crew/issues/75#issuecomment-1547878464 permanently, the GitHub Actions targets tests show 'errorValue' int 5 | Timed out on failure instead of silent hanging: https://github.com/ropensci/targets/actions/runs/4981503262/jobs/8916252731.

wlandau commented 1 year ago

From https://github.com/ropensci/targets/actions/runs/4983866816/jobs/8921453397, I actually see a variety of issues on GitHub Actions. Sometimes tests show 'errorValue' int 5 | Timed out for daemons()$daemons, others show 'errorValue' int 7 | Object closed for the task, and others just time out at 360 seconds.

wlandau commented 1 year ago

I've run on multiple machines and don't get this error. Have you compared the versions against the sessionInfo() I posted?

I have compared sessionInfo()s, and while some packages differ, mirai and nanonext have the same versions.

Or tested on another machine?

My Ubuntu machine and my company's RHEL7 SGE cluster produce similar issues that present as hanging tasks. My Macbook does not seem to show issues.

This is not normal - seems the package has been corrupted.

I will try reinstalling mirai, nanonext, crew, and other dependencies. Other than that, could it be the versions of R and/or GCC? I'm not sure how else to fix a corrupted package except as the author of the code.

shikokuchuo commented 1 year ago

I've run on multiple machines and don't get this error. Have you compared the versions against the sessionInfo() I posted?

I have compared sessionInfo()s, and while some packages differ, mirai and nanonext have the same versions.

Or tested on another machine?

My Ubuntu machine and my company's RHEL7 SGE cluster produce similar issues that present as hanging tasks. My Macbook does not seem to show issues.

That's good to know. I was just afraid that somehow you or I had different local code. For me, I'm not able to reproduce it, just like on your MacBook. If you have any other reprexes, please do share - they might be useful.

This is not normal - seems the package has been corrupted.

I will try reinstalling mirai, nanonext, crew, and other dependencies. Other than that, could it be the versions of R and/or GCC? I'm not sure how else to fix a corrupted package except as the author of the code.

Thanks, re-installing the packages will just help confirm. Sorry, what I meant by corrupted was that perhaps the installed bytecode had somehow linked outdated dependencies.

From https://github.com/ropensci/targets/actions/runs/4983866816/jobs/8921453397, I actually see a variety of issues on GitHub Actions. Sometimes tests show 'errorValue' int 5 | Timed out for daemons()$daemons, others show 'errorValue' int 7 | Object closed for the task, and others just time out at 360 seconds.

Error 5 would occur if the client thinks there is a connection but the dispatcher has crashed, for instance, or the network is otherwise unavailable. Error 7 means that the dispatcher has actually closed the socket, so the client knows this was an intentional disconnection (which could happen if, for example, the dispatcher tries to listen to an invalid URL, errors, and exits 'gracefully'). These are potentially symptoms of the same problem - I wouldn't read too much into the distinction at present.
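
For reference, a sketch of how a caller could detect these error values programmatically instead of hanging (illustrative only; nanonext::is_error_value() flags both cases, and the .compute profile name here is an assumption - crew uses the router name):

status <- mirai::daemons(.compute = "default")$daemons
if (nanonext::is_error_value(status)) {
  # covers 'errorValue' 5 (timed out) and 7 (object closed)
  message("daemons query failed: ", status)
}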

shikokuchuo commented 1 year ago

I finally created a reproducible example that does not use targets at all. Each task is simple and just returns TRUE (inside a crew monad). Below, there is hanging on my Ubuntu machine if the tasks_max = 100 line is in effect, and the hanging goes away if I comment it out so the servers keep running. The code is quite a bit longer because I use a custom launcher plugin that uses mirai::launch_server() and not processx. This was helpful in eliminating some possibilities.

On this one, I see something probably nearer to what you're seeing - almost all tasks get collected, bar about 10 unresolved at the end. Sometimes it literally stops at collect 6000.

I just want to check on the scaling logic - at the end, I guess all the servers get reinstated repeatedly, as the queue is not empty, and that is what is keeping the cores busy. If that is not currently the case, try reinstating ALL of them as a test - could it be that the unresolved tasks are stuck at one server that is somehow being left out?

wlandau commented 1 year ago

On this one, I see something probably nearer to what you're seeing - almost all tasks get collected, bar about 10 unresolved at the end. Sometimes it literally stops at collect 6000.

Glad you see what I am seeing.

Thanks, re-installing the packages will just help confirm. Sorry what I meant by corrupted was perhaps the installed bytecode somehow linked outdated dependencies.

I tried reinstalling nanonext and mirai on both the SGE cluster and the local Ubuntu machine, then running the big targets pipeline on the former and https://github.com/wlandau/crew/issues/75#issuecomment-1547866076 on the latter. I still see silent hangs on SGE and 'errorValue' int 5 | Timed out on the local Ubuntu machine (even though the dispatcher is still running).

I just want to check on the scaling logic - at the end, I guess all the servers get reinstated repeatedly, as the queue is not empty, and that is what is keeping the cores busy. If that is not currently the case, try reinstating ALL of them as a test - could it be that the unresolved tasks are stuck at one server that is somehow being left out?

Do you mean rescaling them up to the full 20 servers instead of just min(length(controller$queue), 20) servers?

shikokuchuo commented 1 year ago

Do you mean rescaling them up to the full 20 servers instead of just min(length(controller$queue), 20) servers?

Precisely. I mean, without knowing more, it looks the same as if the mirai tasks were sent to a URL where there is no active server instance, and hence they just remain waiting for a server to come online.

Meanwhile I am seeing if I can make the daemons control socket more robust to rule that out.

shikokuchuo commented 1 year ago

I have committed c6f23c7 as v0.8.7.9003. This upgrades the bus sockets to req/rep.

While the "Error in attributes<-" error initially seemed highly implausible to me, it does seem to imply the sends/receives were getting out of sync, hence the result of a saisei() call was being received for a daemons() query etc., which would explain the errors/warnings.

Req/rep should prevent this. As the requests are synchronous i.e. one after the other, I should not think I need to be even safer and wrap everything in contexts - the global socket context should be sufficient.

While this does not solve the hanging per se, it should hopefully eliminate this type of error. Please have a try with your initial reprex, removing tmp <- controller$pop(), and see if you can now get it to complete on your Ubuntu machine.

Hopefully it brings us one step closer. Thanks!

shikokuchuo commented 1 year ago

Do you mean rescaling them up to the full 20 servers instead of just min(length(controller$queue), 20) servers?

Precisely. I mean, without knowing more, it looks the same as if the mirai tasks were sent to a URL where there is no active server instance, and hence they just remain waiting for a server to come online.

I believe this is the issue! Using the 'big targets pipeline' https://github.com/wlandau/crew/issues/75#issuecomment-1548505773: if I interrupt when it hangs, then manually launch servers using controller$launch(1) until controller$summary() shows all workers as connected, and then call out <- controller$pop() repeatedly (9-10 times) for the unresolved tasks, it eventually returns NULL - and the controller queue completely clears.
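
For anyone following along, a sketch of that manual recovery sequence after interrupting the hang (the drain loop mirrors the earlier reprexes; inspect controller$summary() by eye between launches):

controller$launch(n = 1L)  # repeat until controller$summary() shows all workers connected
controller$summary()
while (!controller$empty()) {
  out <- controller$pop()
  if (!is.null(out)) print(out$name)
}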

wlandau commented 1 year ago

Now that you mention it, I think this is part of the picture. I just reran the original motivating big targets pipeline on the SGE cluster (not posted here), and it got stuck again at a single task. crew kept resubmitting a single worker to run it, and bizarrely, that worker never picked up the task. But when I ran controller$launch(n = 50), one of those 50 new workers picked up the task and ran it, and I got a result back.

It's odd, though: a single re-launched worker should have been enough to run a single task. 50 is too many.

wlandau commented 1 year ago

I ran it again, and this time I got 5 stuck tasks. When I ran controller$pop() or controller$launch(n = 5), 5 new workers launched to try to run the 5 unresolved tasks. When I checked controller$router$poll(); controller$router$daemons(), I could see those 5 workers hit the online and instance bits, but assigned and complete stayed at 0. When I checked controller$queue and controller$pop() again, no tasks had been run. I repeated this with controller$pop() and controller$launch(n = 5) several more times: when the number of workers relaunched equals the number of unresolved stuck tasks, those stuck tasks don't run. The same thing happened with n = 6, 7, 9, 10, and 20 in controller$launch() (waiting about 15 to 30 seconds between tries). When I ramped it up to controller$launch(n = 25), then 2 of the 5 stuck tasks ran. At that point, 3 stuck tasks remained. Re-launching 3 workers for the 3 remaining stuck tasks didn't work, and neither did n = 10, 15, 19, 20, 21, 22, 23, 24, 25, 30, or 35 in controller$launch(). Bumping n up to 40, one more of the stuck tasks finished, but the other 2 remained stuck.

In the SGE cluster case, I think restarting all possible workers will cause the remaining stuck tasks to run. But I don't think it should require that many. If there are 3 stuck tasks and I submit 3 workers, those 3 workers should be enough.

It feels like some tasks may be dispatched to incorrect websocket URLs. It makes sense as a hypothesis, because re-launching more workers increases the number of URLs that can accept tasks, so there is more of a chance that the dispatcher gets lucky with whichever URL it picks.

wlandau commented 1 year ago

If I interrupt when it hangs, then manually launch servers using controller$launch(1) until controller$summary() shows all workers as connected.

Yeah, calling controller$launch(n = 1) repeatedly until all workers are connected (which is just like controller$launch(n = your_max_prespecified_workers)) would indeed cause all tasks to finish in this case. But I bet if you call just one controller$launch(n = 1) then all tasks still stay stuck, which shouldn't happen. (The worker you launch should at least be able to run one of those stuck tasks, which is not what I see in the SGE cluster targets pipeline.)

shikokuchuo commented 1 year ago

This leaves us tantalisingly close. We've confirmed the hypothesis that we are simply launching the wrong servers - I'll explain below.

This can be so because a mirai task can already be assigned to a particular server (though this is not always the case) [1] - in a nutshell, we are relying on a bit of NNG's reliable delivery magic [2]. If so, that particular server must be re-launched when scaling, otherwise the task will just remain there and never get done.

Luckily there is a straightforward solution. All that's needed is to add one step to crew's scaling logic - always launch at least those servers for which cumulative assigned > cumulative complete [3].

Once implemented, this issue falls away.

--

Details below for additional explanation. If any of it is not clear just let me know.

[1] If a task is sent to a server, but that server then shuts down by timing out etc., that task is retained at the NNG library level at that socket; when a new instance connects, it is resent automatically. This could happen not just because there is an 'exitlinger' period, but also because it may not be possible to ensure a server closes its connection (and for this to get registered at the dispatcher) before the dispatcher has had the chance to send it the next task, as everything happens asynchronously.

[2] It is possible to override this, but then (i) any solution will necessarily be more brittle and potentially introduce a source of bugs, and (ii) it will be less performant, as we'd be attempting to re-implement in R what is already part of the C library.

[3] The cumulative stats are needed, not just the snapshot for each instance, as 'assigned' must equal 'complete' over all instances but not necessarily any particular instance. Using a max-one-task server as an example: first instance, 2 assigned, 1 complete - assigned > complete, so re-launch this server. Second instance: either 0 assigned, 1 complete (the server finishes the previous task but doesn't get sent a new one) or 1 assigned, 1 complete (the server finishes the previous task and also gets sent a new one). In neither case is assigned > complete, but in the second case you'd want to re-launch, as there is a task waiting to complete at that server.
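
A sketch of the proposed rule in code (hypothetical bookkeeping, not crew's implementation; the daemons matrix column names follow the discussion in this thread, and prior_assigned / prior_complete stand for tallies kept from instances that have already ended):

workers_owed_tasks <- function(daemons, prior_assigned, prior_complete) {
  cumulative_assigned <- prior_assigned + daemons[, "assigned"]
  cumulative_complete <- prior_complete + daemons[, "complete"]
  # Always relaunch at least these workers: a task is still waiting at their socket.
  which(cumulative_assigned > cumulative_complete)
}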

wlandau commented 1 year ago

I am eager to try this, thanks for explaining. I will have questions about counters/stats later today, but for now I will ask: when I relaunch a server in this particular scenario, where a task is still in NNG, should I avoid calling saisei() first? saisei() is super helpful in preventing conflicts with servers that time out on launch, so I always call it on relaunch. But it rotates the websocket URL, which seems like it might orphan a task that is permanently glued to a particular websocket at the NNG level.

shikokuchuo commented 1 year ago

That we solved before - saisei() rotates the listener only, so feel free to call it. Even with the changed URL, the task is retained at the socket and will be automatically re-sent as many times as required to ensure it gets done. NNG knows whenever there is a disconnection - that is part of the magic!

wlandau commented 1 year ago

That we solved before - saisei() rotates the listener only, so feel free to call it. Even with the changed URL, the task is retained at the socket and will be automatically re-sent as many times as required to ensure it gets done. NNG knows whenever there is a disconnection - that is part of the magic!

Great to know, that simplifies things.

All that's needed is to add one step to crew's scaling logic - always launch at least those servers for which cumulative assigned > cumulative complete...The cumulative stats are needed, not just the snapshot for each instance, as 'assigned' must equal 'complete' over all instances but not necessarily any particular instance.

Are the assigned and complete stats already cumulative enough, even though relaunching a server resets them to zero? Can I reason on those raw numbers as received directly from daemons()$daemons?

Using a max-one-task server as an example: first instance, 2 assigned, 1 complete - assigned > complete, so re-launch this server.

Yes, I have observed this happening sometimes. If I see a server like this that is also "inactive" (online = 0 and either instance > 0 or launching timed out) I can make it a top priority for relaunch.

Second instance: either 0 assigned, 1 complete (the server finishes the previous task but doesn't get sent a new one) or 1 assigned, 1 complete (the server finishes the previous task and also gets sent a new one). In neither case is assigned > complete, but in the second case you'd want to re-launch, as there is a task waiting to complete at that server.

So this is where the "cumulative" part comes in, right? Since the stats reset on relaunch, I will need to sum the existing stats with the cumulative stats before the last reset in order to reason about assigned tasks.

shikokuchuo commented 1 year ago

All that's needed is to add one step to crew's scaling logic - always launch at least those servers for which cumulative assigned > cumulative complete...The cumulative stats are needed, not just the snapshot for each instance, as 'assigned' must equal 'complete' over all instances but not necessarily any particular instance.

Are the assigned and complete stats already cumulative enough, even though relaunching a server resets them to zero? Can I reason on those raw numbers as received directly from daemons()$daemons?

I guess 'yes' is the answer. If you see online and instance as '0' with stats for 'assigned' and 'complete' then you know those are the 'cumulative' ones for the instance that just ended. When a new instance connects those are zeroed out.

Using a max-one-task server as an example: first instance, 2 assigned, 1 complete - assigned > complete, so re-launch this server.

Yes, I have observed this happening sometimes. If I see a server like this that is also "inactive" (online = 0 and either instance > 0 or launching timed out) I can make it a top priority for relaunch.

Just so you are confident in what is going on here: at the dispatcher, when a task is sent to the server, 'assigned' is incremented. The receive is asynchronous and can complete at any time thereafter. Now, if the server times out and the instance changes, the task is retained and resent at the NNG level as I mentioned - this part is not seen by R, hence it is not logged. When the result is eventually sent back, the receive completes - at this point 'complete' is incremented. That is why, for any individual instance, 'assigned' can differ from 'complete'.

Second instance: either 0 assigned, 1 complete (the server finishes the previous task but doesn't get sent a new one) or 1 assigned, 1 complete (the server finishes the previous task and also gets sent a new one). In neither case is assigned > complete, but in the second case you'd want to re-launch, as there is a task waiting to complete at that server.

So this is where the "cumulative" part comes in, right? Since the stats reset on relaunch, I will need to sum the existing stats with the cumulative stats before the last reset in order to reason about assigned tasks.

Precisely.

shikokuchuo commented 1 year ago

Are the assigned and complete stats already cumulative enough, even though relaunching a server resets them to zero? Can I reason on those raw numbers as received directly from daemons()$daemons?

I guess 'yes' is the answer. If you see online and instance as '0' with stats for 'assigned' and 'complete' then you know those are the 'cumulative' ones for the instance that just ended. When a new instance connects those are zeroed out.

Sorry just so there's no confusion - what I mean is actually if you see online as '0' with stats for 'assigned' and 'complete' then you know those are the 'cumulative' ones for the instance that just ended.

If you call saisei() at this point, the URL is regenerated and instance will also turn to '0'.

Then if a server is launched and connects into that URL, online and instance both turn to '1' and the 'assigned' and 'complete' stats are zeroed out.
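
To summarise that lifecycle, a toy illustration of one worker's row across the three moments just described (values illustrative; column names as discussed in this thread):

instance_ended <- c(online = 0L, instance = 1L, assigned = 7L, complete = 6L)  # cumulative stats still readable
after_saisei   <- c(online = 0L, instance = 0L, assigned = 7L, complete = 6L)  # URL rotated, instance zeroed
new_connection <- c(online = 1L, instance = 1L, assigned = 0L, complete = 0L)  # stats zeroed on connect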

wlandau commented 1 year ago

I think I might hit a race condition trying to update the cumulative versions of assigned and complete. The current launching/auto-scaling workflow is to:

  1. Store the result of daemons()$daemons.
  2. Find out which workers are "inactive", using the complete and instance counters from (1) (as well as the launching timeout).
  3. Call saisei() as needed on "lost" workers (the ones that time out trying to launch).
  4. For each worker selected for launch, call saisei() and then relaunch it.

It would be convenient to update the stats during (1) when polling happens, but then new tasks could complete between (1) and the next call to saisei(). To avoid a race condition, it seems like I would have to call saisei() first, then call daemons(), and then re-launch the worker, in that order. That would imply:

  1. Store the result of daemons()$daemons to make decisions about which workers need calls to saisei() in the first place.
  2. Find out which workers are "inactive", using the complete and instance counters from (1) (as well as the launching timeout).
  3. Call saisei() as needed on "lost" workers (the ones that time out trying to launch).
  4. Call saisei() on all the workers that are about to relaunch.
  5. For all the workers with rotated tokens, store the result of daemons()$daemons again, use it to get the most up-to-date and correct assigned and instance stats, and then update the cumulative stats.
  6. Re-launch the new group of workers.

Step (5) is needed in case any tasks complete between (1) and saisei().
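
A sketch of that six-step sequence as hypothetical glue code (not crew's implementation; name is the compute profile and rotate holds the worker indices chosen for relaunch, with saisei() called by index):

poll_rotate_poll <- function(name, rotate) {
  before <- mirai::daemons(.compute = name)$daemons         # steps 1-2: decide who is inactive
  for (i in rotate) mirai::saisei(i = i, .compute = name)   # steps 3-4: rotate their tokens
  after <- mirai::daemons(.compute = name)$daemons          # step 5: fresh stats after rotation
  list(before = before, after = after)                      # step 6: caller updates tallies, then relaunches
}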

This is all technically possible to implement, but it doubles the amount of polling required. Is there a more efficient way to keep track of the cumulative counts?

Could saisei() return the current assigned and complete stats for the server instance it just closed?

shikokuchuo commented 1 year ago

I think I might hit a race condition trying to update the cumulative versions of assigned and complete. The current launching/auto-scaling workflow is to:

  1. Store the result of daemons()$daemons.

  2. Find out which workers are "inactive", using the complete and instance counters from (1) (as well as the launching timeout).

  3. Call saisei() as needed on "lost" workers (the ones that time out trying to launch).

  4. For each worker selected for launch, call saisei() and then relaunch it.

It would be convenient to update the stats during (1) when polling happens, but then new tasks could complete between (1) and the next call to saisei(). To avoid a race condition, it seems like I would have to call saisei() first, then call daemons(), and then re-launch the worker, in that order. That would imply:

I am not sure if the following works: poll daemons() and update the cumulative stats only for those servers where online is '0' and instance is '1' ie for ended instances. We know for these it is not possible for any updates to the stats to occur as no server is connected at this point.

For all of these call saisei(). This changes the URL and zeroes instance. This means next time daemons() is polled you don't add the stats again as instance is not '1'.

Actually only launch servers for those instances where you determine you need to.

This is all technically possible to implement, but it doubles the amount of polling required. Is there a more efficient way to keep track of the cumulative counts?

Could saisei() return the current assigned and complete stats for the server instance it just closed?

This would be equally inefficient, as getting the stats involves a separate call to the dispatcher, equivalent to daemons(). I have made the two completely separate, as one returns integer information and the other character information. FYI, I am retrieving real binary vectors here, not a serialised R object, so these necessitate two separate nanonext sends.

I wouldn't be worried too much about additional calls as they have already been made as lean as possible. If it's easier in terms of logic to do them, I'd just go ahead.

wlandau commented 1 year ago

I am not sure if the following works: poll daemons() and update the cumulative stats only for those servers where online is '0' and instance is '1' ie for ended instances. We know for these it is not possible for any updates to the stats to occur as no server is connected at this point.

I think this almost works. Besides servers with online = 0 and instance = 1, there may be lurking servers with online = 0 and instance = 0 which launch and complete tasks between daemons() and saisei(). I think I could poll, saisei() these "lost" workers, poll again, and then update the cumulative stats for servers with online = 0 and instance = 1.

Unless... would I ever see instance = 0 with assigned > 0 or complete > 0?

shikokuchuo commented 1 year ago

I am not sure if the following works: poll daemons() and update the cumulative stats only for those servers where online is '0' and instance is '1' ie for ended instances. We know for these it is not possible for any updates to the stats to occur as no server is connected at this point.

I think this almost works. Besides servers with online = 0 and instance = 1, there may be lurking servers with online = 0 and instance = 0 which launch and complete tasks between daemons() and saisei().

Is that true? I thought the use of saisei() prevented this. As soon as saisei() is called the URL changes and also instance is set to 0. So even if there is a lurking server it will just die - it will never connect or affect the stats. So every time you see online as 0 and instance as 1, you know a server has come and gone, irrespective of whether it has completed any tasks. Don't forget you only ever launch servers after calling saisei() first. Does that make sense?

shikokuchuo commented 1 year ago

Unless... would I ever see instance = 0 with assigned > 0 or complete > 0?

Sorry, I missed this bit. No, you wouldn't: instance would always be 1 once a server connects, and there must be a server for there to be assigned or completed tasks. That holds until you call saisei(), at which point the instance does zero out.

wlandau commented 1 year ago

Is that true? I thought the use of saisei() prevented this. As soon as saisei() is called the URL changes and also instance is set to 0. So even if there is a lurking server it will just die - it will never connect or affect the stats.

True, but in order to know which servers need saisei() in the first place, crew needs to poll daemons() beforehand. Otherwise, it could mistakenly shut out servers that are in the process of launching. From that reasoning, a one-poll workflow might look like this:

  1. A server is past its startup window, but it is still trying to launch, and it has online = 0 and instance = 0.
  2. crew polls daemons() and sees online = 0 and instance = 0 for the server.
  3. A moment later, the server dials in and completes tasks.
  4. crew calls saisei(). At this point, I agree, no server can dial in until (6).
  5. crew looks back at the outdated stats from (2) and fails to update cumulative assigned or completed.
  6. crew launches another instance of the server and loses the previous instance's assigned and completed.

To prevent the bug in (6), I think crew will need a fresh call to daemons() in (5) instead of recycling the stats from (2).

Sorry, I missed this bit. No, you wouldn't: instance would always be 1 once a server connects, and there must be a server for there to be assigned or completed tasks. That holds until you call saisei(), at which point the instance does zero out.

Very helpful, thanks.

shikokuchuo commented 1 year ago

Thanks for laying it out so clearly. I agree with your reasoning. You're getting really good at these!

I'd happily make the extra daemons() call at (5). As I mentioned, I've already updated these calls to be as efficient as possible, avoiding serialisation/deserialisation costs and minimising the number of bytes actually transmitted. Two calls now probably equal one call previously!

wlandau commented 1 year ago

As of https://github.com/wlandau/crew/commit/cfb4ce2b12baf90d22cf912f176dfcfdeda1616a, the hanging with long pipelines is solved! Thank you so much for the constant help, I could not have figured it out on my own.

I still get sporadic instances of 'errorValue' int 5 | Timed out, but I will open a separate issue for that.

shikokuchuo commented 1 year ago

What an outcome! It only took some 200-odd GitHub messages across this issue as well as shikokuchuo/mirai#53 and shikokuchuo/mirai#58!