wlandau / crew

A distributed worker launcher
https://wlandau.github.io/crew/

Superfluous launches in the case of fully transient workers #51

Closed: wlandau closed this issue 1 year ago

wlandau commented 1 year ago

Promoting https://github.com/wlandau/crew/discussions/50 to an issue so it is more visible. See reprex below. I am not sure where the problem comes from. Things may improve with https://github.com/shikokuchuo/mirai/discussions/38, and they may also improve if we can figure out a way to use mirai's own sockets instead of custom bus sockets to detect when particular new instances of worker processes connect.

library(crew)
crew_session_start()
x <- crew_controller_callr(
  tasks_max = 1L,
  workers = 1L
)
x$start()
for (index in seq_len(100L)) {
  name <- paste0("task_", index)
  x$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}
x$wait()
results <- NULL
while (!is.null(out <- x$pop(scale = FALSE))) {
  if (!is.null(out)) {
    results <- dplyr::bind_rows(results, out)
  }
}
length(unique(results$socket_session))
#> [1] 100
utils::stack(x$summary(-contains(c("controller", "seconds"))))
#> values              ind
#> 1  ws://10.0.0.9:54499    worker_socket
#> 2                FALSE worker_connected
#> 3                FALSE      worker_busy
#> 4                  114  worker_launches # It is not usually this high, but it is always a little over 100 (e.g. 105 in other test runs)
#> 5                  101 worker_instances
#> 6                    0   tasks_assigned
#> 7                    1   tasks_complete
#> 8                  100     popped_tasks
#> 9                    0    popped_errors
#> 10                   0  popped_warnings
x$terminate()
wlandau commented 1 year ago

To do: test in a temporary fork that uses mirai sockets instead of bus sockets to look for active workers. That may help isolate the problem in crew if the bug is on my end.

wlandau commented 1 year ago

Update: although other performance tests and benchmarks have improved immensely with mirai 0.8.1.9002 and nanonext 0.8.0.9001, this particular test is still showing too many launches for the amount of tasks completed by persistent workers.

wlandau commented 1 year ago

In https://github.com/wlandau/crew/compare/main..096e6727cf2715983e7cb347c05748ca103a31aa, I try switching to mirai::daemons() to monitor connectivity instead of my custom NNG sockets, and it doesn't appear to work.

wlandau commented 1 year ago

I did some work to clean up the controller and launcher logic, now in branch main. I repeated the test above, and I still notice a couple of superfluous worker launches. In addition, I occasionally see errors under results$error:

"'miraiError' chr Error in envir[[\".expr\"]]: subscript out of bounds"

Maybe I can reproduce the error with just mirai outside crew?

wlandau commented 1 year ago

Got a reprex, not of superfluous worker launches, but of the mirai errors:

library(mirai)
library(nanonext)
daemons(n = 1L, url = "ws://127.0.0.1:5000")
tasks <- lapply(seq_len(100L), function(x) {
  mirai(x, x = x)
})
results <- list()
px <- NULL
launches <- 0L
while(length(results) < 100L) {
  if (is.null(px) || !px$is_alive()) {
    px <- callr::r_bg(function() {
      mirai::server("ws://127.0.0.1:5000", maxtasks = 1L)
    })
    launches <- launches + 1L
  }
  done <- integer(0L)
  for (i in seq_along(tasks)) {
    if (!.unresolved(tasks[[i]])) {
      done <- c(done, i)
      results[[length(results) + 1L]] <- tasks[[i]]
    }
  }
  tasks[done] <- NULL
}
print(launches)
#> [1] 100
data <- as.character(lapply(results, function(x) x$data))
print(data)
#> [1] "1"                                                   
#> [2] "2"                                                   
#> [3] "3"                                                   
#> [4] "4"                                                   
#> [5] "5"                                                   
#> [6] "6"                                                   
#> [7] "7"                                                   
#> [8] "Error in envir[[\".expr\"]]: subscript out of bounds"
#> ...
sum(grepl("^Error", data))
#> [1] 24
daemons(0L)
wlandau commented 1 year ago

Just posted https://github.com/shikokuchuo/mirai/issues/43 with the above reprex. After it's fixed, I will try the original reprex from this thread again and see how much closer to solved it is.

wlandau commented 1 year ago

Here is a smaller version of the same test code. It seems to work on my MacBook.

library(mirai)
library(nanonext)
daemons(n = 1L, url = "ws://127.0.0.1:5000")
launches <- 0L
pids <- integer(0L)
while (length(pids) < 100L) {
  if (!exists("px") || !px$is_alive()) {
    px <- callr::r_bg(\() mirai::server("ws://127.0.0.1:5000", maxtasks = 1L))
    launches <- launches + 1L
  }
  if (!exists("m") || !.unresolved(m)) {
    if (exists("m")) pids <- c(pids, m$data)
    m <- mirai(ps::ps_pid())
  }
}
print(launches)
daemons(n = 0L)
wlandau commented 1 year ago

After the improvements I made yesterday, nearly all of the superfluous worker launches went away once I removed manual worker termination. (In tests/throughput/test-transient.R, there is still one final worker which launches but completes no tasks, and I am not sure why or whether it can be avoided.)

I think I may need to implement an exit delay similar to the exitlinger argument of mirai::server(). This can be fully asynchronous if I move exiting workers into a special queue and then manually terminate them only after a timeout is reached.
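
Something like the sketch below is what I have in mind. It is a rough sketch only: retire_worker(), sweep_exiting(), and the exiting field are hypothetical names, not part of crew, and the launcher is assumed to be an R6 object so updates happen by reference.

# Rough sketch of an asynchronous exit delay (hypothetical names, not crew code).
# Instead of killing a worker handle right away, park it in an "exiting" queue
# with a timestamp and only force-terminate it after the grace period runs out.
retire_worker <- function(launcher, handle) {
  launcher$exiting[[length(launcher$exiting) + 1L]] <- list(
    handle = handle,
    start = Sys.time()
  )
  invisible(launcher)
}
# Called opportunistically on each auto-scaling pass, so nothing ever blocks.
sweep_exiting <- function(launcher, seconds_exit = 5) {
  keep <- list()
  for (entry in launcher$exiting) {
    elapsed <- difftime(Sys.time(), entry$start, units = "secs")
    if (!entry$handle$is_alive()) {
      next # worker already exited on its own
    } else if (elapsed > seconds_exit) {
      entry$handle$kill() # grace period over: force-terminate
    } else {
      keep[[length(keep) + 1L]] <- entry # still within the grace period
    }
  }
  launcher$exiting <- keep
  invisible(launcher)
}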

Or maybe I should only force-quit the workers that never connect within the startup window. Once a worker connects to a custom crew bus socket, we are reasonably sure it can connect to mirai as well, and then mirai can close it. But if R never starts, then the worker could be stuck in a crashed state and might need help exiting.

wlandau commented 1 year ago

@shikokuchuo, in these tests, I consistently see mirai servers with tasks_assigned = 0 and tasks_complete = 1. I did not think this would be possible because I thought all completed tasks would have had to be assigned first. I realize this is a challenging question without a reprex, but do you know what might be happening?

shikokuchuo commented 1 year ago

This is probably just a function of when the information snapshot is taken. I mean the important thing is that the work is actually being done, and I am confident from my own tests that it is. This is probably a next milestone thing: whether it makes sense to maintain a cumulative record at mirai of all server instances.

wlandau commented 1 year ago

Thanks for explaining.

I added some thoughts to https://github.com/wlandau/crew/issues/51#issuecomment-1485280148:

Or maybe I should only force-quit the workers that never connect within the startup window. Once a worker connects to a custom crew bus socket, we are reasonably sure it can connect to mirai as well, and then mirai can close it. But if R never starts at all, then the worker could be stuck in a crashed state and might need help exiting.

It would really help with this to have an optional connection timeout in server(). Would this be feasible in mirai? That way, if server() starts and finds no client to connect to, it can exit after a short time without consuming unnecessary resources. In normal usage, crew will never launch a worker without a mirai client already running, but it is possible for the user to launch a worker and then terminate the whole controller before the worker starts.

wlandau commented 1 year ago

Alternatively, I would understand if you prefer to use the existing idle time for this. I just thought a separate connection timeout could be shorter and add extra safety for completely persistent workers (which have idletime = Inf).

wlandau commented 1 year ago

Hmm...maybe I don't need an extra connection timeout in mirai::server(). I think I could actually implement one in crew::crew_worker() using the custom crew bus sockets you proposed in a previous thread. (I am still thankful for that critical workaround!) That would avoid starting mirai::server() in the first place unless crew is listening for the specific worker instance with the right token (previously UUID). I actually like this workaround more.
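
Roughly what I am picturing is below. This is just a sketch with hypothetical arguments, not the real crew_worker(): the worker dials the bus socket first and only calls mirai::server() once it sees the client, otherwise it exits quietly.

# Sketch only (hypothetical arguments, not the actual crew_worker() code).
# Dial the crew bus socket first; call mirai::server() only after the client
# is confirmed to be listening, and give up if it never shows within the window.
worker_with_timeout <- function(bus_url, mirai_url, seconds_timeout = 10) {
  sock <- nanonext::socket(protocol = "bus", dial = bus_url)
  on.exit(close(sock))
  deadline <- Sys.time() + seconds_timeout
  while (nanonext::stat(sock, "pipes") < 1L) {
    if (Sys.time() > deadline) {
      return(invisible()) # no client connected in time: exit without serving
    }
    Sys.sleep(0.01)
  }
  mirai::server(mirai_url, maxtasks = 1L)
}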

shikokuchuo commented 1 year ago

OK. What you propose in terms of the timeout is certainly feasible. Currently everything seems very robust in terms of tests etc., so I would like to get a stable version of 'mirai' onto CRAN this week as soon as possible, and then start making changes from there. So you have a bit of time to think on it.

wlandau commented 1 year ago

Awesome! From my experiments, I suspect more and more that the original issue in this thread comes from crew. There is still a mystery worker which does not seem to run or accept any tasks, but that is not as big a problem as the original.

wlandau commented 1 year ago

On second thought, I actually do think it would very much help to have a mirai::server() connection timeout. Otherwise I can foresee a race condition:

  1. mirai client starts.
  2. crew bus socket on the worker successfully connects.
  3. mirai client exits.
  4. crew worker calls mirai::server().
wlandau commented 1 year ago

I think I almost have this one solved now. Most of the superfluous launches are gone. There is just one dangling worker that seems to want to show up at the very end. I am not yet sure why, but this is now a simpler and less serious problem.

wlandau commented 1 year ago

This bug, if it even is a bug anymore, is becoming extremely hard to reproduce (a very good problem).

wlandau commented 1 year ago

Based on earlier hard-to-pin-down test failures, maybe this smaller case could help:

library(crew)
crew_session_start()
x <- crew_controller_callr(workers = 1L, tasks_max = 1L)
x$start()
x$push(ps::ps_pid())
x$wait(mode = "all")
pid_out <- x$pop(scale = FALSE)$result[[1]]
pid_exp <- x$launcher$workers$handle[[1]]$get_pid()
identical(pid_out, pid_exp)
x$terminate()
crew_session_terminate()
wlandau commented 1 year ago

And a controller group version:

library(crew)
crew_session_start()
for (i in seq_len(100)) {
  print(i)
  x <- crew_controller_group(
    crew_controller_callr(workers = 1L, tasks_max = 1L, name = "a"),
    crew_controller_callr(workers = 1L, tasks_max = 1L, name = "b")
  )
  x$start()
  x$push(ps::ps_pid(), controller = "b")
  x$wait(mode = "all")
  pid_out <- x$pop(scale = FALSE)$result[[1]]
  pid_exp <- x$controllers[["b"]]$launcher$workers$handle[[1]]$get_pid()
  stopifnot(identical(pid_out, pid_exp))
  x$terminate()
}
crew_session_terminate()
wlandau commented 1 year ago

It takes several iterations, but eventually I get a PID mismatch in the above test. When that happens, there is a worker still running with pid_exp, but there is no process remaining with pid_out. Even though the process with pid_out no longer exists, I am confident that pid_out once belonged to a valid worker because the task reports valid socket_data and socket_session fields, which could only have been set inside crew_worker().

I think this means there was a slight mismatch between workers and tasks and the task got done anyway. At worst, it's an off-by-one error because crew does not micromanage the dispatch of tasks to mirai daemons.

wlandau commented 1 year ago

I wonder if I can reproduce this with just mirai, or if it has to do with something in crew.

wlandau commented 1 year ago

This appears to be working well:

library(callr)
library(crew) # for crew_wait()
library(mirai)
daemons(n = 1L, url = "ws://127.0.0.1:5000", .compute = "a")
daemons(n = 1L, url = "ws://127.0.0.1:5001", .compute = "b")
for (i in seq_len(100)) {
  print(i)
  m <- mirai(ps::ps_pid(), .compute = "b")
  px <- r_bg(\() mirai::server("ws://127.0.0.1:5001", maxtasks = 1L))
  crew_wait(~!unresolved(m), seconds_interval = 0.001, seconds_timeout = 5)
  stopifnot(identical(m$data, px$get_pid()))
}
daemons(n = 0L, .compute = "a")
daemons(n = 0L, .compute = "b")
wlandau commented 1 year ago

This example with just mirai seems to produce the correct number of launches.

library(callr)
library(mirai)
library(nanonext)
library(purrr)
daemons(n = 4L, url = "ws://127.0.0.1:5005")
urls <- rownames(daemons()$daemons)
tasks <- map(seq_len(200), ~mirai(ps::ps_pid()))
launches <- rep(0L, 4L)
workers <- as.list(rep(FALSE, 4L))
names(launches) <- urls
names(workers) <- urls
while ((pending <- sum(map_lgl(tasks, .unresolved))) > 0L) {
  online <- daemons()$daemons[, "status_online"]
  disconnected <- names(online)[online < 1L]
  relaunch <- head(disconnected, n = pending)
  for (url in relaunch) {
    w <- workers[[url]]
    elapsed <- 10
    if (!isFALSE(w)) {
      elapsed <- difftime(Sys.time(), w$get_start_time(), units = "secs")
    }
    if (isFALSE(w) || (!w$is_alive() && (elapsed > 5))) {
      px <- r_bg(\(url) mirai::server(url, maxtasks = 1L), args = list(url = url))
      workers[[url]] <- px
      launches[url] <- launches[url] + 1L
    }
  }
  Sys.sleep(0.001)
}
daemons(n = 0L)
wlandau commented 1 year ago

Successfully reproduced the mismatch without controller groups:

library(crew)
crew_session_start()
x <- crew_controller_callr(workers = 1L, tasks_max = 1L, name = "a")
x$start()
for (i in seq_len(100)) {
  print(i)
  x$push(ps::ps_pid(), scale = FALSE)
  x$wait(mode = "all")
  pid_out <- x$pop(scale = FALSE)$result[[1]]
  pid_exp <- x$launcher$workers$handle[[1]]$get_pid()
  stopifnot(identical(pid_out, pid_exp))
}
x$terminate()
crew_session_terminate()
wlandau commented 1 year ago

https://github.com/wlandau/crew/commit/5a1f823cc366ecc4d56f100d90414b11819f0cea eliminates the remaining superfluous worker launches: right before a new launch, do a last-minute check to see if there is already a worker connected. I think this works because auto-scaling data may be out of date by the time the actual launch is reached.
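
The gist of the check, as a simplified sketch (connection stands in for the crew bus socket object used by the existing utilities, and launcher$launch() is a stand-in for the real launch method, not actual crew internals):

# Simplified sketch of the last-minute check (not the exact crew internals).
# Right before launching, re-check the bus socket listener for a live pipe and
# skip the launch if a worker instance is already connected.
already_connected <- nanonext::stat(connection$listener[[1]], "pipes") > 0L
if (!already_connected) {
  launcher$launch(index = index) # stand-in for the actual launch call
}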

wlandau commented 1 year ago

Or maybe that just masks the problem. The current which_active() checks "connected" status before it checks "discovered" status. Between those two checks, a worker could dial in.

shikokuchuo commented 1 year ago

What is your latency here - is it because you are only polling occasionally or do you need faster updates than calling stat()? The latter can be addressed by registering pipe events (pipe_notify()) with a cv - you get instant real time updates.

Sorry this probably doesn't help, but this new feature is just so cool :)

wlandau commented 1 year ago

#51 itself does not need a faster alternative to stat(), but the prospect you describe does sound really exciting for performance! I am not sure I understand how to use condition variables myself, but I would be happy to take any suggestions for the utilities where listeners and dialers check each other:

https://github.com/wlandau/crew/blob/18a8b28d48b46cd3683b6eb6f9f5249eccd9a7e7/R/utils_nanonext.R#L51-L69

For #51 itself, the underlying issue turned out to be a really tricky race condition in the crew auto-scaling logic (not a problem in mirai at all). The following scenario was happening:

  1. Worker is not connected to the bus socket.
  2. Client checks stat(connection$listener[[1]], "pipes") and observes no worker is connected.
  3. Worker connects.
  4. Client checks stat(connection$listener[[1]], "accept") and observes that a worker connected at some point in the past.
  5. Client incorrectly reasons from (2) and (4) that a worker connected and then disconnected.
  6. Meanwhile, the worker continues to be connected.
  7. Client incorrectly re-launches a new (superfluous) worker at the socket.

All I needed to do was switch the order of (2) and (4). Implemented in https://github.com/wlandau/crew/commit/18a8b28d48b46cd3683b6eb6f9f5249eccd9a7e7. Before https://github.com/wlandau/crew/commit/18a8b28d48b46cd3683b6eb6f9f5249eccd9a7e7, I could reliably reproduce the issue using https://github.com/wlandau/crew/issues/51#issuecomment-1487853250. Now, I do not see any superfluous launches in the same test, or in tests/throughput/test-transient.R.
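
In code, the fix amounts to reading the cumulative "accept" stat before the instantaneous "pipes" stat (a simplified sketch of the idea, not the exact lines in the commit; connection is the same bus socket object as in the scenario above):

# Simplified sketch of the reordered checks (not the exact code in the commit).
# Reading the cumulative "accept" count before the instantaneous "pipes" count
# means a worker that dials in between the two reads still shows as connected.
listener <- connection$listener[[1]]
accepted <- nanonext::stat(listener, "accept")           # connections ever accepted
connected_now <- nanonext::stat(listener, "pipes") > 0L  # connected right now
lost <- accepted > 0L && !connected_now # only then is a relaunch justified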

What a relief to have solved this!

shikokuchuo commented 1 year ago

Yes, that's great, indeed swapping the order seems to work.

There might be some benefit from using condition variables, but only if latency or performance is important. You are basically registering callbacks to happen on each event - this is all asynchronous so it doesn't slow anything down, but you get the NNG stats 'for free', so I wouldn't implement unless there's a need.

But having said that, in the above, you are calling stat() twice on each socket? If you have 500 sockets, that could be quite slow, right? Reading the value of a condition variable would be almost instantaneous. I can give you pointers if you want to go down this route.

wlandau commented 1 year ago

Yes, for "discovered" workers, I do end up calling stat() twice. I haven't had the chance to load test 500 workers yet because I have only tried local process workers, but I will take your word for it that it could be slow. {crew} currently assumes these checks are practically instantaneous, so any slight inefficiency at scale may be noticeable.

I would love to try condition variables for the {crew} utilities I linked to. If you would be willing to help me get started on cv-powered drop-in replacements for stat(), I would really appreciate it.
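
To make the question concrete, here is roughly what I imagine a cv-based replacement might look like (an untested sketch, assuming I am reading the cv()/pipe_notify()/cv_value() docs correctly; the URL is just a placeholder):

# Untested sketch of what I have in mind (assuming I understand the cv() API).
# Register a condition variable on the bus socket so pipe connect/disconnect
# events bump its counter asynchronously, then read the counter instead of
# calling stat() twice per socket on every auto-scaling pass.
library(nanonext)
sock <- socket(protocol = "bus", listen = "ws://127.0.0.1:5700")
cv <- cv()
pipe_notify(sock, cv = cv, add = TRUE, remove = TRUE)
# ...later, a near-instant read in place of the two stat() calls:
events <- cv_value(cv) # connect + disconnect events observed so far
close(sock)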

wlandau commented 1 year ago

Opened #57 for this.