HenrikBengtsson / future

:rocket: R package: future: Unified Parallel and Distributed Processing in R for Everyone
https://future.futureverse.org

PSOCK fails with slower functions #524

Status: Open. pachadotdev opened this issue 3 years ago

pachadotdev commented 3 years ago

Describe the bug

When I try a fast function, like a regression with mtcars, PSOCK works well on DigitalOcean droplets. But if I connect to an S3 space, PSOCK returns this error:

  Failed to retrieve the value of ClusterFuture (<none>) from cluster RichSOCKnode #1 (PID 2009 on ‘143.110.238.244’). The reason reported was ‘error reading from connection’

Reproducible example

library(analogsea)
library(dplyr)

s <- "c2-2vcpu-4gb" # 2 dedicated vCPUs + 4 GB RAM; run sizes() to list the options
droplet1 <- droplet_create("RDemo1", region = "sfo3", size = s, image = "rstudio-20-04", wait = FALSE)
droplet2 <- droplet_create("RDemo2", region = "sfo3", size = s, image = "rstudio-20-04", wait = TRUE)

droplet1 <- droplet(droplet1$id)
droplet2 <- droplet(droplet2$id)

Sys.sleep(30)

install_r_package(droplet1, "eflm")
install_r_package(droplet2, "eflm")

ip1 <- droplet(droplet1$id)$networks$v4[[2]]$ip_address
ip2 <- droplet(droplet2$id)$networks$v4[[2]]$ip_address
ips <- c(ip1, ip2)

library(furrr)

ssh_private_key_file <- "~/.ssh/id_rsa"

cl <- makeClusterPSOCK(
  ips,
  user = "root",
  rshopts = c(
    "-o", "StrictHostKeyChecking=no",
    "-o", "IdentitiesOnly=yes",
    "-i", ssh_private_key_file
  ),
  dryrun = FALSE,
  connectTimeout = 1500
)

plan(cluster, workers = cl)

fit_model <- function(y, m) {
  message(paste(y,m))

  suppressMessages(library(arrow))
  suppressMessages(library(dplyr))
  suppressMessages(library(eflm))

  space <- S3FileSystem$create(
    anonymous = TRUE,
    scheme = "https",
    endpoint_override = "sfo3.digitaloceanspaces.com"
  )

  d <- open_dataset(
    space$path("nyc-taxi"),
    partitioning = c("year", "month")
  )

  d <- d %>%
    filter(year == y, month == m) %>%
    select(total_amount, trip_distance) %>%
    collect()

  fit <- elm(total_amount ~ trip_distance, data = d)
  return(fit)
}

ym <- expand.grid(y = 2009:2019, m = 1:12) %>% arrange(y)
fitted_models <- future_map2(ym$y, ym$m, ~fit_model(.x, .y))

droplet_delete(droplet1)
droplet_delete(droplet2)

Expected behavior

It should have returned an lm object. I tested the code by running it over ssh (in a terminal) on each droplet, and it works.

Session information

I tested on both a laptop and a server.

Laptop: Pop!_OS 21.04

R version 4.1.0 (2021-05-18)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Pop!_OS 21.04

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3
LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/libopenblasp-r0.3.13.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8    LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] dplyr_1.0.7     furrr_0.2.3     future_1.21.0   analogsea_1.0.0

loaded via a namespace (and not attached):
 [1] parallelly_1.26.1 knitr_1.33        magrittr_2.0.1    tidyselect_1.1.1  R6_2.5.0          rlang_0.4.11     
 [7] fansi_0.5.0       httr_1.4.2        globals_0.14.0    tools_4.1.0       sys_3.4           parallel_4.1.0   
[13] xfun_0.24         utf8_1.2.1        ellipsis_0.3.2    askpass_1.1       openssl_1.4.4     yaml_2.2.1       
[19] digest_0.6.27     tibble_3.1.2      lifecycle_1.0.0   crayon_1.4.1      ssh_0.8.0         purrr_0.3.4      
[25] vctrs_0.3.8       credentials_1.3.0 codetools_0.2-18  curl_4.3.2        glue_1.4.2        pillar_1.6.1     
[31] compiler_4.1.0    generics_0.1.0    jsonlite_1.7.2    listenv_0.8.0     pkgconfig_2.0.3  

Server: Ubuntu Server 20.04

R version 4.1.0 (2021-05-18)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 20.04.2 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3
LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/liblapack.so.3

locale:
 [1] LC_CTYPE=C.UTF-8       LC_NUMERIC=C           LC_TIME=C.UTF-8        LC_COLLATE=C.UTF-8     LC_MONETARY=C.UTF-8   
 [6] LC_MESSAGES=C.UTF-8    LC_PAPER=C.UTF-8       LC_NAME=C              LC_ADDRESS=C           LC_TELEPHONE=C        
[11] LC_MEASUREMENT=C.UTF-8 LC_IDENTIFICATION=C   

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] dplyr_1.0.7     furrr_0.2.3     future_1.21.0   analogsea_1.0.0

loaded via a namespace (and not attached):
 [1] parallelly_1.26.1 magrittr_2.0.1    tidyselect_1.1.1  R6_2.5.0          rlang_0.4.11      fansi_0.5.0      
 [7] httr_1.4.2        globals_0.14.0    tools_4.1.0       sys_3.4           parallel_4.1.0    utf8_1.2.1       
[13] DBI_1.1.1         ellipsis_0.3.2    askpass_1.1       assertthat_0.2.1  openssl_1.4.3     yaml_2.2.1       
[19] digest_0.6.27     tibble_3.1.2      lifecycle_1.0.0   crayon_1.4.1      ssh_0.8.0         purrr_0.3.4      
[25] vctrs_0.3.8       credentials_1.3.0 codetools_0.2-18  curl_4.3.2        glue_1.4.2        pillar_1.6.1     
[31] compiler_4.1.0    generics_0.1.0    jsonlite_1.7.2    listenv_0.8.0     pkgconfig_2.0.3  
HenrikBengtsson commented 3 years ago

When I try a fast function, like a regression with mtcars, PSOCK works well on DigitalOcean droplets. But if I connect to an S3 space, PSOCK returns this error ...

It sounds like you're implying that a "slow" function gives the error. I don't see how processing time alone would result in an 'error reading from connection'. There is a timeout argument to parallelly::makeClusterPSOCK() that would trigger a timeout error for very long run times, but by default we're talking about 30 days or more.
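A minimal sketch of that point, assuming a single local PSOCK worker stands in for a droplet: the task below is deliberately slow, yet its value is retrieved normally. The timeout of 120 seconds is an arbitrary illustration value, not a recommendation; parallelly's default is 30 days.

```r
library(future)

# Local stand-in for a droplet: one PSOCK worker on localhost.
# 'timeout' is the socket timeout in seconds; 120 is an arbitrary value for this sketch.
cl <- parallelly::makeClusterPSOCK(1L, timeout = 120)
plan(cluster, workers = cl)

# A deliberately "slow" task: it sleeps, then returns a value
f <- future({
  Sys.sleep(2)
  "done"
})
v <- value(f)  # completes normally, since 2 s is well below the 120 s timeout
print(v)

# Clean up: switch back to sequential processing, then shut down the worker
plan(sequential)
parallel::stopCluster(cl)
```

In other words, a long-running task only fails with this error if it outlives the timeout or if the worker process itself dies.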

Instead, I suspect that one of your parallel workers crashes or core dumps for some reason, e.g. because it runs out of memory. If that happens, you'll get the same error message, for example:

> cl <- parallelly::makeClusterPSOCK("remote.worker.org")
> cl
Socket cluster with 1 nodes where 1 node is on host 'hb-x1-2016' (R version 3.6.3 (2020-02-29), platform x86_64-pc-linux-gnu)
> library(future)
> plan(cluster, workers = cl)
> f <- future(quit("no"))
> value(f)
Error in unserialize(node$con) : 
  ClusterFuture (<none>) failed to receive results from cluster RichSOCKnode #1 (PID 4893 on 'hb-x1-2016'). The reason reported was 'error reading from connection'

> traceback()
6: stop(ex)
5: receiveMessageFromWorker(future, ...)
4: result.ClusterFuture(future)
3: result(future)
2: value.Future(f)
1: value(f)

BTW, what does traceback() report after you get that error?

I recommend that you try to reproduce the problem with a smaller example, using a smaller data set with fewer package dependencies. You could also add near-live progress updates using progressr to narrow down how far the function calls get before you get that error.
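A sketch of that approach, under stated assumptions: fit_model_small() is a hypothetical, cut-down stand-in for fit_model() that replaces the arrow/S3 query with a subset of the built-in mtcars data (y plays the role of cyl, m of gear), and local multisession workers replace the droplets. Unlike message(), whose output future relays only once a future has been resolved, each p() call is reported while the workers are still running, so the last update you see points at the stage where a worker died.

```r
library(future)
library(furrr)
library(progressr)

# Hypothetical stand-in for fit_model(); each stage signals a progress update
fit_model_small <- function(y, m, p) {
  p(sprintf("y=%s m=%s: subsetting", y, m))
  d <- mtcars[mtcars$cyl == y & mtcars$gear == m, c("mpg", "wt")]
  p(sprintf("y=%s m=%s: fitting (%d rows)", y, m, nrow(d)))
  fit <- lm(mpg ~ wt, data = d)
  p(sprintf("y=%s m=%s: done", y, m))
  fit
}

# Local workers stand in for the droplet cluster
plan(multisession, workers = 2)

# Each (y, m) pair matches at least two cars in mtcars
ym <- data.frame(y = c(4, 4, 6, 8), m = c(4, 5, 4, 3))

fits <- with_progress({
  p <- progressor(steps = 3 * nrow(ym))  # three updates per call
  future_map2(ym$y, ym$m, ~ fit_model_small(.x, .y, p))
})

plan(sequential)
```

Once this runs cleanly, the same p() calls can be placed between the S3FileSystem$create(), open_dataset(), collect() and elm() steps of the real fit_model().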