tidyverse / multidplyr

A dplyr backend that partitions a data frame over multiple processes
https://multidplyr.tidyverse.org

Error: Computation failed #83

Closed: brianmsm closed this issue 3 years ago

brianmsm commented 5 years ago

I am excited about the potential of this package. However, I ran into some difficulties.

I have a large data set (256,000 rows) in which each row contains a data frame. It is stored as a tibble, and I use purrr::map() iterations to compute statistics. With multidplyr the computation time drops considerably, but after a long wait I get an error. I tried reducing the data to only 5,000 rows and the error persists. However, when I run the same code on only 3,000 rows (using slice()), the calculations finish correctly.

Here are the code and the error. Code:

items_temp <- items_temp %>% 
  partition(cluster) %>% 
  mutate(
    alfa_psych = purrr::map(data,
                            ~ psych::alpha(dplyr::select(., -dplyr::ends_with("Item")))),
    alfa_coef  = purrr::map_dbl(alfa_psych,
                                ~ purrr::pluck(.x, "total", "raw_alpha")),
    alfa_ord_psych = purrr::map(data,
                                ~ psych::alpha(psych::polychoric(dplyr::select(., dplyr::ends_with("Item")))$rho)),
    alfa_ord_coef  = purrr::map_dbl(alfa_ord_psych,
                                    ~ purrr::pluck(.x, "total", "raw_alpha"))
  ) 
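For reference, the `raw_alpha` element that the code plucks from `psych::alpha()` is Cronbach's alpha, alpha = k/(k - 1) * (1 - tr(C)/sum(C)), where C is the k x k covariance matrix of the items (or, in the ordinal version, the polychoric correlation matrix passed in as `$rho`). The sketch below implements only that formula in base R, which can be handy for spot-checking a few rows sequentially outside the cluster. `cronbach_alpha()` is a hypothetical helper, not psych's implementation, and it does none of psych's extra work (item statistics, key checking).

```r
# Hypothetical helper: Cronbach's alpha from a k x k covariance or correlation matrix
cronbach_alpha <- function(C) {
  k <- ncol(C)
  (k / (k - 1)) * (1 - sum(diag(C)) / sum(C))
}

# Example: 3 items with a uniform correlation of 0.2
C <- matrix(0.2, 3, 3)
diag(C) <- 1
cronbach_alpha(C)  # -> 0.4285714, i.e. 3/7

# For raw item scores, pass cov(items) instead of a correlation matrix.
```

With a uniform correlation r among k items the result matches k*r / (1 + (k - 1)*r), which is an easy sanity check.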

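Error (with 5,000 or more rows; "Recurso no disponible temporalmente" is the Spanish-locale message for "Resource temporarily unavailable"):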

Error: Computation failed
Parents:
 ─callr subprocess failed: unable to fork, possible reason: Recurso no disponible temporalmente
 ─unable to fork, possible reason: Recurso no disponible temporalmente
rlang::last_error()
<error>
message: Computation failed
class:   `rlang_error`
backtrace:
  1. multidplyr::partition(., cluster)
 10. dplyr::mutate(...)
 11. multidplyr:::shard_call(.data, "mutate", enquos(...))
<error: parent>
message: callr subprocess failed: unable to fork, possible reason: Recurso no disponible temporalmente
class:   `callr_status_error`
<error: parent>
message: unable to fork, possible reason: Recurso no disponible temporalmente
class:   `callr_remote_error`
backtrace:
  1. withCallingHandlers(...)
 20. .handleSimpleError(...)
 21. h(simpleError(msg, call))
Call `rlang::last_trace()` to see the full backtrace
rlang::last_trace()
     █
  1. └─`%>%`(...)
  2.   ├─base::withVisible(eval(quote(`_fseq`(`_lhs`)), env, env))
  3.   └─base::eval(quote(`_fseq`(`_lhs`)), env, env)
  4.     └─base::eval(quote(`_fseq`(`_lhs`)), env, env)
  5.       └─`_fseq`(`_lhs`)
  6.         └─magrittr::freduce(value, `_function_list`)
  7.           ├─base::withVisible(function_list[[k]](value))
  8.           └─function_list[[k]](value)
  9.             ├─dplyr::mutate(...)
 10.             └─multidplyr:::mutate.multidplyr_party_df(...)
 11.               └─multidplyr:::shard_call(.data, "mutate", enquos(...))
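The innermost error has class `callr_remote_error`, which appears to mean the fork failed inside a worker process rather than when the cluster was created. One unconfirmed possibility is that code running on each worker (for example `psych::polychoric()`) starts helper processes of its own, so that with one worker per core the machine runs into its process or memory limits. The sketch below assumes that and reduces the load: it leaves one core free and sets `options(mc.cores = 1)` on every worker. That option only affects code that reads it, such as `parallel::mclapply()`, so treat this as a diagnostic experiment, not a confirmed fix.

```r
library(multidplyr)

# Assumption: leaving one core for the main session reduces pressure on
# process and memory limits. detectCores() can return NA on some systems.
n_cores <- parallel::detectCores()
n_workers <- if (is.na(n_cores)) 1L else max(1L, n_cores - 1L)
cluster <- new_cluster(n_workers)

# Ask mclapply()-style code on each worker to stay in a single process.
cluster_send(cluster, options(mc.cores = 1))

# Confirm the option on every worker: a list with one value per worker.
cluster_call(cluster, getOption("mc.cores"))
```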
hadley commented 5 years ago

Is there any way you could make a reprex using random data? Otherwise it's basically impossible for me to debug it.

brianmsm commented 5 years ago

Hello, sorry for the delay in responding.

I have a reproducible script, but when I try to render it with reprex I get an error. The rendering error appears only when the script includes the computation from my first post, so I suspect pandoc itself is not the problem.

> reprex:::reprex_addin() # With the calculation code that gives problem
Rendering reprex...
Error: callr subprocess failed: pandoc document conversion failed with error 127

> reprex:::reprex_addin() # Without the code that gives problem
Rendering reprex...
Rendered reprex is on the clipboard.

Here is the reproducible code.

# Load packages
library(dplyr)
library(MASS)

# Generated data
set.seed(2019)
r <- seq(0.2, 0.3, 0.1) ## Correlation from 0.2 to 0.3
n <- c(50, 100) ## sample sizes
replic <- 500
# Create empty lists
sigma <- list()
temp <- list()

# Loop
for(i in 1:3) { ## The first level stores the matrices for a given number of items:
  sigma[[i]] <- list() ## [[1]] holds 3 items, and so on,
  temp[[i]] <- list() ## up to [[3]], which holds 5 items
  for (j in seq_along(r)) { ## The second index stores the different correlations
    sigma[[i]][[j]] <- matrix(rep(c(1, rep(r[j], i+2)), i+2), # This repetition only works as long as the correlations
                              # are uniform across the matrix
                              i+2, i+2) # this sets the matrix size
    temp[[i]][[j]] <- list()
    for (k in seq_along(n)) {
      temp[[i]][[j]][[k]] <- list()
      for(l in 1:replic) {
        temp[[i]][[j]][[k]][[l]] <- mvrnorm(n[k], # sample size
                                             mu = rep(0, i+2), 
                                             Sigma = sigma[[i]][[j]]) %>% # use one of the 6 correlation matrices
          as_tibble() %>% 
          mutate_all(list(Item = ~ findInterval(., c(-Inf, -2,  -1, 1,  2, Inf)))) # discretize into 5 "symmetric" categories
      }
    }
  }
}

# Remove the temporary loop indices and objects
rm(i, j, k, l, replic, sigma)

# Convert to a tidy format
items <- list()
for(i in 1:3) {
  items[[i]] <- list()
  for(j in seq_along(r)) {
    items[[i]][[j]] <- list()
    for(k in seq_along(n)) {
      items[[i]][[j]][[k]] <- temp[[i]][[j]][[k]] %>% 
        bind_rows(.id = "replic")
    }
    items[[i]][[j]] <- items[[i]][[j]] %>% 
      bind_rows(.id = "n") %>% 
      mutate(n = recode(n, "1" = 50, "2" = 100,
                        "3" = 500, "4" = 1000))
  }
  items[[i]] <- items[[i]] %>% 
    bind_rows(.id = "correlacion") %>% 
    mutate(correlacion = recode(correlacion, "1" = 0.2,
                                "2" = 0.3, "3" = 0.4,
                                "4" = 0.5, "5" = 0.6,
                                "6" = 0.7, "7" = 0.8,
                                "8" = 0.9)) %>% 
    group_nest(correlacion, n, replic)
}

items <- items %>% ## Finally, all generated data frames end up in a single object (3 x 2 x 2 x 500 = 6,000 rows here)
  bind_rows(.id = "items") %>% 
  mutate(items = recode(items, "1" = "3 items",
                        "2" = "4 items", "3" = "5 items"),
         items = factor(items),
         replic = as.numeric(replic)) %>% 
  arrange(items, correlacion, n, replic)

rm(i, j, k, n, r, temp)

# Calculate reliability with multidplyr

library(multidplyr)
cluster <- new_cluster(parallel::detectCores())

items <- items %>% 
  partition(cluster) %>% 
  mutate(
    alfa_psych = purrr::map(data,
                            ~ psych::alpha(dplyr::select(., -dplyr::ends_with("Item")))),
    alfa_coef  = purrr::map_dbl(alfa_psych,
                                ~ purrr::pluck(.x, "total", "raw_alpha")),
    alfa_ord_psych = purrr::map(data,
                                ~ psych::alpha(psych::polychoric(dplyr::select(., dplyr::ends_with("Item")))$rho)),
    alfa_ord_coef  = purrr::map_dbl(alfa_ord_psych,
                                    ~ purrr::pluck(.x, "total", "raw_alpha"))
  ) %>% 
  collect()
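The `rep()` expression used to build `sigma` in the script relies on column-major filling: for p = i + 2 items, the pattern `c(1, r, ..., r)` has length p + 1, so every element that lands on the diagonal is a 1 and everything else is the correlation. A sketch comparing it with a more explicit construction; `sigma_rep()` and `sigma_explicit()` are hypothetical helper names, and the `suppressWarnings()` is there because recent R versions may warn that the data length differs from the matrix size.

```r
# Trick from the script: recycle c(1, rho, ..., rho) column by column
# and keep only the first p * p values.
sigma_rep <- function(p, rho) {
  suppressWarnings(matrix(rep(c(1, rep(rho, p)), p), p, p))
}

# Explicit construction: every entry is rho, then the diagonal is set to 1.
sigma_explicit <- function(p, rho) {
  m <- matrix(rho, p, p)
  diag(m) <- 1
  m
}

# Check that both agree for the sizes and correlations used in the script.
for (p in 3:5) {
  for (rho in c(0.2, 0.3)) {
    stopifnot(isTRUE(all.equal(sigma_rep(p, rho), sigma_explicit(p, rho))))
  }
}
sigma_explicit(3, 0.2)
```

The explicit version also works if the matrix is later changed to hold non-uniform correlations, whereas the `rep()` trick does not.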
cristianvaldez commented 5 years ago

I am having a similar issue: I defined a function, but when I call it, R says it cannot find it.

> data_all <- vta_filtered %>%

jlfsjunior commented 4 years ago

@cristianvaldez Have you assigned forecast_function to the cluster (using multidplyr::cluster_assign)?
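A minimal sketch of that suggestion, assuming a small toy data set: each multidplyr worker is a separate R session, so a function defined in the main session must be sent to the workers before `mutate()` or `summarise()` can call it there. `forecast_function` here is a hypothetical stand-in that just returns the mean.

```r
library(dplyr)
library(multidplyr)

# Hypothetical stand-in for the user's forecast_function
forecast_function <- function(x) mean(x)

cluster <- new_cluster(2)
# Create a variable called forecast_function on every worker
cluster_assign(cluster, forecast_function = forecast_function)

result <- tibble(g = rep(1:4, each = 5), x = 1:20) %>%
  group_by(g) %>%
  partition(cluster) %>%
  summarise(fc = forecast_function(x)) %>%
  collect() %>%
  arrange(g)
result
```

Without the `cluster_assign()` line, the workers would report that `forecast_function` cannot be found, which matches the symptom described above.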

sschloss1 commented 4 years ago

I had the same problem as above with custom functions. There's an easy solution:

cluster <- new_cluster(2); cluster_copy(cluster, "my_function")

where "my_function" is the quoted name of your function.

Automatically loading the global environment and all loaded packages on each worker would be a nice feature to add to new_cluster().
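Until something like that exists, a manual workaround can be sketched with the existing helpers: attach the main session's packages on every worker with `cluster_library()` and copy every global function with `cluster_copy()`. This is an assumption-laden sketch, not a multidplyr feature: it copies only functions (not data), and `global_fns` is a hypothetical name.

```r
library(multidplyr)

cluster <- new_cluster(2)

# Attach, on every worker, the packages attached in the main session
# (.packages() lists them, base packages included).
cluster_library(cluster, .packages())

# Copy every function defined in the global environment to every worker.
global_fns <- Filter(
  function(nm) is.function(get(nm, envir = globalenv())),
  ls(globalenv())
)
if (length(global_fns) > 0) cluster_copy(cluster, global_fns)
```

Copying only functions keeps large data objects off the workers; data should still go through `partition()`.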

hadley commented 3 years ago

I think I've fixed the root cause of this issue, which is that the error reporting didn't actually tell you what the problem was.