tidyverse / multidplyr

A dplyr backend that partitions a data frame over multiple processes
https://multidplyr.tidyverse.org

removing a cluster doesn't free memory #116

Open Erinaceida opened 3 years ago

Erinaceida commented 3 years ago

The README and other multidplyr tutorials by the authors do not mention how to close a cluster once it has been created.

I apologise, as I'm probably not using the package correctly, but it's a bit difficult to find a solution in the documentation. My code looks like this:

cluster <- new_cluster(12)
cluster_library(cluster, c("dplyr", "lubridate"))

transfers_mod_los <- transfers_mod %>%
  group_by(hadm_id) %>%
  partition(cluster)

transfers_mod_los <- transfers_mod_los %>%
  dplyr::mutate(LOS = time_length(difftime(outtime, intime), "days")) %>%
  collect()

Running cluster_rm(cluster) throws the error Error in stopCluster(cluster) : could not find function "stopCluster", while rm(cluster) doesn't solve the issue of R holding on to RAM.

How can I close my cluster?


rexdouglass commented 3 years ago

Can confirm, it's really hit or miss. I'm on 0.1.0.9 and Ubuntu 20, and the cluster won't free memory after finishing an operation and a collect(). So I have a hack that checks the available free memory and, if it gets below a threshold, tries to shut down and reset the cluster. And that isn't freeing the memory either.
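
The hack is roughly this (a simplified sketch, not my exact production code; the 100 GB threshold and worker count are placeholders):

library(multidplyr)

#Free memory in GB, read from /proc/meminfo (Linux only)
free_gb <- function() {
  as.numeric(system("awk '/MemFree/ {print $2}' /proc/meminfo", intern = TRUE)) / 1e6
}

#If free memory drops below the threshold, drop the cluster and build a fresh one
if (free_gb() < 100) {
  rm(cluster); gc()
  cluster <- new_cluster(64)
}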

hadley commented 3 years ago

To clean up, you can just delete the cluster and it'll be cleaned up on the next gc(). This works for me on a Mac; can you please run the code below and report what you see?

library(multidplyr)

cl <- new_cluster(4)
ps::ps_children()
#> [[1]]
#> <ps::ps_handle> PID=10623, NAME=R, AT=2021-04-30 13:24:35
#> 
#> [[2]]
#> <ps::ps_handle> PID=10627, NAME=R, AT=2021-04-30 13:24:36
#> 
#> [[3]]
#> <ps::ps_handle> PID=10631, NAME=R, AT=2021-04-30 13:24:36
#> 
#> [[4]]
#> <ps::ps_handle> PID=10635, NAME=R, AT=2021-04-30 13:24:36

rm(cl)
gc()
#>           used (Mb) gc trigger (Mb) limit (Mb) max used (Mb)
#> Ncells  761424 40.7    1311987 70.1         NA  1311987 70.1
#> Vcells 1408671 10.8    8388608 64.0      32768  2355271 18.0
ps::ps_children()
#> list()

Created on 2021-04-30 by the reprex package (v2.0.0)

rexdouglass commented 3 years ago

I can confirm the toy example works correctly.

I'm sorry, I only have a production example, and I'm conflating two different but possibly related problems: (1) nodes not freeing memory while being used, and (2) rm(cluster) not reliably destroying a cluster.

I have a for loop that subsets a parquet file loaded with arrow based on a variable, sends it to partitions with multidplyr to be summarized, pulls the results back, and saves it.

On each iteration of that for loop, memory use increases. Asking the nodes to destroy their objects and gc() only frees a small amount. Destroying the entire cluster with rm() doesn't always shut down those nodes and free their memory, as in the run below.

Today I tried explicitly calling kill first and that appears to guarantee destroying the cluster, which solves problem 2.

lapply(cluster, FUN=function(x) x$kill()); gc()

Here is a sample of output that shows memory use creeping up each iteration of the for loop, and then the final rm() call not destroying the cluster and freeing its memory.

[1] "fromcdc_byreport_cases_confirmed_cumsum" Using partial cluster of size 29 7.98 sec elapsed [1] "143.829188 GB of free memory" [1] "143.816088 GB of free memory, can't get the partitions to free their memory" [1] "fromcdc_byreport_cases_cumsum" Using partial cluster of size 50 13.692 sec elapsed [1] "139.734024 GB of free memory" [1] "139.838656 GB of free memory, can't get the partitions to free their memory" [1] "fromcdc_byreport_cases_probable_cumsum" Using partial cluster of size 29 6.746 sec elapsed [1] "139.64672 GB of free memory" [1] "139.660772 GB of free memory, can't get the partitions to free their memory"

print(memFree %>% paste(" GB of free memory, final shutdown"))
[1] "139.660772 GB of free memory, final shutdown"
rm(cluster); gc()
          used  (Mb) gc trigger  (Mb)  max used   (Mb)
Ncells 3540491 189.1    6463637 345.2   6463637  345.2
Vcells 6566650  50.1   33847463 258.3 242232480 1848.1
memFree <- as.numeric(system("awk '/MemFree/ {print $2}' /proc/meminfo", intern=TRUE)) / 1000000 #gb
print(memFree %>% paste(" GB of free memory, memory freed"))
[1] "139.647392 final GB of free memory, memory freed"

Non-reproducible production example showing the attempts to free memory, and the new explicit kill command:

#Library Loads
library(dplyr)
library(glue)
library(janitor)
library(arrow)
library(tictoc)
library(multidplyr) 

#there's a conflict between multidplyr and spark. Make sure there's no spark connection and sparklyr is unloaded.
try({spark_disconnect(sc)}) 
try({ detach("package:sparklyr", unload=TRUE) })

#User function to summarize data
seasonal_decomp <- function(x) {
  try({
    return( x %>% ts(frequency = 7) %>% stR::AutoSTR(confidence = NULL) %>% stR::components() %>% as.data.frame() %>%
              janitor::clean_names() %>%
              #dplyr::select(-data) %>%
              rename(value_fd_clean_autostr_trend = trend,
                     value_fd_clean_autostr_seasonality_7 = seasonality_7,
                     value_fd_clean_autostr_residual = random) %>%
              mutate(value_fd_clean_autostr_trend = pmax(value_fd_clean_autostr_trend, 0, na.rm = TRUE)) %>%
              dplyr::mutate(t = row_number())
    )
  })
  #If AutoSTR fails, fall back to an all-NA frame of the same length as the input
  n <- length(x)
  x <- NULL
  return( matrix(NA, nrow = n, ncol = 4,
                 dimnames = list(c(), c('data', 'value_fd_clean_autostr_trend', 'value_fd_clean_autostr_seasonality_7', 'value_fd_clean_autostr_residual'))) %>%
            data.frame() %>% mutate(t = row_number()) )
}

#Construct the cluster the first time
cluster <- new_cluster(n=64) 
cluster_library(cluster,"dplyr")
cluster_library(cluster,"janitor")
cluster_library(cluster,"stR")
cluster_copy(cluster,"seasonal_decomp")

#Main for loop
for(v in unique_variable){
  print(v)
  filename=glue("./smoothed/smoothed_{v}.parquet")

  if(!file.exists(filename)){
    ccc_rectangular_added <- arrow::open_dataset(sources="./data_temp/ccc_rectangular_added/") 

    smoothed <- ccc_rectangular_added %>%
                          dplyr::filter(variable == v) %>% 
                          tidylog::select(fips,source,variable,date, value_fd_clean) %>%
                          collect() %>% #collection from arrow

                          dplyr::group_by(variable,source,fips) %>%
                            partition(cluster) %>%  
                              arrange(date) %>%
                              summarise(seasonal_decomp(value_fd_clean)) %>% 
                            collect() %>%  #collection from multidplyr
                          arrow::write_parquet(filename)

    smoothed <- NULL;  gc() #try to destroy objects in the main R session

    memFree <- as.numeric(system("awk '/MemFree/ {print $2}' /proc/meminfo", intern=TRUE)) / 1000000 #gb
    print(memFree %>% paste(" GB of free memory"))

    #if free memory falls below 100GB destroy the cluster, otherwise just try to free memory
    if(memFree<100){ 
      print("CLEARING MEMORY BY RESTARTING CLUSTER")
      a <- cluster_call(cluster, {rm(list = ls())}) #try to get the r sessions to clear their memory  
      a <- cluster_call(cluster, {gc()}) #Try to get the r sessions to garbage collect
      a <- lapply(cluster, FUN=function(x) x$kill()); gc() #newly added kill command which appears to work reliably
      rm(cluster); gc(); #suggested way that only sometimes works

      #Restart the cluster
      cluster <- new_cluster(n=64) #
      cluster_library(cluster,"dplyr")
      cluster_library(cluster,"janitor")
      cluster_library(cluster,"dtw")
      cluster_copy(cluster,"seasonal_decomp")
    } else{
      a <- cluster_call(cluster, {rm(list = ls())}) #try to get the r sessions to clear their memory 
      a <- cluster_call(cluster, {gc()}) #Try to get the r sessions to garbage collect
      a <- cluster_copy(cluster,"seasonal_decomp") #have to copy this back
      memFree <- as.numeric(system("awk '/MemFree/ {print $2}' /proc/meminfo", intern=TRUE)) / 1000000 #gb
      print(memFree %>% paste(" GB of free memory, can't get the partitions to free their memory"))
    }
  }
}
print(memFree %>% paste(" GB of free memory, final shutdown"))
a <- lapply(cluster, FUN=function(x) x$kill()); gc() #newly added kill command which appears to work reliably
rm(cluster); gc()
ccc_rectangular_added <- NULL; gc()
memFree <- as.numeric(system("awk '/MemFree/ {print $2}' /proc/meminfo", intern=TRUE)) / 1000000 #gb
print(memFree %>% paste(" final GB of free memory,"))
hadley commented 3 years ago

Ok, sounds like I should kill the workers explicitly when the cluster is gc()'d, and add an explicit cluster_kill() function.
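
Something along these lines, maybe (a sketch only, not a final API; it assumes each element of the cluster is a session object with a kill() method, as used in the workaround above):

cluster_kill <- function(cluster) {
  lapply(cluster, function(session) session$kill()) #terminate each worker process
  invisible(cluster)
}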

x1o commented 3 years ago

Try walk(cluster, ~.x$kill())
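
(walk() here is purrr::walk(), so with the namespace spelled out:)

purrr::walk(cluster, ~ .x$kill())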

Fredo-XVII commented 1 year ago

Hello!

I am having the same issue as @Erinaceida on a Windows machine, where rm(cluster) did not shut down the cluster. But then I noticed that I still had a partitioned object in my environment in RStudio. So I ran rm(cluster_partitioned_df), then followed that with rm(cluster) and gc(), and the cluster shut down. Here is some example code; because I am working with production code, I don't have a reprex.

One thing to note is that even though the cluster shut down, the RStudio R session's memory usage continued to grow. So I ended up with no cluster but still had the large memory usage. This is just an FYI, hoping that it might lead to some clue as to why memory keeps growing after cluster shutdown.

nested_df <- df %>% tidyr::nest()

cluster <- build_cluster(cores = cores) # custom function

# Send packages to cluster
multidplyr::cluster_library(cluster, 'purrr')
.
.
.

cluster_partitioned_df <- multidplyr::partition(nested_df, cluster = cluster)

# Run Cluster Code

multidplyr::cluster_call(cluster, code = rm(list = ls()))
rm(cluster_partitioned_df) # This is was the new code that shut down the cluster
rm(cluster)
gc()