HenrikBengtsson / future

:rocket: R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

Is a tri-layered nested topology possible? #442

Closed roey-angel closed 3 years ago

roey-angel commented 3 years ago

I have a function that uses three nested future.apply function calls. Trying to design its topology, I assumed I should do something like: plan(list(tweak(multisession, workers = 4L), tweak(multisession, workers = 4L), tweak(multisession, workers = 4L))). But such a design gives an error.

Is a tri-layered nested topology even possible? What would be the best practice for such a case?

HenrikBengtsson commented 3 years ago

Yes, it should be supported.

But such a design gives an error.

What's the error?

roey-angel commented 3 years ago

I'm getting: Error in unserialize(node$con) : error reading from connection. I use an Ubuntu LXC container running on a server. If I specify only two layers in plan(), does the third layer run as sequential by default?

HenrikBengtsson commented 3 years ago

Here's a proof of concept that it works:

> library(future)
> plan(list(
    first = tweak(multisession, workers = 2), 
    second = tweak(multisession, workers = 2), 
    third = tweak(multisession, workers = 2)
  ))

> pids %<-% {
+   second %<-% {
+     third %<-% Sys.getpid()
+     c(second = Sys.getpid(), third = third)
+   }
+   c(first = Sys.getpid(), second)
+ }
> c(main=Sys.getpid(), pids)
  main  first second  third 
  6257   6807   6939   7027

You should be able to get the same.

Regarding the error "Error in unserialize(node$con) : error reading from connection.": see my comment at https://github.com/HenrikBengtsson/future/issues/351#issuecomment-652206076 and ?parallelly::makeClusterPSOCK. This can happen when your workers run old versions of R, e.g. when connecting to another machine. That shouldn't happen with the above example, though, because it should pick up the exact same R version that the main R session runs.
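One way to rule out the R-version explanation is to ask a worker which R it runs and compare that with the main session. This is only a minimal sketch, assuming a local multisession plan with two workers; the variable names are made up for illustration.

```r
library(future)

plan(multisession, workers = 2)

# R.version.string is looked up on the worker, not in the main session
f <- future(R.version.string)
worker_version <- value(f)
main_version <- R.version.string

print(c(main = main_version, worker = worker_version))

# TRUE when the worker runs the same R build as the main session
versions_match <- identical(worker_version, main_version)
print(versions_match)

plan(sequential)  # shut down the workers
```

If the two strings differ, the version mismatch described above becomes a plausible cause; if they match, the error most likely has another origin.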

HenrikBengtsson commented 3 years ago

If I only indicate two layers under plan does the third layer run as sequential by default?

Yes. The default is always 'sequential'. This is documented in https://cran.r-project.org/web/packages/future/vignettes/future-3-topologies.html
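The fallback can be seen by reusing the proof of concept above with only two layers declared. A minimal sketch, assuming two local multisession workers per layer; `all_pids` and `same_process` are illustrative names.

```r
library(future)

# Only two layers are declared; any deeper level falls back to 'sequential'
plan(list(
  first  = tweak(multisession, workers = 2),
  second = tweak(multisession, workers = 2)
))

pids %<-% {
  second %<-% {
    third %<-% Sys.getpid()
    c(second = Sys.getpid(), third = third)
  }
  c(first = Sys.getpid(), second)
}

all_pids <- c(main = Sys.getpid(), pids)
print(all_pids)

# The third-level future is resolved inside the second-level worker itself,
# so both report the same process ID
same_process <- unname(all_pids["third"] == all_pids["second"])
print(same_process)

plan(sequential)  # shut down the workers
```

Here the "third" and "second" entries should show the same PID, while "main", "first" and "second" are all different processes.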

roey-angel commented 3 years ago

Yes, your proof of concept works on my machine. I'm running R 4.0.2 so that shouldn't be the cause of the problem. I'm not sure though how to use parallelly::makeClusterPSOCK to solve my issue here

HenrikBengtsson commented 3 years ago

I'm not sure though how to use parallelly::makeClusterPSOCK to solve my issue here

That's used internally by multisession; I just pointed to the help page for further explanation of one of the reasons for that error message.

Yes, your proof of concept works on my machine.

Since that works, we've established that nested parallelization works for three levels (and actually for any number of levels). There's something else going on, e.g. it could be that you're crashing one of the workers. I cannot really help you without a small reproducible example.

I recommend that you try with different variants of nested plans, e.g.

plan(list(first = tweak(multisession, workers = 2), second = tweak(multisession, workers = 2), third = tweak(multisession, workers = 2)))

plan(list(first = tweak(multisession, workers = 2), second = tweak(multisession, workers = 2), third = sequential))

plan(list(first = tweak(multisession, workers = 2), second = sequential, third = tweak(multisession, workers = 2)))

plan(list(first = sequential, second = tweak(multisession, workers = 2), third = tweak(multisession, workers = 2)))

plan(list(first = sequential, second = sequential, third = tweak(multisession, workers = 2)))

plan(list(first = sequential, second = tweak(multisession, workers = 2), third = sequential))

to see if there's one layer where 'multisession' causes it to fail.
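The variants above can also be tried in a loop so that failures are collected rather than stopping at the first one. This is a rough sketch: `par2`, `variants` and `try_variant()` are hypothetical names, and the nested Sys.getpid() call is only a placeholder for the real three-level workload.

```r
library(future)

par2 <- tweak(multisession, workers = 2)

# The six layer combinations listed above
variants <- list(
  "par/par/par" = list(par2, par2, par2),
  "par/par/seq" = list(par2, par2, sequential),
  "par/seq/par" = list(par2, sequential, par2),
  "seq/par/par" = list(sequential, par2, par2),
  "seq/seq/par" = list(sequential, sequential, par2),
  "seq/par/seq" = list(sequential, par2, sequential)
)

# Set the plan, run a three-level nested future, and report "ok" or the
# error message; the plan is reset to sequential afterwards
try_variant <- function(layers) {
  plan(layers)
  on.exit(plan(sequential), add = TRUE)
  tryCatch({
    value(future({
      value(future({
        value(future(Sys.getpid()))
      }))
    }))
    "ok"
  }, error = function(e) conditionMessage(e))
}

results <- vapply(variants, try_variant, character(1))
print(results)
```

Any entry that is not "ok" holds the error message for that topology, which narrows down which layer is involved.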

Also, and more importantly, ask yourself why you want to do nested parallelization? Do you expect it to increase overall CPU utilization so it finishes sooner? Could it be that for instance

plan(list(first = sequential, second = sequential, third = tweak(multisession, workers = 2*2*2)))

is equally fast as:

plan(list(first = tweak(multisession, workers = 2), second = tweak(multisession, workers = 2), third = tweak(multisession, workers = 2)))

?
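The arithmetic behind that question can be sketched in base R. The helper below is hypothetical (not part of future) and assumes that every worker at every parallel layer ends up launching its own nested workers, so the numbers are upper bounds.

```r
# Upper bound on worker processes for nested parallel layers, where
# workers[i] is the number of workers requested at layer i
nested_process_counts <- function(workers) {
  per_layer <- cumprod(workers)  # processes that can exist at each layer
  list(
    per_layer = per_layer,
    total     = sum(per_layer),  # all worker processes, excluding the main session
    innermost = per_layer[length(per_layer)]  # where the actual work runs
  )
}

nested <- nested_process_counts(c(2, 2, 2))  # three layers of two workers
flat   <- nested_process_counts(2 * 2 * 2)   # one layer of eight workers

str(nested)
str(flat)
```

Under these assumptions the 2/2/2 plan can start up to 2 + 4 + 8 = 14 worker processes, of which only the innermost 8 do the computation while the outer ones wait on them; the flat plan reaches the same 8 with fewer processes. For the 4/4/4 plan from the original question, the same bound is 4 + 16 + 64 = 84 processes.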

HenrikBengtsson commented 3 years ago

I'm closing. Please feel free to continue here if you've got follow-up comments or questions.

roey-angel commented 3 years ago

Apologies for not writing back. I tried tweaking with the different variants, but it always failed with three layers (and never failed with two parallel layers, and one sequential). Unfortunately, the function I'm running fills a matrix with the results of 1000 simulation runs, so a three-way parallel calculation is really necessary to speed things up. Eventually, I rewrote my function to use parallelly which worked fine.

HenrikBengtsson commented 3 years ago

Thanks for reporting back. I'm still puzzled though why it would work with three nested layers using parallelly directly and not when you do it via future that uses parallelly internally.

roey-angel commented 3 years ago

I have no explanation for this myself. All I can say is that I've systematically tested all options under future and that I'm running parallelly using the same number of cores in total.

HenrikBengtsson commented 3 years ago

How do you set up your three layers of cluster workers using parallelly?

roey-angel commented 3 years ago

Using three nested mclapply calls with mc.cores set to 6, 4 and 4. I'd be happy to send you both codes (the future and parallelly versions).

HenrikBengtsson commented 3 years ago

I see.

So, note that mclapply() corresponds to using 'multicore', and neither involves parallelly; both come directly from parallel.

'multisession' involves parallelly::makeClusterPSOCK() which is basically the same as parallel::makeCluster().

Either way nested futures should work with both 'multisession' and 'multicore' but it could be that it will work better for you with 'multicore'.
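For reference, the forked setup described in this thread looks roughly like the sketch below: three nested parallel::mclapply() calls. It uses mc.cores = 2 at each level instead of 6, 4 and 4 to keep it small, the toy payload is made up, and it assumes a Unix-like system, since mclapply() cannot fork on Windows.

```r
library(parallel)

# Three nested forked layers; each innermost task reports its indices and PID
res <- mclapply(1:2, function(i) {
  mclapply(1:2, function(j) {
    mclapply(1:2, function(k) {
      c(i = i, j = j, k = k, pid = Sys.getpid())
    }, mc.cores = 2)
  }, mc.cores = 2)
}, mc.cores = 2)

# Flatten the two outer list levels and bind the 8 results into a matrix
flat <- unlist(unlist(res, recursive = FALSE), recursive = FALSE)
m <- do.call(rbind, flat)
print(m)
```

With future, the closest counterpart would presumably be the same nested plan with multicore in place of multisession, which is worth trying if the multisession version keeps failing.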