wlandau / crew

A distributed worker launcher
https://wlandau.github.io/crew/

Incorrect cumulative “complete” stats on Linux #90

Closed wlandau closed 1 year ago

wlandau commented 1 year ago

Related to #88. When I run https://github.com/wlandau/crew/blob/main/tests/throughput/test-backlog-tasks_max.R on my local Ubuntu machine, I always see incorrect (too low) assigned and complete stats. I occasionally see complete too low by 1 or 2 on a remote RHEL 7 node. I can't reproduce it on my local MacBook.

wlandau commented 1 year ago

I added a new test script in https://github.com/wlandau/crew/blob/main/tests/mirai/test-tallies.R (also inline below) which reproduces the bug on my Ubuntu and RHEL 7 machines. It faithfully represents what crew is doing, but it barely uses crew itself, and it exposes all the direct calls to mirai. I think this example will help us figure out whether the bug is a problem with the way crew uses mirai or an issue in mirai itself.

The only piece of crew used in the test is the schedule class. I include it only because it is a convenient and efficient way to juggle mirai objects, and because it has a convenient implementation of throttling (to be nice to the mirai dispatcher). The schedule class calls nanonext::.unresolved() to collect resolved tasks, but otherwise it uses nothing from mirai or nanonext. And because it is so easy to test, I am very confident that it works correctly.
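
Stripped down to just the bookkeeping calls used in the full script below, the schedule is used roughly like this (sketch and done are just placeholder names for this illustration):

# Minimal sketch of the schedule bookkeeping used in the full script below.
sketch <- crew::crew_schedule()
sketch$start()
task <- mirai::mirai(1 + 1)  # any {mirai} task
sketch$push(task)            # track the new task in the hash table
sketch$collect()             # move resolved tasks to the first-in/first-out list
done <- sketch$pop()         # NULL until a resolved task has been collected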

library(crew)
library(mirai)

# Implements throttling to avoid overburdening the {mirai} dispatcher.
throttler <- crew::crew_schedule()

# Efficient and convenient data structure to keep track of {mirai} tasks.
# It has a hash table for new tasks and a first-in/first-out linked list
# for resolved tasks. It calls nanonext::.unresolved() to collect resolved
# tasks, but otherwise it does not rely on {mirai}/{nanonext}. I highly doubt
# it is the source of the {crew} bugs in #88 or #89.
schedule <- crew::crew_schedule()
schedule$start()

# Start the {mirai} client.
n <- 20L
mirai::daemons(
  n = n,
  url = "ws://127.0.0.1:5000",
  dispatcher = TRUE,
  token = TRUE
)

# Mutable structure with {crew} worker info. This is the primary
# data structure of each {crew} launcher.
workers <- new.env(parent = emptyenv()) # For mutability.
workers$workers <- tibble::tibble(
  handle = replicate(n, new.env(), simplify = FALSE), # callr::r_bg() handles
  socket = environment(mirai::daemons)$..$default$urls, # starting URLs
  launches = rep(0L, n), # number of times a worker was launched at this index
  launched = rep(FALSE, n), # FALSE if the worker is definitely done.
  assigned = rep(0L, n), # Cumulative "assigned" stat to check backlog (#79).
  complete = rep(0L, n) # Cumulative "complete" stat to check backlog (#79).
)

# For {mirai} servers with online == 0L and instance == 1L,
# rotate the websocket URL. Also set workers$launched to FALSE,
# which signals that tally() can safely update the cumulative
# "assigned" and "complete" statistics (#79).
rotate <- function(workers) {
  info <- mirai::daemons()$daemons
  done <- which(info[, "online"] < 1L & info[, "instance"] > 0L)
  for (index in done) {
    socket <- mirai::saisei(i = index, force = FALSE)
    if (!is.null(socket)) {
      workers$workers$socket[index] <- socket # Next launch is at this URL.
      workers$workers$launched[index] <- FALSE # Lets tally() update stats.
    }
  }
}

# For workers that are definitely done and not going to dial in until the
# next launch, update the cumulative "assigned" and "complete" which {crew}
# uses to detect backlogged workers (#79). A backlogged worker is a {mirai}
# server with more assigned than complete tasks. Detecting the backlog
# is important because if a worker is disconnected and backlogged,
# then {crew} will need to relaunch it so the backlogged tasks can run.
tally <- function(workers) {
  info <- mirai::daemons()$daemons
  # New stats from daemons()
  new_assigned <- as.integer(info[, "assigned"])
  new_complete <- as.integer(info[, "complete"])
  # Current cumulative stats.
  old_assigned <- workers$workers$assigned
  old_complete <- workers$workers$complete
  # Not all worker stats can be safely updated. We need to make sure
  # the worker is completely done and the websocket is rotated.
  # Otherwise, the counts could change between now and the next official
  # launch of the worker. It is tricky to avoid this race condition.
  index <- !(workers$workers$launched) # Workers safe to update.
  workers$workers$assigned[index] <- old_assigned[index] + new_assigned[index]
  workers$workers$complete[index] <- old_complete[index] + new_complete[index]
  invisible()
}

# In {crew}, the scale() method of the launcher class
# re-launches all backlogged non-launched workers,
# and then it may launch additional non-launched workers
# in order to meet the demand of the task load.
# The scale() function below is a simplified version which launches
# all non-launched workers.
scale <- function(workers) {
  for (index in which(!workers$workers$launched)) { # non-launched workers
    # I would have used mirai::launch_server() here, but callr::r_bg()
    # allows me to manually terminate the server without calling
    # mirai::daemons(n = 0L). This is important for updating the final
    # assigned and complete tallies later on.
    workers$workers$handle[[index]] <- callr::r_bg(
      func = function(url) mirai::server(url = url, maxtasks = 100L),
      args = list(url = workers$workers$socket[index])
    )
    # Increment the launch count.
    workers$workers$launches[index] <- workers$workers$launches[index] + 1L
    # Signal to tally() to wait for this worker to complete
    # instead of updating the cumulative assigned and complete stats.
    workers$workers$launched[index] <- TRUE
  }
}

index <- 0L # current task
n_tasks <- 60000L # all tasks
while (index < n_tasks || schedule$nonempty()) { # while there is work to do
  if (!throttler$throttle()) { # avoid overburdening the {mirai} dispatcher
    rotate(workers) # Rotate the URLs of done workers.
    tally(workers) # Update the cumulative stats for done workers.
    scale(workers) # Re-launch all the done workers.
  }
  # If there are still tasks to launch, launch one.
  if (index < n_tasks) {
    index <- index + 1L
    cat("push", index, "\n")
    task <- mirai(index, index = index)
    # The "schedule" is nothing fancy for the purposes of #88 and #89,
    # it is just a fast data structure for bookkeeping {mirai} objects
    # without the other frills in {crew}.
    schedule$push(task)
  }
  # Try to process the results of finished tasks.
  if (schedule$nonempty()) { # If there are still tasks to process...
    # Call nanonext::.unresolved() and move resolved tasks
    # from the hash table in schedule$pushed to the first-in/first-out
    # linked list in schedule$collected.
    schedule$collect()
    task <- schedule$pop() # Return a task that was resolved and collected.
    # pop() returns NULL if there is no resolved/collected task.
    if (!is.null(task)) {
      cat("pop", task$data, "\n")
    }
  }
}

# Manually terminate the workers without calling mirai::daemons(n = 0L).
# This allows the final tally to be updated correctly.
for (handle in workers$workers$handle) {
  if (inherits(handle, "r_process") && handle$is_alive()) {
    handle$kill()
  }
}

# Update the final tally and clean up the dispatcher.
rotate(workers)
tally(workers)
daemons(n = 0L)

# The cumulative assigned and complete statistics should be equal for
# each worker.
print("worker info")
print(workers$workers[, c("launches", "assigned", "complete")])

# Should equal n_tasks.
print("total assigned")
print(sum(workers$workers$assigned))

# Should equal n_tasks.
print("total complete")
print(sum(workers$workers$complete))

# The backlog should be 0 for all workers.
print("backlog per worker")
print(table(workers$workers$assigned - workers$workers$complete))

wlandau commented 1 year ago

Yes, #87 broke things, but it also made it so easy to create reproducible examples like the one above.

wlandau commented 1 year ago

The discrepancy between assigned and complete is much more pronounced and much more reproducible with a higher value of n_tasks. I just increased n_tasks from 6000 to 60000 in https://github.com/wlandau/crew/blob/main/tests/mirai/test-tallies.R and in the comment above.

wlandau commented 1 year ago

I tested https://github.com/wlandau/crew/commit/34800d9f7e47184726b9d48b3399704d8f80fbef (just before #87) and it turns out that the cumulative stats issue was present before the major refactor I did last week.

@shikokuchuo, I have been assuming that the correct complete stat is available and up to date as soon as a mirai server disconnects and no other servers are connected to the websocket. Is it possible that there is a delay in how complete gets updated on the dispatcher, e.g. if the task result has not fully arrived back at the dispatcher (similar to https://github.com/shikokuchuo/mirai/issues/42)? When exactly in the life cycle of a task does the complete stat get incremented (i.e. when the server begins to send the result, or when the full return value is buffered at the dispatcher)?

shikokuchuo commented 1 year ago

The stats on dispatcher are actually not complicated. They are just counters that are incremented as tasks are dispatched and as their results come back.

This is all done in R, so nothing to do with buffered messages or anything like that.
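
Roughly speaking, the bookkeeping amounts to something like the sketch below (the names and the exact spots where the increments happen are just for illustration, not the actual dispatcher source):

# Illustration only -- not the real {mirai} dispatcher implementation.
n <- 20L
counters <- new.env(parent = emptyenv())
counters$assigned <- integer(n)
counters$complete <- integer(n)

# Called when a task is relayed to the server at URL index i:
note_assigned <- function(i) counters$assigned[i] <- counters$assigned[i] + 1L

# Called when that task's result arrives back from server i:
note_complete <- function(i) counters$complete[i] <- counters$complete[i] + 1L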

wlandau commented 1 year ago

That helps, thanks. As you say, straightforward and not at the C or NNG level.

I started keeping track of individual snapshots of the assigned and complete stats, rather than just the sum. In the reprex below, there are 60000 tasks, and each server has maxtasks = 100.

Some servers show 101 tasks assigned, which makes sense because task dispatch and server termination can be asynchronous. And indeed, the total assigned stat has always shown the correct number of tasks, so I don't think there is anything to worry about with regard to assigned.

But for the complete statistics, some servers show a count of 99. Is this an expected result when maxtasks = 100 and online == 0 and instance == 1? (Either I am missing something, which is likely, or maybe there is a task being sent back without being counted.)

library(crew)
library(mirai)

packageVersion("nanonext")
#> [1] ‘0.9.0.9008’
packageVersion("mirai")
#> [1] ‘0.8.7.9027’
packageVersion("crew")
#> [1] ‘0.2.1.9011’

throttler <- crew::crew_schedule()
schedule <- crew::crew_schedule()
schedule$start()

mirai::daemons(
  n = 20L,
  url = "ws://127.0.0.1:5000",
  dispatcher = TRUE,
  token = TRUE
)

# Start all servers right away to simplify things.
for (url in environment(mirai::daemons)$..$default$urls) {
  launch_server(url = url, maxtasks = 100L)
}

envir <- new.env(parent = emptyenv())
envir$assigned <- list()
envir$complete <- list()
envir$workers <- list()

# scale() takes a snapshot of the stats of the servers.
# If the server is online (or never launched) then 
# assigned and complete are 0. Otherwise, we note the values from
# daemons() at that moment. So any stats above 0 are the final
# stats from the last completed instance of the server.
scale <- function(envir) {
  info <- as.data.frame(daemons()$daemons)
  done <- info$online < 1L & info$instance > 0L
  envir$assigned[[length(envir$assigned) + 1L]] <- done * info$assigned
  envir$complete[[length(envir$complete) + 1L]] <- done * info$complete
  for (index in which(done)) {
    url <- saisei(i = index)
    if (!is.null(url)) {
      envir$workers[[index]] <- callr::r_bg(
        func = function(url) mirai::server(url = url, maxtasks = 100L),
        args = list(url = url)
      )
    }
  }
}

# Run the tasks.
index <- 0L
n_tasks <- 60000L
while (index < n_tasks || schedule$nonempty()) {
  if (!throttler$throttle()) {
    scale(envir)
  }
  if (index < n_tasks) {
    index <- index + 1L
    cat("push", index, "\n")
    task <- mirai(index, index = index)
    schedule$push(task)
  }
  if (schedule$nonempty()) {
    schedule$collect()
    task <- schedule$pop()
    if (!is.null(task)) {
      cat("pop", task$data, "\n")
    }
  }
}

# Terminate all the servers and take 1 final snapshot.
purrr::walk(envir$workers, ~.x$kill())
scale(envir)

# Matrices with 1 column per server and 1 row per snapshot.
assigned <- do.call(rbind, envir$assigned)
complete <- do.call(rbind, envir$complete)

sum(assigned) # The assigned stats are okay.
#> [1] 60000

sum(complete) # The "complete" stats look too low because all the tasks actually finished.
#> [1] 59940

# Let's look at the distribution of tasks of workers that completed,
# excluding the ad hoc snapshot at the end.
# Sometimes more than 100 tasks were assigned, which makes sense.
table(assigned[-nrow(assigned), ])
#>    0  100  101 
#> 5882  578   20

# But what would cause some self-terminating servers to count 99 complete tasks
# even though maxtasks = 100?
table(complete[-nrow(complete), ])
#>    0   99  100 
#> 5882   59  539 

daemons(0L)

shikokuchuo commented 1 year ago

That doesn't look right.

As this is a throughput test, it is possible that something like this is happening:

A different way to do this would be for the dispatcher to record the cumulative stats and not reset them. Then in the above case:

As it seems this statistic is not driving anything, it is just a matter of whether you want to ensure 'accounting' accuracy on your end from using these stats - it is also possible to log the complete stats by counting the resolved tasks themselves, for example.

shikokuchuo commented 1 year ago

Incidentally https://github.com/shikokuchuo/mirai/commit/2200fac9e28ad4109d324aaa080ffcf6c226cbe4 helps, reducing the discrepancy a lot, but not eliminating it altogether.

wlandau commented 1 year ago

A different way to do this would be for dispatcher to record the cumulative stats and not reset.

It would be amazing if I could get cumulative assigned and complete stats directly from the dispatcher. As well as robustly solving this issue so crew can more accurately auto-scale, this would simplify crew a whole lot and reduce pooling.

wlandau commented 1 year ago

As it seems this statistic is not driving anything, it is just a matter of if you want to ensure 'accounting' accuracy on your end from using these stats

I think I am doing all I can with the information from the dispatcher. Accurate accounting matters for auto-scaling because, as we found earlier, backlogged workers need to be re-launched. Workers incorrectly flagged as backlogged will be launched unnecessarily, which could waste resources. The relaunch decision is essentially the check sketched below.
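
# Rough sketch of the backlog check, reusing the `workers` structure from the
# first script above (not the exact {crew} internals).
info <- mirai::daemons()$daemons
backlog <- workers$workers$assigned - workers$workers$complete
disconnected <- info[, "online"] < 1L
relaunch <- which(disconnected & backlog > 0L)
# Workers in `relaunch` still owe tasks, so scale() must launch them again.
# If the complete tally is too low, backlog stays above 0 and we relaunch
# workers for nothing.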

wlandau commented 1 year ago

it is also possible to log complete stats through counting the resolved tasks themselves for example.

This would need to be done on a server-by-server basis. Unless I am missing something, I think it would be difficult to attribute all resolved tasks to the servers they ran on for the purposes of assessing backlog. I would need to download all the data from all the resolved tasks and then look for the metadata from crew which tracks this using its own environment variables. For large data objects or lots of tasks, I do not think crew could keep up.
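
To illustrate what that would entail, the attribution would have to ride along with each task, something like the sketch below (CREW_WORKER is a made-up environment variable name for this illustration, not necessarily what crew actually sets):

# Hypothetical sketch: attribute each resolved task to the server that ran it.
# CREW_WORKER is an invented variable name for illustration only.
task <- mirai::mirai(
  list(value = index, worker = Sys.getenv("CREW_WORKER")),
  index = index
)
# After schedule$collect() and schedule$pop(), tally task$data$worker.
# The full return value has to travel back before it can be counted,
# which is the part I do not think {crew} could keep up with.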

shikokuchuo commented 1 year ago

A different way to do this would be for dispatcher to record the cumulative stats and not reset.

It would be amazing if I could get cumulative assigned and complete stats directly from the dispatcher. As well as robustly solving this issue so crew can more accurately auto-scale, this would simplify crew a whole lot and reduce pooling.

This is done in https://github.com/shikokuchuo/mirai/commit/1251a5492e6af43dfe1b1ac3a20e08633c87e941. Interestingly, this doesn't actually break the {crew} tests, but you will have to re-work your scaling logic based on this.

I also tested your original example above https://github.com/wlandau/crew/issues/90#issuecomment-1593495734 and it still completes fine. If you call daemons() at the end you get a perfect set of 60,000 assigned and complete.
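
In other words, with cumulative stats the end-of-script check can come straight from daemons(), along these lines:

# Sketch of the final check (run before resetting with daemons(n = 0L)).
info <- mirai::daemons()$daemons
sum(info[, "assigned"])  # should now total 60000
sum(info[, "complete"])  # should now total 60000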

wlandau commented 1 year ago

Wow, this is absolutely perfect! Thank you so much!

I think it doesn't break crew because the throughput tests are not automated, and errors in the counts would probably only cause extra servers to launch. I use mocking on a made-up daemons matrix to make sure crew's internal bookkeeping is correct.
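
For example, the mocked tests work roughly like the sketch below (an invented matrix with the same columns the bookkeeping reads from daemons()$daemons; the real tests live in crew's test suite):

# Sketch of a made-up daemons matrix for mocking (values invented).
mock <- matrix(
  c(
    1L, 0L,    # online
    1L, 1L,    # instance
    50L, 100L, # assigned
    50L, 99L   # complete
  ),
  nrow = 2L,
  dimnames = list(NULL, c("online", "instance", "assigned", "complete"))
)
# Feed `mock` to the tally/backlog logic in place of a live daemons() call
# and assert that the cumulative counts come out as expected.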

shikokuchuo commented 1 year ago

Great that it all worked out as it simplifies things on both ends! I would have suggested this sooner but I was under the impression you needed the per instance stats to drive your autoscaling.

I am going to strip out refhook (for now) and attempt another CRAN release of mirai as soon as you confirm https://github.com/wlandau/crew/issues/88 is fixed. This should simplify development going forward and provide a good base for adding further features such as TLS (almost ready in nanonext).

shikokuchuo commented 1 year ago

Similarly - I should just ask - do you need the instance counter to reset to 0 when saisei() is called or can that just keep incrementing when a new server connects?

That seems to be the only other place these stats reset. This shouldn't cause any issues, but with all the async stuff that is going on, it seems cleaner if this can be avoided.

wlandau commented 1 year ago

Great that it all worked out as it simplifies things on both ends! I would have suggested this sooner but I was under the impression you needed the per instance stats to drive your autoscaling.

Similarly - I should just ask - do you need the instance counter to reset to 0 when saisei() is called or can that just keep incrementing when a new server connects?

I have no need for assigned or complete to ever reset for a given mirai client, even during saisei(). As of mirai 0.8.7.9029, all 4 counters perfectly suit the needs of crew.

I am going to strip out refhook (for now) and attempt another CRAN release of mirai as soon as you confirm https://github.com/wlandau/crew/issues/88 is fixed. This should simplify development going forward and provide a good base for adding further features such as TLS (almost ready in nanonext).

A CRAN release of mirai would be incredible! After the next CRAN releases of mirai and crew, I will eagerly start to publicly spread the word about this powerful tool stack we built together. I will check about #88 when I get home from StanCon. I should know by Saturday if everything works. (Given that I have not seen any hanging in the throughput tests on my Ubuntu laptop since you implemented https://github.com/shikokuchuo/mirai/commit/ce6b92f0b2f3cae67307152ac7476d66d3654a42, I am optimistic).

shikokuchuo commented 1 year ago

Great that it all worked out as it simplifies things on both ends! I would have suggested this sooner but I was under the impression you needed the per instance stats to drive your autoscaling. Similarly - I should just ask - do you need the instance counter to reset to 0 when saisei() is called or can that just keep incrementing when a new server connects?

I have no need for assigned or complete to ever reset for a given mirai client, even during saisei(). As of mirai 0.8.7.9029, all 4 counters perfectly suit the needs of crew.

Yes, I was referring specifically to instance resetting on saisei(). I can't see it causing any issues so I am relaxed either way but it would be slightly cleaner if it didn't reset. But then you would probably need to keep track of instance number to know whether it has incremented. So on balance probably better as it is.

I am going to strip out refhook (for now) and attempt another CRAN release of mirai as soon as you confirm #88 is fixed. This should simplify development going forward and provide a good base for adding further features such as TLS (almost ready in nanonext).

A CRAN release of mirai would be incredible! After the next CRAN releases of mirai and crew, I will eagerly start to publicly spread the word about this powerful tool stack we built together. I will check about #88 when I get home from StanCon. I should know by Saturday if everything works. (Given that I have not seen any hanging in the throughput tests on my Ubuntu laptop since you implemented shikokuchuo/mirai@ce6b92f, I am optimistic).

That's a good timeframe. I am all prepared on my end. Also for the record I re-implemented a more ambitious fix in https://github.com/shikokuchuo/mirai/commit/2200fac9e28ad4109d324aaa080ffcf6c226cbe4, which aims not to waste any cycles.