wlandau / crew

A distributed worker launcher
https://wlandau.github.io/crew/

Performance of {crew} vs {clustermq} #81

Closed wlandau closed 1 year ago

wlandau commented 1 year ago

Because of the blazing speed of mirai, I think crew has the potential to reach speeds comparable to clustermq. But so far, crew version 0.2.1 looks to be slower.

crew performance

In the following example, the timed part of the code took 10.477 seconds to complete on my 4-core Ubuntu machine.

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
names <- character(0L)
index <- 0L
n_tasks <- 6000L
system.time(
  while (index < n_tasks || !(controller$empty())) {
    if (index < n_tasks) {
      index <- index + 1L
      controller$push(
        name = as.character(index),
        command = TRUE
      )
    }
    out <- controller$pop()
  }
)
controller$terminate()

Replacing system.time() with proffer::pprof() shows that task management is the bottleneck. There may be a way to rework the R code to make this more efficient.

[crew flame graph screenshot]

clustermq performance

The equivalent clustermq example only took 3.060 seconds.

library(clustermq)
options(clustermq.scheduler = "multicore")
f <- function(i) {
  crew::crew_eval(quote(TRUE))
}
system.time(out <- Q(fun = f, i = seq_len(6000), n_jobs = 4, verbose = FALSE))

And the flame graph:

[clustermq flame graph screenshot]

wlandau commented 1 year ago

Looking at the crew flame graph, I wonder if easy speedups can be reached if I use:

  1. .subset() instead of $ (and something equally fast to replace $<-), and
  2. Lists instead of tibbles for task output.

wlandau commented 1 year ago

.subset2() seems to be able to select fields of R6 classes.
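
For example (a minimal sketch with a hypothetical R6 class; per the observation above, .subset2() reads the field without the method dispatch that $ incurs):

# Hypothetical "Counter" class for illustration only.
library(R6)
Counter <- R6Class("Counter", public = list(count = 0L))
x <- Counter$new()
# Both read the same field; .subset2() skips dispatch.
identical(.subset2(x, "count"), x$count)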

nviets commented 1 year ago

I think R itself is going to be the limiting factor. Are there any bits that could be rewritten in C++?

brendanf commented 1 year ago

It seems like the largest single chunk is the updates to self$log.

If I'm not mistaken, you can use get() and assign() on an R6 object as replacements for $ and $<- which avoid dispatch. However, I don't know of any way in R to avoid using $ or [[ on the left-hand side of something like self$log$popped_tasks[index] <- self$log$popped_tasks[index] + 1L. (Except of course doing it in C or C++.)
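
For illustration, a hedged sketch of that read-modify-write workaround (the class and field names here are made up to mirror the discussion):

# R6 objects are environments, so get()/assign() can read and write a
# public field without `$` dispatch on the R6 object itself.
library(R6)
Controller <- R6Class("Controller",
  public = list(log = list(popped_tasks = c(0L, 0L)))
)
self <- Controller$new()
index <- 1L
task_log <- get("log", envir = self)               # read without `$`
task_log$popped_tasks[index] <- task_log$popped_tasks[index] + 1L
assign("log", task_log, envir = self)              # write back without `$<-`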

As an aside, should this

self$log$popped_errors[index] <- self$log$popped_errors[index] +
  !anyNA(out$error)
self$log$popped_warnings[index] <-
  self$log$popped_warnings[index] + !anyNA(out$error)

be out$warning at the end of the last line?

shikokuchuo commented 1 year ago

I noticed you classed your crew_monad but it seems only to add a print method. If you don't class it, you can use [[ on it without the extra method dispatch time, rather than the clunky .subset2(). This applies generally.
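A rough illustration of the dispatch cost being described (timings are machine-dependent):

# `[[` on a classed list attempts S3 method lookup first;
# an unclassed list skips that lookup entirely.
plain <- list(result = 1)
monad <- structure(list(result = 1), class = "crew_monad")
system.time(for (i in seq_len(1e6)) plain[["result"]])
system.time(for (i in seq_len(1e6)) monad[["result"]])  # slower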

Using lists would be faster than tibbles, but it seems would negate some of the appeal of the nice interface that crew offers.

shikokuchuo commented 1 year ago

I'll offer up my fast df/tibble creator. If you are creating the tibbles yourself, you can cut out all the validation code for a dramatic speedup. All you need is to add the following attributes to a list: (i) names, (ii) class, (iii) row.names. Illustrative example below.

cols <- 8L
colnames <- letters[seq_len(cols)]
rows <- 100L

df <- vector(mode = "list", length = cols)
for (i in seq_along(df))
  df[[i]] <- runif(rows)

attributes(df) <- list(
  names = colnames,
  class = c("tbl_df", "tbl", "data.frame"),
  row.names = .set_row_names(rows)
)

Comes courtesy of ichimoku::matrix_df(), a fast matrix-to-data-frame converter.

wlandau commented 1 year ago

Thanks everyone, this is such helpful advice. Changing/limiting interactions with tibbles seems to be improving things already.

wlandau commented 1 year ago

I also wonder how much speed can be gained by simply removing the class attribute of all the R6 classes. This would probably require a downstream update to targets, which would take time to roll out at this point, but it would get there eventually.

wlandau commented 1 year ago

I also wonder how much speed can be gained by simply removing the class attribute of all the R6 classes.

Doesn't seem to make a difference.

wlandau commented 1 year ago

At this point, this is what the flame graph looks like on my Macbook:

[flame graph screenshot]

And maybe the remaining bottlenecks come from the fact that crew is polling-based and single-task-based, so methods push(), pop(), and empty() need to be called a lot.

wlandau commented 1 year ago

A fairer comparison is the following test; the system.time() part runs in about 3.4 seconds locally:

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
names <- character(0L)
index <- 0L
n_tasks <- 6000L
Sys.sleep(5)
system.time({
  for (task in seq_len(n_tasks)) {
    controller$push(
      name = as.character(index),
      command = TRUE,
      scale = FALSE,
      seed = 0L
    )
  }
  while (length(controller$queue) > 0L) {
    controller$collect()
  }
})
controller$terminate()

shikokuchuo commented 1 year ago

At this point, this is what the flame graph looks like on my Macbook:

[flame graph screenshot]

And maybe the remaining bottlenecks come from the fact that crew is polling-based and single-task-based, so methods push(), pop(), and empty() need to be called a lot.

As sample() showed up on the chart, I did a quick search, since I know it to be slow. It is used in a few places to set the seed with a random number:

seed = sample.int(n = 1e9L, size = 1L)

You can simply replace this with

seed = NULL

as set.seed(seed = NULL) re-initialises the RNG.

Or, if you need to record the actual seed, you could use

seed = as.integer(random() / 2)

for a c. 8x speed-up. The range of nanonext::random() is from 0 to 2x the max integer value (as the underlying C value is a 32-bit unsigned int).
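
For a sense of scale, a rough micro-benchmark of the first two options (timings are machine-dependent; base R only):

# sample.int() is the call that showed up on the flame graph;
# set.seed(NULL) simply re-initialises the RNG each time.
system.time(for (i in seq_len(1e5)) sample.int(n = 1e9L, size = 1L))
system.time(for (i in seq_len(1e5)) set.seed(NULL))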

wlandau commented 1 year ago

Thanks, @shikokuchuo. Implemented just now.

My original thinking was that the default seed should be tied to the RNG state of the calling session, but I think crew can let go of that because the submission/collection order of tasks is not reproducible anyway.

wlandau commented 1 year ago

After 7cb51113bbb8141d6adae8c51b7b51b07a126012, the example from https://github.com/wlandau/crew/issues/81#issuecomment-1561805942 looks like this (on the Macbook):

[flame graph screenshot]

collect() is faster now because of improvements to vectorization. This makes me think crew controllers should have multi-push and multi-pop methods. Maybe that's the right way to handle functional programming, as opposed to a purrr-like package (cf. #62).

wlandau commented 1 year ago

Moving to a more focused issue about multi-push and multi-pop.

wlandau commented 1 year ago

On second thought, efficiency improvements may also require a more efficient approach to queueing on crew's end.

wlandau commented 1 year ago

I should probably try using environments/hash tables for the queues instead of lists that get incremented 1 task at a time.

brendanf commented 1 year ago

x[1] <- NULL does result in a complete copy of x in every iteration; you are presumably also causing a copy every time an element is added. Assuming my computer is about as fast as yours, this is a few percent of the total time. (I don't know the whole structure of the list elements, so I just made something up which is more complicated than a single number.)

> x <- rep(list(list(error = NULL, result = 3, warning = "")), 6000)
> system.time(while (length(x) > 0) x[1] <- NULL)
   user  system elapsed 
  0.117   0.010   0.127

How would you implement a queue in an environment? My first stab at it is much slower than the list version:

> x <- new.env()
> for (i in 1:6000) assign(sprintf("q%04d", i), list(error = NULL, result = 3, warning = ""), x)
> system.time(while (length(x) > 0) remove(list = ls(x)[1], pos = x))
   user  system elapsed 
  5.212   0.000   5.220 

The docs for ls() warn that a large fraction of the execution time is sorting the results; experiment verifies that this is the case, and using ls(sorted = FALSE) is much faster than the list version. This one also includes timing on filling the queue:

> x <- new.env()
> system.time(for (i in 1:6000) assign(sprintf("q%04d", i), list(error = NULL, result = 3, warning = ""), x))
   user  system elapsed 
  0.016   0.000   0.016
> system.time(while (length(x) > 0) remove(list = ls(x, sorted = FALSE)[1], pos = x))
   user  system elapsed 
  0.002   0.000   0.003

If the results aren't coming out sorted, then it's not really a queue... but since we are doing asynchronous execution anyway, maybe it doesn't matter if pop() is FIFO.

Using a non-hash environment seems to yield a stack, but then it's slower than the list version:

> x <- new.env(hash = FALSE)
> system.time(for (i in 1:6000) assign(sprintf("q%04d", i), list(error = NULL, result = 3, warning = ""), x))
   user  system elapsed 
  0.119   0.000   0.119
> ls(x, sorted = FALSE)[1]
[1] "q6000"
> ls(x, sorted = FALSE)[6000]
[1] "q0001"
> system.time(while (length(x) > 0) remove(list = ls(x, sorted = FALSE)[1], pos = x))
   user  system elapsed 
  0.690   0.007   0.699

And popping the last element rather than the first (so it's FIFO) is even slower:

system.time(while (length(x) > 0) remove(list = ls(x, sorted = FALSE)[length(x)], pos = x))
   user  system elapsed 
  1.001   0.000   1.002

wlandau commented 1 year ago

x[1] <- NULL does result in a complete copy of x in every iteration; you are presumably also causing a copy every time an element is added.

That certainly explains a lot.

How would you implement a queue in an environment?

I found [[<- to be much faster than assign(), and I found names() to be much faster than ls() even with sorted = FALSE. As of f8290b5a030d6986cfba17a5a80eb55aa02c339c, I am using environments as hash tables for controller$queue and controller$results, where mirai objects are stored. Now, as far as adding/removing objects in those environments goes, the only non-instantaneous part seems to be crew::crew_random_name(), which generates unique IDs. There is still an overall bottleneck, but more on that later.
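
A minimal sketch of that environment-as-hash-table pattern (the ID here is illustrative):

queue <- new.env(hash = TRUE, parent = emptyenv())
id <- "x9f2c1"                   # crew generates these with crew_random_name()
queue[[id]] <- "mirai task placeholder"  # `[[<-` instead of assign()
names(queue)                     # cheaper than ls(queue, sorted = FALSE)
rm(list = id, envir = queue)     # removal still goes through rm()
length(queue)                    # 0 tasks remain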

wlandau commented 1 year ago

As I mentioned, in f8290b5a030d6986cfba17a5a80eb55aa02c339c I switched crew to using environments to store mirai objects. It's not that much faster, but it does nicely consolidate all the slowness into a single identifiable bottleneck.

Now when I run:

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
Sys.sleep(5)
index <- 0L
n_tasks <- 6000L
Sys.sleep(5)
px <- proffer::pprof({
  for (task in seq_len(n_tasks)) {
    controller$push(
      name = as.character(index),
      command = TRUE,
      scale = FALSE,
      seed = 0L
    )
  }
  while (length(controller$queue) > 0L) {
    controller$collect()
  }
})
controller$terminate()

I see the same clear bottleneck on both my Macbook and Ubuntu machines (although it is slightly less severe on the Macbook for some reason):

[flame graph screenshot]

This bottleneck happens in the following lines of code. Below, queue is a new.env(hash = TRUE, parent = emptyenv()) environment containing mirai tasks. (And for what it's worth, crew gives each mirai object special attributes like "name" and "command" for internal bookkeeping purposes.)

https://github.com/wlandau/crew/blob/f8290b5a030d6986cfba17a5a80eb55aa02c339c/R/crew_controller.R#L391-L398
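
In rough paraphrase (this is not the exact source), the linked lines do something like:

# Poll each mirai task in the queue environment for resolution.
not_done <- vapply(
  X = names(queue),
  FUN = function(name) .unresolved(queue[[name]]),
  FUN.VALUE = logical(1L)
)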

eapply(queue, .unresolved) is a more concise version of the above, but it's a tiny bit slower.

I don't know if there is a way in R to loop over an environment of mirai objects any faster, but if I believe the flame graph, then a solution to this bottleneck would make crew up to 8x faster in clustermq-like cases.

shikokuchuo commented 1 year ago

This bottleneck happens in the following lines of code. Below, queue is a new.env(hash = TRUE, parent = emptyenv()) environment containing mirai tasks. (And for what it's worth, crew gives each mirai object special attributes like "name" and "command" for internal bookkeeping purposes.)

https://github.com/wlandau/crew/blob/f8290b5a030d6986cfba17a5a80eb55aa02c339c/R/crew_controller.R#L391-L398

eapply(queue, .unresolved) is a more concise version of the above, but it's a tiny bit slower.

I don't know if there is a way in R to loop over an environment of mirai objects any faster, but if I believe the flame graph, then a solution to this bottleneck would make crew up to 8x faster in clustermq-like cases.

I shouldn't think the hashed environment gives you much advantage as you don't need to retrieve or replace values by name/key.

I might be over-simplifying, but I think you can revert to using a list, and then it seems just this will do:

as.logical(lapply(queue, .unresolved))

Or refactor your code so that's all you need?

.unresolved() is guaranteed to return TRUE or FALSE in any case.

brendanf commented 1 year ago

eapply(queue, .unresolved) is a more concise version of the above, but it's a tiny bit slower.

That is depressing; this seems like the prototypical use case for eapply(). Presumably it's because it doesn't have an option to specify the result type (as vapply() does), so it can't preallocate.

I don't know if there is a way in R to loop over an environment of mirai objects any faster, but if I believe the flame graph, then a solution to this bottleneck would make crew up to 8x faster in clustermq-like cases.

Does that mean that you think the actual calls to .unresolved() should be much faster? If the potential savings is just the rest of vapply() then it looks more like a 2x speedup.

Did you try vapply(X = as.list(queue), FUN = .unresolved, FUN.VALUE = logical(1L), USE.NAMES = TRUE)?

brendanf commented 1 year ago

I think that my suggestion would require allocating an extra list for the result of as.list(). The question is whether that is faster than looking up every element of the queue by name.

wlandau commented 1 year ago

I just tried not_done <- lapply(X = as.list(queue), FUN = .unresolved), and the example yields the following flame graph on my Macbook:

[flame graph screenshot]

as.list(queue) is only about 16% of the execution time, and there are other bottlenecks as well.

shikokuchuo commented 1 year ago

I just tried not_done <- lapply(X = as.list(queue), FUN = .unresolved), and the example yields the following flame graph on my Macbook:

[flame graph screenshot]

as.list(queue) is only about 16% of the execution time, and there are other bottlenecks as well.

Forget my suggestion about reverting to lists - I forgot you were trying to avoid the other duplication costs. Just on the above, lapply() should be able to iterate over the environment without needing as.list(), although I'm not at a computer to check at the moment.

brendanf commented 1 year ago

I just tried not_done <- lapply(X = as.list(queue), FUN = .unresolved). as.list(queue) is only about 16% of the execution time, and there are other bottlenecks as well.

Forget my suggestion about reverting to lists - I forgot you were trying to avoid the other duplication costs. Just on the above, lapply() should be able to iterate over the environment without needing as.list(), although I'm not at a computer to check at the moment.

You're right! vapply() and lapply() also work on environments. I never knew that!
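
A quick check of that (the coercion goes through as.list(), so element order is not guaranteed):

# The apply family coerces environments with as.list() under the hood.
e <- new.env()
e$a <- 1
e$b <- 2
vapply(e, is.numeric, logical(1L))  # named logical vector, arbitrary order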

brendanf commented 1 year ago

How is the speed comparing to clustermq at this point?

wlandau commented 1 year ago

Forget my suggestion about reverting to lists - I forgot you were trying to avoid the other duplication costs. Just on the above, lapply() should be able to iterate over the environment without needing as.list(), although I'm not at a computer to check at the moment.

Yes, great point. Further profiling shows lapply() converts the environment to a list behind the scenes anyway, so the flame graph looks the same. But the bottleneck is much more concise now, which is really helpful.

https://github.com/wlandau/crew/blob/3f1dfcaafd0dd25e8114637a8d669c6683d81883/R/crew_controller.R#L390

How is the speed comparing to clustermq at this point?

On my Macbook, crew is still a little slower than clustermq, about 3.5-4 seconds for the most recent example. Haven't tried the latest crew on my Ubuntu machine, but lately it has been about 8 seconds or so, mainly due to more time spent in that lapply()/eapply() bottleneck.

wlandau commented 1 year ago

You know what? Maybe I am optimizing the wrong thing, or reading the profiler output wrong. If I add a sleep to let the tasks finish:

for (task in seq_len(n_tasks)) {
  controller$push(
    name = as.character(index),
    command = TRUE,
    scale = FALSE,
    seed = 0L
  )
}
Sys.sleep(3)
while (length(controller$queue) > 0L) {
  controller$collect(throttle = TRUE)
}

then the controller$collect() "bottleneck", including lapply()/eapply(), vanishes:

[flame graph screenshot]

Alternatively, when I turn on throttling in collect():

for (task in seq_len(n_tasks)) {
  controller$push(
    name = as.character(index),
    command = TRUE,
    scale = FALSE,
    seed = 0L
  )
}
while (length(controller$queue) > 0L) {
  controller$collect(throttle = TRUE)
}

then the "bottleneck" appears to be in length(controller$queue):

[flame graph screenshot]

These may not be bottlenecks at all; they may just be the places where the profiler happens to be sampling while crew waits for mirai tasks to resolve.

But I am pretty sure deparse_safe() and crew_random_name() are true profiler findings, so I will try to find faster alternatives to those.

wlandau commented 1 year ago

After solving #83, here is what I see in the flame graph after returning to the original crew example from https://github.com/wlandau/crew/issues/81#issue-1721943139.

[flame graph screenshot]

I do not believe controller$empty() is a real bottleneck because all it does is check the lengths of a couple of environments. I am pretty sure it is just waiting for tasks at that point.

https://github.com/wlandau/crew/blob/ef35e068f84d66be3233832b1e4f2c8eaa4a501c/R/crew_controller.R#L149-L151

wlandau commented 1 year ago

Speeds are looking really good if we remove delays waiting for tasks. The following is on my Ubuntu machine, which has proved much slower than my Macbook for these tests.

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
Sys.sleep(5)

index <- 0L
n_tasks <- 6000L
system.time({
  for (index in seq_len(n_tasks)) {
    controller$push(command = TRUE)
  }
})
#>    user  system elapsed 
#>   0.571   0.022   1.122 

Sys.sleep(5)
system.time(controller$collect())
#>    user  system elapsed 
#>   0.024   0.000   0.025 

Sys.sleep(5)
system.time({
  for (index in seq_len(n_tasks)) {
    controller$pop()
  }
})
#>    user  system elapsed 
#>   0.570   0.000   0.614 

controller$terminate()

wlandau commented 1 year ago

And the analogous flame graphs:

library(crew)
controller <- crew_controller_local(workers = 4L)
controller$start()
controller$launch(n = 4L)
index <- 0L
n_tasks <- 6000L

When pushing tasks, deparse_safe() and rlang::call2() are the more noticeable bottlenecks.

Sys.sleep(5)
proffer::pprof({
  for (index in seq_len(n_tasks)) {
    controller$push(command = TRUE)
  }
})

[flame graph screenshot: push phase]

collect() on 6000 completed tasks is a breeze.

Sys.sleep(5)
proffer::pprof(controller$collect())

[flame graph screenshot: collect phase]

And for pop(), I suppose I could think about replacing as.list() with something else, but users are probably not going to notice the difference.

Sys.sleep(5)
proffer::pprof({
  for (index in seq_len(n_tasks)) {
    controller$pop()
  }
})

[flame graph screenshot: pop phase]

shikokuchuo commented 1 year ago

If we're in the business of eking out gains, one thing before I forget the details: you may pass the 'data' variables in crew_eval() to the '...' argument of mirai() again - these get used directly, whereas there is some language manipulation involved in using .args.
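
A minimal sketch of the difference (assumes one local daemon; mirai() accepts data both ways):

# Data passed via `...` is used directly; .args goes through extra
# language manipulation.
library(mirai)
daemons(1L)
m1 <- mirai(x + y, x = 1, y = 2)                # via `...`
m2 <- mirai(x + y, .args = list(x = 1, y = 2))  # via .args
call_mirai(m1)$data  # 3
call_mirai(m2)$data  # 3
daemons(0L)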

wlandau commented 1 year ago

Thanks! I implemented your suggestion and replaced rlang::call2() with base::as.call(). Both seem to really help (0.895s, down from 1.122s, in the push() phase).
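
A minimal sketch of the swap (the argument list is made up; both build the same call object):

# call2() and as.call() produce identical language objects here,
# but as.call() avoids the rlang machinery.
args <- list(command = TRUE)
call_rlang <- rlang::call2("crew_eval", !!!args)
call_base <- as.call(c(as.symbol("crew_eval"), args))
identical(call_rlang, call_base)  # TRUE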

wlandau commented 1 year ago

The only 2 other things I can think of are to

  1. Make deparsing and storing the command optional and turned off by default.
  2. Make crew monads lists instead of environments so they can be turned into tibbles quickly.

After that, I think I will have done all the optimization I can from crew's perspective.

wlandau commented 1 year ago

After (2), the pop() phase in https://github.com/wlandau/crew/issues/81#issuecomment-1563506498 only takes 0.249 seconds.

shikokuchuo commented 1 year ago

The only 2 other things I can think of are to

1. Make deparsing and storing the command optional and turned off by default.

Deparsing is expensive. Why not just store the call (language object) - it still prints.

2. Make `crew` monads lists instead of environments so they can be turned into `tibble`s quickly.

I like this idea!

wlandau commented 1 year ago

Deparsing is expensive. Why not just store the call (language object) - it still prints.

From experience with targets, I don't like the way large sets of language objects consume memory. Probably not a huge deal, but it adds up to MBs sometimes.

wlandau commented 1 year ago

Implemented (1). The push() phase is now 0.764s, and the flame graph is beautiful.

[flame graph screenshot]

shikokuchuo commented 1 year ago

Deparsing is expensive. Why not just store the call (language object) - it still prints.

From experience with targets, I don't like the way large sets of language objects consume memory. Probably not a huge deal, but it adds up to MBs sometimes.

I see. In that case, I can offer the following from mirai itself, used when it deparses the call from a tryCatch() error:

deparse(x, backtick = TRUE, control = NULL, nlines = 1L)

Specifying the arguments led to a real speedup (I forget of what magnitude). You can specify nlines = -1L if you want to ensure you capture absolutely everything.

wlandau commented 1 year ago

Thanks! I may take you up on that eventually. For now, however, the existing deparse_safe() is nearly as fast, and paste(deparse(x, backtick = TRUE, control = NULL, nlines = -1L)) seems to break my tests for reasons I do not understand.