ropensci / drake

An R-focused pipeline toolkit for reproducibility and high-performance computing
https://docs.ropensci.org/drake
GNU General Public License v3.0

Consider optional within-target parallelism #295

Closed wlandau closed 5 years ago

wlandau commented 6 years ago

Maybe with a cores, threads, or jobs column in the workflow plan data frame.

rkrug commented 6 years ago

Just to clarify: this additional column would specify the number of jobs created within each target. For example, a target that calls mclapply(..., mc.cores = 7) would get a value of 7 in this column, so that with make(..., jobs = 8), this target could run in parallel with at most one other target whose value in this column is 1.

wlandau commented 6 years ago

This additional column would specify the number of jobs created within each target, e.g. a target calling mclapply(..., mc.cores = 7) would get a value of 7 in this column...

This is the tricky part. How an individual target uses those 7 jobs is not so clear. To run an individual target with parallelism, you need to write extra code in your command anyway. It could be as simple as

drake_plan(
  mcmc = rstan::stan(file_in("model_file.txt"), cores = 7)
)

or

drake_plan(
  list_of_analyses = mclapply(1:8, run_analysis, mc.cores = 7)
)

But either way, it is your responsibility to write commands that can use parallelism, and the way you configure that parallelism varies widely. You might use mclapply() with the mc.cores argument, or you might call another function with its own cores or jobs argument. So it's difficult to have a general way to tell each target how many jobs it can use unless you configure your commands that way in drake_plan(). Does that make sense?

e.g. a target calling mclapply(..., mc.cores = 7) would get a value of 7 in this column, so that with make(..., jobs = 8), this target could run in parallel with at most one other target whose value in this column is 1.

I am not exactly sure what you mean, but I think I should explain what the jobs argument really does. Jobs is about how many targets can run simultaneously. Take the following workflow.

plan <- drake_plan(
  datasets = mclapply(1:8, get_dataset, mc.cores = 8),
  analyses = parLapply(datasets, analyze_dataset, cl = my_psock_cluster),
  summaries = lapply(analyses, summarize_analysis)
)

This workflow is inherently sequential.

vis_drake_graph(drake_config(plan), targets_only = TRUE, ncol_legend = 0)

[Image: dependency graph showing datasets → analyses → summaries]

Whether we call make(plan, jobs = 1) or make(plan, jobs = 1024), the targets run in sequence: datasets runs, then analyses runs, then summaries runs. In other words, the jobs argument has no effect. On the other hand, you can write your commands so that the targets themselves use parallelism: for example, the mc.cores argument for datasets or the PSOCK cluster you pass to parLapply() for analyses. Either way, the jobs argument has nothing to do with this kind of parallelism. It has to do with how many targets you run at a time.
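To illustrate the contrast, here is a minimal sketch of a plan where jobs does make a difference, because the targets are independent of each other (get_dataset_a() and get_dataset_b() are hypothetical user-defined functions):

```r
library(drake)

# Hypothetical plan: dataset_a and dataset_b do not depend on each other,
# so make(plan, jobs = 2) can build both at the same time. The combined
# target still waits for both upstream targets to finish.
plan <- drake_plan(
  dataset_a = get_dataset_a(),
  dataset_b = get_dataset_b(),
  combined  = rbind(dataset_a, dataset_b)
)

# make(plan, jobs = 2)  # dataset_a and dataset_b run concurrently
```

Note that drake_plan() only records the commands; nothing is evaluated until make(), so the functions need not exist yet when the plan is created.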

wlandau commented 6 years ago

#299 is a better solution to this problem. Closing, but will still discuss.

rkrug commented 6 years ago

I completely agree: the jobs argument will not (and should not) change anything about how a target itself runs. If the target uses parallelism, it uses parallelism, independent of the jobs argument. That is fine. The idea is that each target could have a cores_needed value assigned. In your example above, datasets = mclapply(1:8, get_dataset, mc.cores = 8) could get a cores_needed value of 8, since it uses a maximum of eight cores. The sum of cores_needed across all concurrently running targets should then never exceed the value of jobs. It would be a kind of weighting: at the moment every target has a weight of 1, but with this column I could assign a higher weight.

But drake should not interfere with the running itself of the target.
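As a sketch of this proposal: custom plan columns declared via target() are a real drake feature, but the cores_needed column below and the scheduler behavior described above (capping the sum of cores_needed at jobs) are purely hypothetical.

```r
library(drake)

# Hypothetical weighting column: cores_needed appears in the plan as a
# custom column, but nothing in drake currently acts on it -- the
# proposed scheduler would keep the sum of cores_needed of all
# concurrently running targets at or below the jobs argument.
plan <- drake_plan(
  datasets = target(
    mclapply(1:8, get_dataset, mc.cores = 8),
    cores_needed = 8   # would reserve 8 of the jobs "slots"
  ),
  summaries = target(
    lapply(datasets, summarize_analysis),
    cores_needed = 1   # ordinary single-core target
  )
)
```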

rkrug commented 6 years ago

No, I don't think so. The problem is that with the jobs argument as it is, make() could be slower overall: the cores may already be fully occupied by a single target, even though other parts of the workflow could make use of them.

wlandau commented 6 years ago

Ah, I think I finally understand the problem you are trying to solve. For local multicore parallelism, you want to make sure drake respects a set maximum number of cores. Is that right?

If so, I hesitate to implement this as a feature because jobs does not always mean "cores" or "threads". For example, future::plan(batchtools_slurm, template = "batchtools.slurm.tmpl"); make(..., parallelism = "future", jobs = 16) uses 16 nodes on a SLURM cluster, not 16 cores. It would be messy to account for this with a required_cores column and balance it with jobs. Drake would need to make complex decisions based on the type of parallelism used (the parallelism argument, and possibly the current future::plan(), and possibly a shell.sh used with "Makefile" parallelism). And there are additional complex scheduling decisions based on the number of free cores. For example, if you have a target with a large required_cores, do you restrict the number of targets you deploy, or do you wait to submit a large job until enough cores are available? I think a human could do a better job with these tradeoffs.

rkrug commented 6 years ago

That was what I was thinking about, and I see your point. But I also see the advantage, specifically on a local machine, of running certain targets in parallel and others not. One could simplify this with a column, say allow_parallel, specifying whether a given target may run in parallel with other targets, effectively forcing that target to be built with jobs = 1 while the others are built in parallel.

krlmlr commented 6 years ago

I've only skimmed through the discussion, let me add my two cents anyway.

In the workshop, I was proposing to tweak the plan to achieve this:

plan <- drake_plan(
  a = 1,
  b = 2,
  c = 3,
  bottleneck = mclapply(list(a, b, c), heavy_fun),
  downstream_1 = { bottleneck; a + b },
  downstream_2 = { bottleneck; b + c }
)

The bottleneck target is guaranteed to have access to all cores, because this is how the plan was designed.

Maybe we could provide a function local_cores() that returns the number of cores currently available (detectCores() - currently_running_local_jobs()). If a command queries this function, drake reserves the number of cores it has returned and will not use them until that command finishes:

plan <- drake_plan(
  a = 1,
  b = 2,
  c = 3,
  downstream_1 = a + b,
  downstream_2 = b + c,
  heavy = mclapply(list(a, b, c), heavy_fun, mc.cores = local_cores())
)

The three targets downstream_1, downstream_2, and heavy can run in parallel. On a four-core machine, if downstream_1 and downstream_2 have already started, local_cores() returns 2 when evaluating heavy, avoiding overcommitment of CPU cores. (In an HPC scenario, local_cores() always returns detectCores().)
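A rough base-R sketch of what such a local_cores() might compute. Everything here is hypothetical: in a real implementation, the bookkeeping done by reserved_cores() would live inside drake's scheduler, not in a stub.

```r
# Hypothetical sketch of local_cores(). reserved_cores() would query
# drake's scheduler for cores already committed to running targets;
# here it is stubbed as a constant for illustration.
reserved_cores <- function() 2L  # e.g. downstream_1 and downstream_2

local_cores <- function(total = parallel::detectCores()) {
  # Never report fewer than 1 core, even if the machine is saturated.
  max(1L, total - reserved_cores())
}

local_cores(total = 4L)  # with 2 cores reserved, reports 2
```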

wlandau commented 6 years ago

I really like both of these ideas, Kirill: the bottlenecking (which may belong in the best practices vignette) and a local_cores() function. I think local_cores() will be easiest to implement once we have a proper scheduler (#285).

wlandau commented 5 years ago

Hmm... what about a more general interface to refer to any custom column in the plan? I think we can improve on the name, "local_setting", but it is a start.

library(drake)
drake_plan(
  a = target(
    mclapply(tasks, fun, local_setting("mc.cores"), fun_arg = local_setting("fun_arg")),
    mc.cores = 4,
    fun_arg = "with_debug"
  )
)
#> # A tibble: 1 x 4
#>   target command                                          mc.cores fun_arg 
#>   <chr>  <chr>                                               <dbl> <chr>   
#> 1 a      "mclapply(tasks, fun, local_setting(\"mc.cores\…        4 with_de…

Created on 2019-01-15 by the reprex package (v0.2.1)

wlandau commented 5 years ago

Moving to #677.

wlandau commented 5 years ago

Proposed API function from_plan():

drake_plan(
  x = target(
    mclapply(1:8, stuff, mc.cores = from_plan("cores")),
    cores = 2
  )
)