HenrikBengtsson / future.batchtools

:rocket: R package future.batchtools: A Future API for Parallel and Distributed Processing using batchtools
https://future.batchtools.futureverse.org

steps to speed up job submission? #36

Open brendanf opened 5 years ago

brendanf commented 5 years ago

I'm using future.batchtools via drake, and just got my first plan running on the cluster. It seems to take about one minute for each job submitted, and since I'm trying to submit several hundred jobs, that's not ideal (although it's not a deal-breaker, because I expect each job to take many hours to finish). I'm not sure what I might be able to change in order to speed this up. I haven't dived into the code, but my idea of what needs to happen to start a worker is:

  1. analyse the code to find dependencies
  2. submit the job to the scheduler (SLURM in my case)
  3. wait for the job to be allocated
  4. wait for the worker to start up (and load libraries?)
  5. send data to the worker (and libraries?)

Is this basically accurate?

Does the worker load libraries already installed on its node, or are all libraries sent to the worker by the master? If the latter, then reducing library dependencies seems like a potential avenue to try.

HenrikBengtsson commented 5 years ago

Hi. It's only the first two steps;

  1. analyse the code to find dependencies
  2. submit the job to the scheduler (SLURM in my case)

that are handled by R and that affect how long it takes before your jobs/tasks appear in the queue (i.e. before jobs are submitted). Based on this, if you observe:

It seems to take about one minute for each job submitted

then I would say it is Step 2 that takes most of that time. I would be surprised if the static code analysis (Step 1) for identifying code dependencies (globals and packages) took that long - anything more than 1-2 seconds for that step would be unusual, even for very large code bases.

Instead, I suspect that large global objects being "exported" in Step 2 are causing the slowdown. Since we're utilizing batchtools here, those globals are serialized to files first (as part of "the batchtools registry"). Writing large objects to file can take time, and if there are lots of processes writing at the same time (e.g. in a multi-tenant HPC environment), the shared file system might be the bottleneck.
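
As a minimal sketch (not from this thread) of how one might gauge how much data a future would have to export: the globals package used below is what the future framework relies on for its static code analysis, and x/expr are made-up placeholders.

    library(globals)

    x <- rnorm(1e7)                  # a hypothetical large global (~80 MB)
    expr <- quote({ mean(x) + 1 })

    g <- globalsOf(expr, envir = globalenv(), mustExist = FALSE)
    sapply(g, function(obj) format(object.size(obj), units = "Mb"))

    ## The future framework also enforces a size limit via the option
    ## 'future.globals.maxSize' (~500 MiB by default), which can flag large exports.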

Moreover, it might be that the scheduler throttles how many jobs you can submit within a certain period of time. If that is the case, then I think, though I'm not 100% sure, that batchtools will keep retrying until each job submission is accepted. This could also explain the slow submission.

FYI, as soon as the jobs are in the queue, it's all up to the job scheduler to decide when they're allocated and started. When a job is launched, batchtools reads its registry into R, i.e. it loads the required R packages and the globals into the R worker's session and evaluates the future expression. When completed, the results are written back to file and eventually read by the main R session (all via the batchtools API).

Does the worker load libraries already installed on its node, or are all libraries sent to the worker by the master? If the latter, then reducing library dependencies seems like a potential avenue to try.

The former; batchtools, and therefore future.batchtools, assumes that the R package library on the worker's machine has the required packages installed, which is typically the case because we're assuming a shared file system where the packages live. The packages are loaded just as in a regular R session, i.e. library(pkgA). Packages are never "exported" - only global objects are.
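
For illustration (not from the thread), a minimal sketch of that behavior; data.table stands in for any package assumed to be installed in the worker's R library:

    library(future.batchtools)
    plan(batchtools_slurm)

    f <- future({
      library(data.table)            # attached from the worker's own R library
      data.table(x = 1:3)[, sum(x)]  # only the result travels back, not the package
    })
    value(f)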

HenrikBengtsson commented 5 years ago

Forgot to clarify that from the perspective of future.batchtools, the following steps:

  1. wait for the job to be allocated
  2. wait for the worker to start up (and load libraries?)
  3. send data to the worker (and libraries?)

become:

  1. check if job is done
  2. if done, load the results
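
For illustration, a minimal sketch (my addition, assuming a working batchtools_slurm setup) of how those two steps look through the Future API:

    library(future.batchtools)
    plan(batchtools_slurm)

    f <- future({ Sys.sleep(60); 42 })   # job is submitted here (Steps 1-2 above)
    while (!resolved(f)) Sys.sleep(5)    # "check if job is done"
    value(f)                             # "if done, load the results"
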
brendanf commented 5 years ago

It seems like there must be something more than SLURM throttling submissions going on. The plan is complete now and I don't really want to run it again unless I have to, but here were some observations:

This all seems consistent with your suggestion that some global object, which was gradually increasing in size with each submitted job, was getting serialized each time. This may actually be an issue on the drake side.

I didn't set up any particular logging settings, and the .future directory now seems empty except for a sessioninfo.txt file. Is there any way now (or in future runs?) to see what is being exported to the workers?
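
One hedged pointer for future runs (it is not discussed in this thread): the future package has a verbose debug mode that, among other things, logs the globals it identifies and their sizes as each future is created.

    ## Assumption: set before any futures are created; the output is verbose.
    options(future.debug = TRUE)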

HenrikBengtsson commented 5 years ago

Thanks for the additional details. Even if they don't help immediately, they'll be helpful for others looking into this problem.

  • The same jobs are dispatched much more quickly using future with "multiprocess".

This all seems consistent with your suggestion that some global object, which was gradually increasing in size with each submitted job, was getting serialized each time. This may actually be an issue on the drake side.

Note that the batchtools backend "communicates" all data between the main R process and the workers via the file system. That is by design. If the file system is the bottleneck, then, yes, in-memory backends such as multisession (PSOCK; communicates via sockets) or multicore (forking) are faster. One argument behind the clustermq package is to avoid the file system and rely on ZeroMQ communication for faster HPC processing. FYI, there's a plan, and some progress has been made, to implement future.clustermq.
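
A rough sketch (not from the thread) of what swapping between an in-memory backend and the file-based batchtools backend looks like; slow_task is a made-up placeholder:

    library(future.apply)

    slow_task <- function(i) { Sys.sleep(1); i^2 }

    plan(multisession, workers = 4)              # in-memory, communicates via sockets
    y1 <- future_sapply(1:8, slow_task)

    plan(future.batchtools::batchtools_slurm)    # file-system based, via the SLURM queue
    y2 <- future_sapply(1:8, slow_task)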

About "benchmark" stats etc:

kendonB commented 5 years ago

Is it feasible to speed up the transmission of the files using the qs package? https://cran.r-project.org/web/packages/qs/index.html
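
For reference, a small hedged illustration (not from the thread) of what qs offers over base serialization; actual timings depend heavily on the object and on the file system:

    library(qs)

    x <- data.frame(a = rnorm(1e6), b = runif(1e6))

    system.time(saveRDS(x, "x.rds"))    # base R serialization
    system.time(qs::qsave(x, "x.qs"))   # typically faster, with comparable compression
    stopifnot(identical(x, qs::qread("x.qs")))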

HenrikBengtsson commented 5 years ago

@kendonB, if so, then it would be something that needs to be implemented upstream, i.e. in the batchtools package.

zia1138 commented 5 years ago

@HenrikBengtsson Curious if this issue has anything to do with scheduler.latency in makeClusterFunctionsSlurm().
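
For context, a hedged sketch of where that knob lives in plain batchtools (see ?makeClusterFunctionsSlurm); whether future.batchtools is affected by it is exactly the open question here:

    library(batchtools)

    reg <- makeRegistry(file.dir = "registry", make.default = FALSE)
    reg$cluster.functions <- makeClusterFunctionsSlurm(
      template = "batchtools.slurm.tmpl",  # hypothetical template file
      scheduler.latency = 1,               # pause after interacting with the scheduler
      fs.latency = 65                      # allowance for file-system latency
    )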

yimyom commented 4 years ago

I don't know if this issue has been solved, but I observed the same problem. In a clean environment, on a 10-node x 40-core cluster, I did the following (I've omitted the mandatory steps, like creating the registry, etc.):

    f = function(id) { Sys.sleep(10); data.frame(host=Sys.info()['nodename'], pid=Sys.getpid(), time=Sys.time(), id=id) }
    reg$cluster.functions = makeClusterFunctionsSlurm()
    ids = batchMap(f, 1:1000)
    done = submitJobs(ids)
    z = reduceResults(rbind)

The final reduceResults() collected my results, and all of this was done in practically no time. I monitored with watch -n 1 squeue and could see my jobs coming in and out as fast as possible (each spending its 10 seconds in the queue).

Then I tried the same with future.batchtools and future.apply:

    plan(batchtools_slurm)
    z = future_sapply(1:1000, f)

And this was abysmally slow! The same watch command showed jobs appearing only about one every second or slower.

My .batchtools.slurm.tmpl is also terribly simple. But other template files I tried didn't make any difference in the startup time.

    #!/bin/bash

    module load R
    Rscript -e 'batchtools::doJobCollection("<%= uri %>")'

I'm sure there is something fundamentally simple I'm missing in all of this, but if it is a real bug, I couldn't find any documentation on it either.

Any idea?

brendanf commented 4 years ago

I've come to the conclusion that the slowness for me was because my cluster runs the Lustre file system. Lustre has its advantages, but it has a high cost per file I/O operation. This is fine when you mostly read and write a few really large files in big chunks, but it seems to slow down future.batchtools because of its use of the file system to pass data to the workers.

HenrikBengtsson commented 4 years ago

To help clarify the observed difference in time: the batchtools batchMap() + submitJobs() approach results in a single batchtools registry folder (= a folder structure exported from R with all exported globals, logs, and more). In contrast, the future.apply w/ future.batchtools approach produces nbrOfWorkers() batchtools registry folders. The default number of workers when using future.batchtools (>= 0.9.0) is 100. In other words, the future approach requires ~100 times more files/folders when setting up the HPC jobs.
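
As an aside, a hedged sketch (not from the thread) of how the number of futures, and hence registry folders, can already be reduced by chunking; f is the function from the example above and future.chunk.size is part of the future.apply API:

    library(future.apply)
    plan(future.batchtools::batchtools_slurm)

    ## 100 elements per future => 10 futures (and 10 registry folders/jobs) instead
    ## of the default nbrOfWorkers() = 100 futures for these 1000 elements.
    z <- future_sapply(1:1000, f, future.chunk.size = 100)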

As @brendanf points out, if the file system on your HPC system is "slow"(*), this difference will be noticeable/significant.

Long-term roadmap: I'm working on an internal redesign at the lowest level of the Future API that is likely to allow map-reduce APIs such as future.apply to produce a single batchtools registry folder without even knowing about the batchtools package. I think it can work and even be introduced without breaking backward compatibility, but it will require lots of work, so it's unlikely it'll be implemented within the next 12 months. It'll be part of a bigger redesign that will make it possible to implement other feature requests.

(*) It's actually really hard to produce a fast file system that can serve hundreds of users and jobs in parallel. I've seen this on parallel BeeGFS systems where lots of jobs and processes doing lots of disk I/O can slow down the performance quite a bit. It'll never be as fast as a local disk.

JZL commented 2 years ago

Hi, I was working on adding a new parallel backend to batchtools and my favorite part is how it can then automatically integrate with the broader future ecosystem.

However, exacerbated by large shared globals (but also due to the internals of the new backend), the difference between 1 registry folder and nbrOfWorkers() registry folders is a big one for me. I have some hacks in mind which I think I can use, but I was curious whether there is any news on the big redesign mentioned above? I didn't see any mention of it in the NEWS or in the code I've been reading, but maybe I missed it.

(Unrelated, matrixStats is awesome, I use it constantly!)

HenrikBengtsson commented 2 years ago

Cool that you're getting another batchtools backend going. I'm keen to hear about it when you get it published.

Yeah, sorry, no update on this (but bits and pieces here and there in the future ecosystem have been updated towards it).

Happy to hear you find matrixStats useful. Cheers.

JZL commented 2 years ago

Thanks for your amazingly fast response.

One pretty common use case I hit is having fairly small local function state, except for one moderately large shared global. It seems a slight shame to re-compress it for every registry folder/RDS export per job when using future.batchtools. Do you have any advice for avoiding that redundancy? I'm happy to rewrite my own code around batchMap with a single globals list, but other libraries I use contain internal futures with this same issue.

One hacky solution I've been tossing around is to just take care of loading that one variable separately: save that large global out into its own RDS file and hand the worker function just a file path in place of the global. Then, in each worker, check whether the variable is a file path; if so, replace it (globally) with the readRDS() of that path.
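
A minimal sketch of that workaround in its simplest form (my phrasing, with a hypothetical shared path), passing the file path explicitly instead of patching the globals list:

    library(future.apply)
    plan(future.batchtools::batchtools_slurm)

    big <- rnorm(1e7)                         # the one large shared object
    big_path <- "/shared/scratch/big.rds"     # hypothetical shared location
    saveRDS(big, big_path)

    z <- future_sapply(1:100, function(i, path) {
      obj <- readRDS(path)    # loaded on the worker from shared storage,
      sum(obj) + i            # instead of being serialized into every registry
    }, path = big_path)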

At the risk of maintaining a fork of the repo, I could even do this transparently to the worker function by going through the globals list when the registry is loaded, checking for a file-path class and replacing it with the readRDS() result before farming out.