tar_make_clustermq taking a very long time to get going compared to tar_make #169

kendonB closed this issue 4 years ago

kendonB commented 4 years ago



I tried starting a clustermq job a couple of times. Both times it got going quickly, and I canceled them. Now it just gets stuck before it starts. I'm guessing the cache is corrupted somehow.

Running tar_make() immediately beforehand, it got going very quickly.

Output:

# tar_make() is up here ^
# I wasn't able to highlight further up
[kendon.bell@mahuika02 farm_profits]$ R

> library(targets)
> tar_make_clustermq(
+   reporter = "timestamp",
+   garbage_collection = TRUE,
+   workers = 2
+ )
kendonB commented 4 years ago

It kept going!

It is still much, much slower to check the targets with tar_make_clustermq(), but at least it was not stuck.

kendonB commented 4 years ago

And here, tar_make() starts allocating targets within a few seconds, versus 10 minutes with tar_make_clustermq().

kendonB commented 4 years ago

The delay happens on a dynamic target that is not that big: 300 groups.

kendonB commented 4 years ago

I also tried removing the cache and starting again, so the cache doesn't seem to be corrupted. It is just very slow to check that dynamic target and get going.

wlandau commented 4 years ago

@kendonB, I mocked up a quick example based on what you described and tried it on an SGE cluster, and I cannot reproduce what you see.

# _targets.R
library(targets)
options(clustermq.scheduler = "sge", clustermq.template = "cmq.tmpl")
tar_pipeline(
  tar_target(x, seq_len(300)),
  tar_target(y, x, pattern = map(x))
)
# console
system.time(tar_make(reporter = "silent")) # runs locally
#> user  system elapsed 
#> 1.858   0.434   5.858 
system.time(tar_make(reporter = "silent")) # skips
#> user  system elapsed 
#> 1.254   0.252   1.936 
system.time(tar_make_clustermq(reporter = "silent")) # skips
#> user  system elapsed 
#> 1.285   0.230   1.867 
system.time(tar_make_clustermq(reporter = "silent")) # runs on SGE
#> Master: [9.4s 15.3% CPU]; Worker: [avg 19.6% CPU, max 251.9 Mb]
#> user  system elapsed 
#> 2.133   0.513  10.673 
system.time(tar_make_clustermq(reporter = "silent")) # skips
#> user  system elapsed 
#> 1.323   0.256   2.235 
wlandau commented 4 years ago

I am surprised this is happening for you because tar_make_clustermq() actually runs just like tar_make() right up until it finds an outdated target with deployment = "remote".

wlandau commented 4 years ago

So I really do need a runnable reprex so I can replicate the bottleneck myself and get to the bottom of it.

In the meantime, it might be easier to profile your example and post the flame graphs. For profiling purposes, you will need to set callr_function = NULL.

library(proffer)
library(targets)
px <- pprof(
  tar_make_clustermq(callr_function = NULL),
  host = "", # So you can navigate a browser to the head node of the cluster
  port = 8888
)
Sys.info()["nodename"] # used to build the URL of the current node
wlandau commented 4 years ago

I also recommend updating to the latest version of targets. A while back, I fixed an issue that was causing the slice dependencies of branches to get re-hashed every time.

kendonB commented 4 years ago

The targets version is recent:

 targets     * 2020-09-21 [1] Github (wlandau/targets@17a2346)

This is what I get when trying to profile.

> browseURL("")
xdg-open: no method available for opening ''

Let me see if I can pull out just the public data from the project and send it to you.

kendonB commented 4 years ago

polygons_point_sample above isn't small (~2 GB), so could tar_make_clustermq() be loading the whole thing before checking, unlike regular tar_make()?

kendonB commented 4 years ago

Actually, the behaviour would be consistent with checking the dependencies of all the slices before sending any work out. Is there an easier way for me to step through what is being run on the head process from an interactive session?

wlandau commented 4 years ago

Have you updated your installation of targets since that fix? If _targets/meta/meta was produced before that commit, that could easily explain the bottleneck you're seeing now when it comes to skipping up-to-date targets.

wlandau commented 4 years ago

If you run any of the tar_make*() functions with callr_function = NULL, the master process runs in the current R session, so all the usual debugging tools are available to you, and profiling with proffer will work.

kendonB commented 4 years ago

I'm running the current version, and I removed the _targets folder and remade the pipeline.

What can I put inside debug(.) to get it to browse after it starts checking dependencies? I'm struggling to step through it.

wlandau commented 4 years ago

First of all, is the problem gone after the update? If not, would you please post a reprex that I can run so I can figure out where the bottleneck is coming from?

If a reprex is not feasible, I recommend profiling with proffer so we're really sure we're debugging in the right place. It might involve tar_load_dep(), but we should really profile to be sure.

kendonB commented 4 years ago

I have been running the latest version. To be sure, I also removed the _targets folder yesterday and remade it. I'm still seeing the problem. Profiling with proffer doesn't seem to work for me on my cluster.

I should be able to send the project to you. I'm just in the process of getting it to work with only the public data.

wlandau commented 4 years ago

What does proffer::pprof_sitrep() say? You may need to locally install Go and pprof with proffer::install_go(). See and for details.

kendonB commented 4 years ago

Got it - proffer won't work because I would need to authenticate through the browser, and I have no way to do that on my cluster.

wlandau commented 4 years ago

You could run record_pprof() on the cluster, download the profiling samples, and then run serve_pprof() locally.
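A minimal sketch of that split workflow, assuming proffer and targets are installed where each step runs and that _targets.R is in the working directory on the cluster. The file name targets.pprof is a hypothetical, arbitrary choice. callr_function = NULL keeps the master process in the current R session so the profiler can sample it; serving the results locally needs a working pprof setup (check with proffer::pprof_sitrep()).

```r
# Step 1, on the cluster (no browser needed).
library(proffer)
library(targets)
# Profile the master process in this R session and write the samples to a file.
# "targets.pprof" is a hypothetical file name.
record_pprof(
  tar_make_clustermq(callr_function = NULL),
  pprof = "targets.pprof"
)

# Step 2, on your local machine, after copying targets.pprof over (e.g. with scp).
library(proffer)
serve_pprof("targets.pprof") # serves the pprof web interface, including flame graphs
```

Because only a file crosses between machines, no browser authentication is needed on the cluster itself.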

kendonB commented 4 years ago

As per Murphy's law, when I remove the non-public part of the project, it gets going pretty fast... I will try recording the profile.

So there seems to be some interaction with downstream targets.

kendonB commented 4 years ago

@wlandau I think I need to give up on this one as it's taking a bit too much time. I made the targets a bit bigger (and fewer) and now targets seems to run fine. I will revisit if I come across it again.

wlandau-lilly commented 4 years ago

Sure, just let me know.

wlandau commented 4 years ago

For reference, here is a counter-reprex with dynamic branching over subsets of a large data frame:

# _targets.R
library(targets)
options(
  tidyverse.quiet = TRUE,
  clustermq.scheduler = "sge",
  clustermq.template = "sge.tmpl"
)
library(tidyverse)
tar_option_set(format = "fst_tbl")

big_data <- function(groups = 10, reps = 100) {
  expand_grid(
    tar_group = seq_len(groups),
    rep = seq_len(reps)
  ) %>%
    mutate(value = rnorm(n()))
}

mean_data <- function(data) {
  data %>%
    summarize(group = tar_group[1], mean = mean(value))
}

tar_pipeline(
  tar_target(
    data,
    big_data(300, 4e5),
    iteration = "group",
    deployment = "local"
  ),
  tar_target(mean, mean_data(data), pattern = map(data))
)


Up-to-date targets get skipped fast enough.

> library(targets)
> system.time(tar_make_clustermq(workers = 10))
kendonB commented 4 years ago

I now think that it was partly due to garbage_collection = TRUE being on. Does this run in between subtargets that get checked but not run?

kendonB commented 4 years ago

The targets each take around a minute when running in a single process or with 100 workers. When I bump it up to 200, the whole system just seems to stall after the first round of allocated targets. Have you tried a project with 200 workers going at once?

Every log I examine shows this:

2020-09-23 13:01:53.719059 | > WORKER_WAIT (0.001s wait)
2020-09-23 13:01:53.719415 | waiting 8.60s
2020-09-23 13:02:02.357401 | > WORKER_WAIT (0.000s wait)
2020-09-23 13:02:02.357942 | waiting 8.60s
2020-09-23 13:02:10.970166 | > WORKER_WAIT (0.001s wait)
2020-09-23 13:02:10.970613 | waiting 8.60s
2020-09-23 13:02:19.605107 | > WORKER_WAIT (0.001s wait)
2020-09-23 13:02:19.605544 | waiting 8.60s
2020-09-23 13:02:40.884501 | > DO_CALL (0.403s wait)
Registered S3 method overwritten by 'pryr':
  method      from
  print.bytes Rcpp
237 MB # This is printed by the target code
2020-09-23 13:04:39.639825 | eval'd: target_run_remotetargetgarbage_collection

and the master is idle (CPU = 0%). This is the same as the run I posted before, where only 172 of the 200 targets initially got allocated.

Note that this is without garbage collection on.
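To compare many worker logs, a small base R helper can pull out the timestamped events, show the last thing each worker reported, and time the DO_CALL step. This is a minimal sketch, assuming the `YYYY-MM-DD HH:MM:SS.ffffff | message` layout shown in the log above; parse_worker_log() is a hypothetical name, lines without a timestamp (such as the target's own printed output) are dropped, and the UTC time zone is used only for parsing, so it does not affect the differences.

```r
# Hypothetical helper: extract timestamped events from a clustermq worker log.
parse_worker_log <- function(lines) {
  pattern <- "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d+) \\| (.*)$"
  stamped <- grep(pattern, lines, value = TRUE, perl = TRUE)
  data.frame(
    time = as.POSIXct(sub(pattern, "\\1", stamped, perl = TRUE),
                      format = "%Y-%m-%d %H:%M:%OS", tz = "UTC"),
    message = sub(pattern, "\\2", stamped, perl = TRUE),
    stringsAsFactors = FALSE
  )
}

# A few lines copied from the log above (normally: readLines("<worker log file>")).
log_lines <- c(
  "2020-09-23 13:02:19.605544 | waiting 8.60s",
  "2020-09-23 13:02:40.884501 | > DO_CALL (0.403s wait)",
  "Registered S3 method overwritten by 'pryr':",
  "237 MB",
  "2020-09-23 13:04:39.639825 | eval'd: target_run_remotetargetgarbage_collection"
)
events <- parse_worker_log(log_lines)
tail(events$message, 1) # last event the worker reported

# Time from receiving the call to finishing its evaluation.
do_call <- events$time[grepl("DO_CALL", events$message)]
evald <- events$time[grepl("^eval'd", events$message)]
elapsed <- as.numeric(difftime(evald, do_call, units = "secs"))
elapsed # roughly 118.76 seconds
```

Applied to every log, this makes it easy to confirm whether all stalled workers stop at the same event.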

wlandau commented 4 years ago

> I now think that it was partly due to garbage_collection = TRUE being on. Does this run in between subtargets that get checked but not run?

Thanks for catching that. Garbage collection should not run for skipped targets. Should be fixed in b1f1a3b16239d5dafa112b7e63e9ecb4b618edde.
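Why this matters for a dynamic target with 300 mostly up-to-date branches can be sketched with a toy scheduler loop in base R. This is a hypothetical illustration, not how targets is implemented: run_branches() and its is_outdated(), build() and collect() arguments are made-up names. R's gc() performs a full collection by default, so calling it once per skipped branch costs time even when nothing is rebuilt.

```r
# Hypothetical sketch, not the targets source: walk the branches, skip the
# up-to-date ones, and collect garbage only after branches that are built.
run_branches <- function(branch_names, is_outdated, build,
                         garbage_collection = TRUE,
                         collect = function() invisible(gc())) {
  n_collections <- 0L
  for (name in branch_names) {
    if (!is_outdated(name)) {
      next # skipped branch: no build and no garbage collection
    }
    build(name)
    if (garbage_collection) {
      collect()
      n_collections <- n_collections + 1L
    }
  }
  n_collections
}

# 300 branches, only 3 of them outdated.
branches <- sprintf("branch_%03d", seq_len(300))
outdated <- branches[c(1, 150, 300)]
n_gc <- run_branches(
  branches,
  is_outdated = function(name) name %in% outdated,
  build = function(name) invisible(name)
)
n_gc # 3 collections instead of 300
```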

wlandau commented 4 years ago

An inexplicable stall sounds a lot like a memory consumption issue to me. Once again, I really need a reprex.

Also, as I said elsewhere, more workers do not always help. You might not actually be using them all. The master process has a lot of work to do, especially if you task it with the data management (the default), and it may not be able to keep up. Please keep an eye on those clustermq occupancy messages and consider batching. One minute per branch is actually quite fast, and you may be creating overhead, which in turn means the idle workers need to stay up longer. In fact, my company's sysadmins recently told us to keep it to 100 workers or fewer.
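A minimal base R sketch of batching, assuming data shaped like the big_data() output in this thread (a tar_group column plus a value column). The helpers make_batches() and mean_by_group() are hypothetical: they reassign tar_group so that iteration = "group" would create one branch per batch of 10 groups instead of one branch per group, while keeping the original group id for the summary.

```r
# Hypothetical helper: merge consecutive groups into batches.
make_batches <- function(data, batch_size = 10L) {
  data$group <- data$tar_group # keep the original group id
  data$tar_group <- as.integer(ceiling(data$group / batch_size)) # one branch per batch
  data
}

# Hypothetical helper: summarize every original group inside one batch.
mean_by_group <- function(batch) {
  aggregate(value ~ group, data = batch, FUN = mean)
}

# Toy data shaped like big_data(300, 5): 300 groups, 5 reps each.
set.seed(1)
toy <- expand.grid(tar_group = seq_len(300), rep = seq_len(5))
toy$value <- rnorm(nrow(toy))

batched <- make_batches(toy, batch_size = 10L)
length(unique(batched$tar_group)) # 30 branches instead of 300
one_branch <- batched[batched$tar_group == 1L, ]
nrow(mean_by_group(one_branch)) # 10 group means from a single branch
```

If each group takes about a minute, a batch of 10 would take roughly ten minutes, and 30 workers would be enough to run every batch at once. For comparison, 300 one-minute branches on 200 workers leave 100 workers idle during the second round. The batch size is a tuning knob, not a recommended value.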

Now for some guesswork about that stall: tar_option_set(storage = "remote", retrieval = "remote") might help (equivalent to drake::make(caching = "worker")). But keep in mind that if you do that, each worker is going to have to load the entire upstream dataset to get the slice it needs. Local branching early on could potentially address this by breaking up the data into dedicated branch targets first. Sketch:

# _targets.R
# ...
tar_option_set(storage = "remote", retrieval = "remote")
tar_pipeline(
  tar_target(
    data,
    big_data(300, 4e5),
    iteration = "group",
    deployment = "local"
  ),
  tar_target(slice, data, pattern = map(data), deployment = "local"),
  tar_target(analysis, run_analysis(slice), pattern = map(slice))
)
wlandau commented 4 years ago

I did try the following pipeline on 200 workers on SGE, and it completed just fine.

# _targets.R
library(targets)
options(
  tidyverse.quiet = TRUE,
  clustermq.scheduler = "sge",
  clustermq.template = "sge.tmpl"
)
library(tidyverse)
tar_option_set(format = "fst_tbl")

big_data <- function(groups = 10, reps = 100) {
  expand_grid(
    tar_group = seq_len(groups),
    rep = seq_len(reps)
  ) %>%
    mutate(value = rnorm(n()))
}

mean_data <- function(data) {
  # Optionally add a Sys.sleep() call here to make sure the whole array job dequeues before all the targets complete.
  # The pipeline runs fine for me with or without sleeping.
  data %>%
    summarize(group = tar_group[1], mean = mean(value))
}

tar_pipeline(
  tar_target(
    data,
    big_data(300, 1e5),
    iteration = "group",
    deployment = "local"
  ),
  tar_target(slice, data, pattern = map(data), deployment = "local"),
  tar_target(mean, mean_data(slice), pattern = map(slice))
)
wlandau commented 4 years ago

Here's another discussion of batching:

kendonB commented 4 years ago

> I did try the following pipeline on 200 workers on SGE, and it completed just fine.

Did you find that all the workers ended up doing something here? Could you monitor that with reporter = "summary"?

wlandau commented 4 years ago

The workers in the array job on my cluster queue slowly, and I observed that new targets deployed pretty much as fast as new SGE workers initialized. So it looks like all the workers ended up doing something.