DavisVaughan / furrr

Apply Mapping Functions in Parallel using Futures
https://furrr.futureverse.org/

Return `NULL` from each function call when using `future_walk()` #205

Closed albert-ying closed 2 years ago

albert-ying commented 3 years ago

Hi,

I'm using future_walk() for a simple task:

Join a list of large tables with a large table already in memory.

library(furrr)
library(tidyverse)
large_df_1 = read_tsv("....")

plan(multicore, workers = 15, gc = TRUE)
options(future.globals.maxSize = 10 * 1024 ** 3)

future_walk(ListOfFilePath, ~{
  large_df_2 = read_tsv(.x)
  final_df = inner_join(large_df_1, large_df_2)
  write_tsv(final_df, paste0(.x, "processed"))
})

At the beginning, the RAM usage is around 50 GB, which is acceptable. But over time it gradually increases to ~250 GB and eventually fills all of the RAM.

Now I have to manually stop the process after it has run for a while and restart it to avoid the crash. I'm wondering what may be causing this and how I can prevent it.

Thank you!

[ins] r$> sessionInfo()
R version 4.1.0 (2021-05-18)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 16.04.6 LTS

Matrix products: default
BLAS/LAPACK: /home/kying/anaconda3/envs/r-reticulate/lib/libopenblasp-r0.3.12.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C
 [9] LC_ADDRESS=C               LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats     graphics  grDevices datasets  utils     methods
[7] base

other attached packages:
[1] furrr_0.2.3      future_1.22.1    tidylog_1.0.2
[4] ggtext_0.1.1     ggsci_2.9        hrbrthemes_0.8.0
[7] ggplot2_3.3.5

loaded via a namespace (and not attached):
 [1] clisymbols_1.2.0  tidyselect_1.1.1  xfun_0.24
 [4] purrr_0.3.4       listenv_0.8.0     colorspace_2.0-2
 [7] vctrs_0.3.8       generics_0.1.0    htmltools_0.5.2
[10] utf8_1.2.2        rlang_0.4.11      gridtext_0.1.4
[13] pillar_1.6.2      glue_1.4.2        withr_2.4.2
[16] DBI_1.1.1         gdtools_0.2.3     lifecycle_1.0.0
[19] munsell_0.5.0     gtable_0.3.0      codetools_0.2-18
[22] evaluate_0.14     knitr_1.33        fastmap_1.1.0
[25] extrafont_0.17    parallel_4.1.0    fansi_0.5.0
[28] Rttf2pt1_1.3.9    Rcpp_1.0.7        scales_1.1.1
[31] jsonlite_1.7.2    parallelly_1.27.0 systemfonts_1.0.2
[34] digest_0.6.27     dplyr_1.0.7       grid_4.1.0
[37] tools_4.1.0       magrittr_2.0.1    tibble_3.1.4
[40] crayon_1.4.1      extrafontdb_1.0   tidyr_1.1.3
[43] pkgconfig_2.0.3   ellipsis_0.3.2    xml2_1.3.2
[46] assertthat_0.2.1  rmarkdown_2.9     R6_2.5.1
[49] globals_0.14.0    compiler_4.1.0
DavisVaughan commented 3 years ago

future_walk() is essentially a wrapper around future_map() that then just returns NULL. So keep in mind that the internal future_map() call is going to collect all of the results from each worker and return them back to the main process.

Since write_tsv() actually returns its input, you are returning every single final_df object back to the main process too (at the very least they can't be garbage collected).
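
For example (a quick illustration of that behavior; readr documents that its write_*() functions return their input invisibly):

library(readr)

df <- data.frame(x = 1:3)
# write_tsv() returns its input invisibly, so `out` is the full data frame,
# not NULL -- and inside future_map()/future_walk() that value travels back
# to the main R process from every worker.
out <- write_tsv(df, tempfile(fileext = ".tsv"))
nrow(out)
#> [1] 3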

Is it better if you do something like this?

future_walk(ListOfFilePath, ~{
  large_df_2 = read_tsv(.x)
  final_df = inner_join(large_df_1, large_df_2)
  write_tsv(final_df, paste0(.x, "processed"))
  NULL
})
DavisVaughan commented 3 years ago

Note to self that if that is the actual issue, I can probably fix it for the walk functions by adjusting this:

https://github.com/DavisVaughan/furrr/blob/4068c95342a54522e9fa1594e63c3b3ac6bc8652/R/template.R#L23-L34

to be more like:

...furrr_fn_wrapper <- function(...) {
  !!expr_seed_update
  !!expr_progress_update

  out <- ...furrr_fn(...)

  !!expr_result
}

Where !!expr_result would be out if we aren't doing a walk(), and NULL if we are, to ensure that we only pass a list of NULLs back to the main process. I would have to add an is_walk boolean argument, which should be straightforward.
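
For example, something along these lines (a rough sketch, not furrr's actual internals; is_walk is the hypothetical new argument):

# choose the expression that gets spliced in for !!expr_result
expr_result <- if (is_walk) quote(NULL) else quote(out)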

albert-ying commented 3 years ago

Since write_tsv() actually returns its input, you are returning every single final_df object back to the main process too (at the very least they can't be garbage collected).

Oh, I didn't know that! This makes a lot of sense to me now. Thank you so much!!!

albert-ying commented 2 years ago

Actually, I still see RAM build-up in this setting. Any idea why?

library(furrr)
plan(multicore, workers = 8, gc = T)
options(future.globals.maxSize = 10 * 1024 ** 3)

# remove processed rows
future_map(1:10000, ~{
  df = read_tsv("large.tsv")
  df2 = mutate(df, new_col = 1)
  write_tsv(df2, "processed.tsv")
  return(NULL)
})
DavisVaughan commented 2 years ago

Can you provide a full reprex that also generates the large tsv file?

albert-ying commented 2 years ago

Hi, this is an example that reproduces my actual workflow. In real life, I have 10k different files to be read and processed.

library(tidyverse)
library(furrr)
plan(multicore, workers = 8, gc = T)
options(future.globals.maxSize = 10 * 1024 ** 3)

system("wget https://broad-ukb-sumstats-us-east-1.s3.amazonaws.com/round2/additive-tsvs/100240.gwas.imputed_v3.both_sexes.tsv.bgz -O gwas.tsv.bgz")
system("wget https://broad-ukb-sumstats-us-east-1.s3.amazonaws.com/round2/annotations/variants.tsv.bgz -O variants.tsv.bgz")

df = read_tsv("variants.tsv.bgz", col_types = "ccdccc---d--d------------")
# remove processed rows
future_map(1:10000, ~{
  gwas = read_tsv("gwas.tsv.bgz")
  gwas2 = gwas |>
    inner_join(df, by = "variant")
  write_tsv(gwas2, "test.tsv")
  return(NULL)
})

[Screenshots of system RAM usage taken 1, 3, 6, and 10 minutes after starting]

You can see that the RAM usage quickly doubles and keeps building up.

DavisVaughan commented 2 years ago

Is this on Linux? Also, how much RAM do you actually have available on your computer?

albert-ying commented 2 years ago

Yes, it is. We have 251 GB, but this code eventually occupies all of the RAM and crashes the machine (as it did when I ran it overnight last night).

I think you can still get similar behavior with a smaller dataset.

DavisVaughan commented 2 years ago

Oh and how many physical cores?

albert-ying commented 2 years ago

72, here I used 8.

DavisVaughan commented 2 years ago

I tried with 2 workers on my 32 GB machine and I can't reproduce.

It is very possible that you are having an issue with the combination of furrr's parallelism and readr/vroom's parallelism.

Are you aware that both read_tsv() and write_tsv() run in parallel, and that by default they use the same number of threads as the number of virtual cores you have? This means that you are running 8 furrr workers, and within each of those workers readr is trying to read/write with 144 threads (72 physical cores * 2 = 144 virtual; try readr:::readr_threads() to confirm). This is probably overloading your system in some way.

You could try setting num_threads = 1 in both readr functions. That would probably help. We generally do not recommend mixing parallelism like this.
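
For example, adapting the reprex above (a sketch; num_threads is available in readr >= 2.0):

future_map(1:10000, ~{
  # force single-threaded reading/writing inside each furrr worker
  gwas = read_tsv("gwas.tsv.bgz", num_threads = 1)
  gwas2 = gwas |>
    inner_join(df, by = "variant")
  write_tsv(gwas2, "test.tsv", num_threads = 1)
  return(NULL)
})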

albert-ying commented 2 years ago

Hi Davis, I just tried setting num_threads = 1 in all read_* and write_* functions. It seems that the RAM usage still builds up over time, like before. Did you also use readr functions in your test?

albert-ying commented 2 years ago

This problem seems to be resolved by replacing the readr functions with base R functions. I'm still not sure what is happening under the hood.
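
A base R version of the readr calls in the reprex would look roughly like this (a sketch, not necessarily the exact replacement used):

# read.delim() is tab-separated by default; merge() replaces inner_join()
gwas  <- read.delim("gwas.tsv.bgz")
gwas2 <- merge(gwas, df, by = "variant")
write.table(gwas2, "test.tsv", sep = "\t", quote = FALSE, row.names = FALSE)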

DavisVaughan commented 2 years ago

Oh really? That's interesting.

Did you also use readr functions in your test?

I did, and tried multiple combinations of num_threads, but couldn't really reproduce on my end.

DavisVaughan commented 2 years ago

What version of readr are you using? And what version of vroom?

albert-ying commented 2 years ago

I was using readr 2.0.1 and vroom 1.5.7. Maybe it is related to the lazy reading behavior? But I'm pretty happy that the base R functions do not have this problem :)

DavisVaughan commented 2 years ago

Oh yeah, you have an older version of readr. In readr 2.1.0 we switched back to reading eagerly by default, so maybe that is why I can't reproduce.

Can you try either:

  1. Updating readr to the latest version
  2. Setting lazy = FALSE explicitly (and num_threads = 1)

Even if option 2 doesn't solve the problem, I'd love to hear whether upgrading readr helps, because there have been a number of fixes between your version and now.
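
Option 2 applied to the reprex would look something like this (a sketch; lazy and num_threads exist in readr >= 2.0):

# read eagerly and single-threaded inside each worker
gwas = read_tsv("gwas.tsv.bgz", lazy = FALSE, num_threads = 1)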

DavisVaughan commented 2 years ago

It would also be very useful to know which step has the potential memory leak, i.e., does the memory grow if you do this?

future_map(1:10000, ~{
  gwas = read_tsv("gwas.tsv.bgz")
  return(NULL)
})

And if you replace future_map() with map(), does the memory usage still grow? That would 100% rule out furrr, which I'm already pretty confident we can do since the base R reading functions work.
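
That serial check would look something like this (the same body, run with purrr instead of furrr):

library(purrr)

map(1:10000, ~{
  gwas = read_tsv("gwas.tsv.bgz")
  return(NULL)
})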