mschubert / clustermq

R package to send function calls as jobs on LSF, SGE, Slurm, PBS/Torque, or each via SSH
https://mschubert.github.io/clustermq/
Apache License 2.0

Delaying "send" events to stagger jobs #213

Closed · rimorob closed this 3 years ago

rimorob commented 3 years ago

I have run into a strange but quite understandable problem. I run iterative jobs that grow larger over time, but not unreasonably so. In particular, after a few iterations, each job starts sending around 1 GB of data down the pipe to each worker node. Since my master node doesn't have any swap space, and since I use high parallelism, at some point the total buffer size allocated simultaneously by the memcpy in zmq::send exceeds the amount of RAM on the machine and memcpy fails. The solution seems obvious: send should wait for RAM to become available, or job submission should be staggerable. But I can't see how to implement either from outside the foreach call. Is there anything I can do to stagger the jobs being scheduled by the foreach backend?
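
For concreteness, the pattern looks roughly like this (illustrative names and sizes, not my actual code):

```r
library(foreach)
library(clustermq)

# use clustermq as the foreach backend, with high parallelism
register_dopar_cmq(n_jobs = 50)

# stand-in for the per-iteration payload: ~1 GB of doubles
big_blob <- matrix(runif(125e6), ncol = 1000)

# each iteration references big_blob, so ~1 GB gets serialized and sent
# per worker; the simultaneous send buffers are what exhaust the master's RAM
res <- foreach(i = seq_len(50), .export = "big_blob") %dopar%
    sum(big_blob[, i])
```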

rimorob commented 3 years ago

Looks like I was a little naive. Having looked at the package code, I now realize that all workers get set up before any network communication is launched, if I correctly understand what happens in Q_rows in this line of code: `workers = workers(n_jobs, data=data, reuse=FALSE, template=template, log_worker=log_worker, verbose=verbose)`

This is a problem for my idea: the workers all get launched synchronously, before any data is sent, so there is no natural point at which to stagger them. It is probably also a limitation of the package when the remote jobs require a lot of data.
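
The only caller-side workaround I can think of is to stagger the submissions myself by splitting the work into batches, along these lines (fn, n_tasks, and the batch size are placeholders):

```r
# hypothetical per-task function and task count
fn <- function(i) i^2
n_tasks <- 50

# submit in batches of 10 so only a few large payloads are in flight at once;
# note this launches a fresh set of workers per batch, which is wasteful
batches <- split(seq_len(n_tasks), ceiling(seq_len(n_tasks) / 10))
res <- lapply(batches, function(idx)
    clustermq::Q(fn, i = idx, n_jobs = length(idx)))
res <- unlist(res, recursive = FALSE)
```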

mschubert commented 3 years ago

We had this problem in the past, when too many messages were queued for sending and filled up the main process's memory. However, since then, sending already waits for previous calls to finish before new ones go out (using ZeroMQ's dont_wait=FALSE). Parallel send buffers, as specified by the ZeroMQ context's I/O threads, could still let memory pile up; however, these are set to 1, so sends should be handled serially.

So, the expected memory of the master process is the sum of:

- the objects held in the main R session itself,
- one serialized copy of the common data (sent once to each worker), and
- at most one in-flight message buffer at a time (since sends are serial).

If your main process is using much more than that, can you please provide a reproducible example that illustrates the problem (using fake data, e.g. generated by random sampling), including what memory usage you expect and what you observe instead?
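
For example, something along these lines (assuming the large object is called big_blob, as in your sketch) would let you compare the two:

```r
# serialized size of one payload as it goes over the wire, in Mb
payload_mb <- length(serialize(big_blob, NULL)) / 2^20

# total memory currently held by this R session, in Mb
session_mb <- sum(gc()[, 2])
```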

Overall, there should be a better way than sending 1GB of data with each worker call. But I can't really comment on that without a proper description of the problem you are trying to solve.
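
For instance, if the bulk of that 1 GB is shared between calls, Q can transfer it once per worker via the const argument instead of once per call (a sketch, with fn as a placeholder):

```r
fn <- function(i, big_blob) sum(big_blob[, i])

# big_blob is sent once to each of the 50 workers,
# not with each of the 1000 calls
res <- clustermq::Q(fn, i = seq_len(1000), n_jobs = 50,
                    const = list(big_blob = big_blob))
```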

rimorob commented 3 years ago

I'm working on a reproducible example. In the meantime, a quick comment on the 1 GB of data: I actually need to send much less than that. The problem is that I'm parallelizing inside an R6 object, and I have a ton of code dependencies that, whenever I export almost any function to foreach, pull in the whole R6 object. I'm working to clean those up as well. That said, 1 GB is not unreasonable in this day and age, which is why I decided to report this as a potential issue.
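
The cleanup I have in mind looks roughly like this (field and function names are made up):

```r
# copy out only what the workers need, so the export stays small
needed <- self$small_field                        # hypothetical R6 field

work_fn <- function(i, needed) needed[[i]] * 2    # placeholder body
# drop the R6 enclosure; otherwise serializing work_fn drags `self` along
environment(work_fn) <- globalenv()

res <- foreach(i = seq_along(needed), .export = c("work_fn", "needed")) %dopar%
    work_fn(i, needed)
```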

mschubert commented 3 years ago

I'm closing this due to inactivity. If you can present a way to reproduce the problem, please reopen.