HenrikBengtsson / future

:rocket: R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

Nested Futures #544

Closed: wfmueller29 closed this issue 3 years ago

wfmueller29 commented 3 years ago

Hi Henrik,

Thank you for the awesome package. It is such an elegant and powerful way to parallelize. I have a question regarding nested parallelization. I am developing some functions that can be simplified to fun_a() below.

library(future)
library(listenv)

# Create x futures and return them, still unresolved, as a list
fun_a <- function(x){
  a <- listenv()
  for(i in 1:x){
    a[[i]] <- future(Sys.sleep(5))
  }
  a <- as.list(a)
  return(a)
}

I can then call the function sequentially, with the futures inside each call processed asynchronously, without any problems:

ncpus <- availableCores()
cl <- makeClusterPSOCK(ncpus)
plan(cluster, workers = cl)
b <- fun_a(10)
c <- fun_a(15)
e <- fun_a(13)
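To make the later error easier to follow, it helps to look at what fun_a() actually returns when it is called from the main R process. The sketch below is a variant of the code above, not the original: it assumes a 2-worker PSOCK cluster, a 0.1-second sleep, and futures that return `i` so the result can be checked. The elements of `b` are Future objects, and because the main process created them, that same process may resolve them with value().

```r
library(future)
library(listenv)

# Variant of fun_a(): shorter sleep, and each future returns its index i
fun_a <- function(x) {
  a <- listenv()
  for (i in seq_len(x)) {
    a[[i]] <- future({ Sys.sleep(0.1); i })
  }
  as.list(a)
}

cl <- parallelly::makeClusterPSOCK(2)
plan(cluster, workers = cl)

b <- fun_a(3)
# b holds Future objects that were created by this (main) R process ...
is_future <- vapply(b, inherits, logical(1), what = "Future")
# ... so this same process is allowed to collect their values
vals <- value(b)

plan(sequential)
parallel::stopCluster(cl)
```

The problem in the nested case is that fun_a() no longer runs in the main process, so the futures it returns belong to a different R process.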

However, it would be great if I could also run the calls that produce b, c, and e in parallel. I am trying to do this as shown below:

plan(list(tweak(cluster, workers = 3), tweak(cluster, workers = (length(cl) - 3) %/% 3)))

b %<-% fun_a(10)
c %<-% fun_a(15)
e %<-% fun_a(13)

done <- list(value(b),
             value(c),
             value(e))

However, I get the error described in your Common Issues vignette:

Error: Invalid usage of futures: A future (here ‘ClusterFuture’) whose value has not yet been collected can only be queried 
by the R process (d3bd14ca-0e69-51f1-57df-6ff5ebd64ed4; pid 50037 on localhost) that created it, not by any other R 
processes (b8240ee6-1935-40ed-2b69-d177edf0b624; pid 48446 on localhost): Sys.sleep(5)

Here is my session info:

R version 4.1.0 (2021-05-18)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

Matrix products: default
BLAS/LAPACK: /usr/local/intel/compilers_and_libraries_2020.2.254/linux/mkl/lib/intel64_lin/libmkl_rt.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8    LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] listenv_0.8.0 future_1.22.1

loaded via a namespace (and not attached):
 [1] compiler_4.1.0    parallelly_1.28.1 fastmap_1.1.0     parallel_4.1.0    htmltools_0.5.2   tools_4.1.0      
 [7] yaml_2.2.1        codetools_0.2-18  rmarkdown_2.11    knitr_1.34        xfun_0.26         digest_0.6.27    
[13] globals_0.14.0    rlang_0.4.11      evaluate_0.14    

Any help would be much appreciated.

HenrikBengtsson commented 3 years ago

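You can't pass futures between R processes, so you can't create a future (future()) in one parallel worker and then ask for it to be resolved (value()) in another process. Instead, resolve the futures inside fun_a() itself, so that it returns values rather than futures: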

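fun_a <- function(x){
  a <- list()
  for(i in 1:x){
    # Each future is created by the R process that runs fun_a()
    a[[i]] <- future(Sys.sleep(5))
  }
  # Resolve the futures here, so fun_a() returns values rather than futures
  value(a)
}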
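As a usage sketch, this fixed fun_a() can be combined with a two-level plan like the one in the question. Here `multisession` stands in for the hand-made PSOCK cluster, the worker counts (3 outer, 2 inner per outer worker) are illustrative assumptions, and each future returns `i` after a 0.1-second sleep so the example finishes quickly. Because each outer worker resolves its own inner futures, b, c, and e arrive in the main process as ordinary lists of values.

```r
library(future)

# Fixed fun_a(), with a shorter sleep and each future returning its index i
fun_a <- function(x) {
  a <- list()
  for (i in seq_len(x)) {
    a[[i]] <- future({ Sys.sleep(0.1); i })
  }
  value(a)  # resolved by the same worker that created the futures
}

# Outer level: 3 workers for b, c, e; inner level: 2 workers per outer worker
plan(list(
  tweak(multisession, workers = 3),
  tweak(multisession, workers = 2)
))

b %<-% fun_a(4)
c %<-% fun_a(6)
e %<-% fun_a(5)

# b, c, and e are plain lists of values, so no value() calls are needed
done <- list(b, c, e)

plan(sequential)  # shut down the multisession workers
```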

or, equivalently, using the implicit future-assignment syntax (%<-%):

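fun_a <- function(x){
  a <- listenv()
  for(i in 1:x){
    # %<-% creates a future; a[[i]] yields its value when accessed
    a[[i]] %<-% Sys.sleep(5)
  }
  # as.list() retrieves every element, which waits for each future's value
  as.list(a)
}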
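A quick way to check that the two versions are interchangeable is to run both under the same plan and compare their results. In this sketch the names fun_a_list() and fun_a_listenv() are hypothetical, the 2-worker multisession plan is an assumption, and each future returns `i` (captured when the future is created) instead of only sleeping.

```r
library(future)
library(listenv)

# Explicit futures in a plain list, resolved with value()
fun_a_list <- function(x) {
  a <- list()
  for (i in seq_len(x)) a[[i]] <- future({ Sys.sleep(0.1); i })
  value(a)
}

# Implicit futures in a listenv, resolved when as.list() retrieves them
fun_a_listenv <- function(x) {
  a <- listenv()
  for (i in seq_len(x)) a[[i]] %<-% { Sys.sleep(0.1); i }
  as.list(a)
}

plan(multisession, workers = 2)
r1 <- fun_a_list(4)
r2 <- fun_a_listenv(4)
plan(sequential)
```

Either way, what leaves fun_a() is plain data, which is safe to send back from a parallel worker.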

See also https://future.futureverse.org/articles/future-3-topologies.html