HenrikBengtsson / doFuture

:rocket: R package: doFuture - Use Foreach to Parallelize via Future Framework
https://doFuture.futureverse.org
84 stars 6 forks source link

Understanding chunking and scheduling #48

Closed jakejh closed 4 years ago

jakejh commented 4 years ago

First, thanks for the great suite of future packages! I'm trying to understand a behavior of doFuture I've observed, in which the last several iterations in a %dopar% loop seem to be executed on the same worker, ignoring other available workers.

In my case, there are about 1000 iterations, each taking about 25 minutes. For the first 990 iterations or so, everything's fine, using all 14 registered workers. Eventually though, only one worker is working, even when more than one iteration remains (which I can tell from top and my log files). It seems like the iterations were already given to that worker, and now they're stuck waiting to be processed sequentially.

I've read the documentation on .options.future, but I confess I still don't understand how to change this behavior. Any help would be appreciated. Thanks.

HenrikBengtsson commented 4 years ago

Chunking takes place before any processing starts. With 14 workers, the default is that your 1000 iterations are split up into 14 equally-sizes chunks, e.g. chunks <- parallel::splitIndices(1000, 14). Then these chunks are distributed to the workers - one per worker. It sounds like 13 workers have completed there chunks and the remaining worker running still chug along on its chunk.

By splitting up into more chunks, the chunks will be smaller. For example, .options.future = list(scheduling = 4.0) will result in four times the number of chunks. With 14 workers, there will be 4*14 chunks. This will results in chunks 1-14 being submitted to the workers. Now say that workers 1-13 finish their chunks while worker 14 still keep on working, workers 1-13 can now take on another 13 chunks, i.e. chunks 15-27, and so on. This will lower the risk for one chunk holding up the workers for running. Since the chunks are now smaller, whatever delay there is, it will be shorter than before.

By setting .options.future = list(scheduling = Inf), or equivalently, .options.future = list(chunk.size = 1L), you'll get one element per chunk, i.e. 1,000 chunks. That would minimize the risk of having to wait for a few long running chunks. However, the tradeoff is that you are now spending more time from the overhead of orchastrating 1,000 parallel chunks instead of 14 parallel chunks.

jakejh commented 4 years ago

That makes sense, thanks! Given each of my iterations takes about 25 minutes, would you guess that the additional overhead of using scheduling = Inf would be relatively small? In case it matters, I'm passing very little data to the workers.

Also, is it chunk.size or chunk_size? The documentation in ?doFuture uses both.

HenrikBengtsson commented 4 years ago

Given each of my iterations takes about 25 minutes, would you guess that the additional overhead of using scheduling = Inf would be relatively small?

No, in that respect, I'd expect the parallelization overhead per chunk (=future) to be insignificant.

In case it matters, I'm passing very little data to the workers.

In some cases it matters, but if each iteration takes 25 minutes, even if it would add one minute to transfer data back and forth, it would still not add much overhead.

BTW, in case you're running things such as simulations, permutation tests, ..., make sure you use proper parallel RNGs. For foreach, the current best practice is to use `%dorng% from the doRNG for this.

Also, is it chunk.size or chunk_size? The documentation in ?doFuture uses both.

Only chunk.size. It's been corrected in the develop branch. Thanks for spotting.

jakejh commented 4 years ago

Thanks for the heads up. I'm not doing any random number generation, so I'm just using good ole %dopar%.