DavisVaughan / furrr

Apply Mapping Functions in Parallel using Futures
https://furrr.futureverse.org/

When a large matrix or df is split into a list, do furrr workers only access their list element? #257

Closed · alperyilmaz closed this 1 year ago

alperyilmaz commented 1 year ago

First of all, thanks for this great tool. When I need to parallelize, furrr is my go-to package. Just putting future_ in front of a map and getting all CPUs working is a great joy.

mat is a large matrix (60483 x 1222); the columns are patient IDs and the rows are gene names. A function extract_and_process takes the whole matrix and a patient ID (or just a column number), extracts that column from the matrix, does some calculations, and saves the results as a text file named after the patient.
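
A hypothetical sketch of what such a function might look like (the interface follows the description above; the body is a placeholder, since the real calculations are not shown in the issue):

extract_and_process <- function(mat, i) {
  patient <- colnames(mat)[i]
  x <- mat[, i]  # this patient's column of gene values
  # placeholder for the real calculations
  out <- data.frame(gene = rownames(mat), value = x)
  write.table(out, file = paste0(patient, ".txt"), row.names = FALSE)
}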

This is slow, so I wanted to speed it up by parallelizing. My first attempt was mapping over the column numbers:

library(purrr)   # for %>%
library(furrr)   # assumes a plan such as plan(multisession, workers = 12) is set
1:ncol(mat) %>%
  future_iwalk(~ extract_and_process(mat, .x))

As expected, each worker got a copy of the large matrix; I barely finished processing using 12 cores, and my RAM was almost full.

Then I wondered how I could make this more RAM-friendly. I thought that if I split the matrix beforehand (into a list of columns) and then used furrr to process the list elements, there would be no need to copy the large matrix to each worker; only the list element being processed would be sent to a worker.

My updated code looks like this:

mat %>%
  split(rep(colnames(mat), each = nrow(mat))) %>%
  map(set_names, rownames(mat)) %>%
  future_walk2(., names(.), ~ process_extracted_column(.x, .y))

When I run this, I face exactly the same problem: each worker takes up 1.1 GB of memory. So, is there a practical way to process a large data frame or matrix without each worker having a copy of the whole data? If we split the data into a list, shouldn't each element be processed by a worker without needing the whole list?
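
For what it's worth, furrr should not ship the entire list to each worker: by default it splits the input into one chunk per worker and sends each worker only its chunk. Chunking can be tuned through furrr_options(); a minimal sketch, assuming the split data and process_extracted_column from above and a multisession plan:

library(furrr)
plan(multisession, workers = 12)

# By default furrr creates one chunk per worker, so each worker
# receives only its own slice of `data`, never the whole list.
# More chunks per worker means less data in flight at once, at the
# cost of extra scheduling overhead.
future_walk2(
  data, names(data),
  ~ process_extracted_column(.x, .y),
  .options = furrr_options(scheduling = 2)  # ~2 chunks per worker
)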

DavisVaughan commented 1 year ago

I don't think that furrr is the problem here. When you split and add the rownames on, your resulting list of matrix columns is much larger than the original matrix due to the repetition of the rowname values. So you end up shipping larger pieces to the workers than necessary.

If you try to avoid adding the rownames onto the slices, it might improve the results, but I'm not sure.

library(purrr)
library(rlang)

dim <- c(50, 1000)
mat <- array(seq_len(prod(dim)), dim = dim)
colnames(mat) <- seq_len(ncol(mat))
rownames(mat) <- seq_len(nrow(mat))

# do the splitting
data <- mat |>
  split(rep(colnames(mat), each = nrow(mat))) |>
  map(set_names, rownames(mat))

object.size(mat)
#> 267688 bytes
object.size(data)
#> 3680208 bytes

# Over 13 times larger!
unclass(object.size(data) / object.size(mat))
#> [1] 13.74812

# The problem seems to be the repeated row names, so let's try not
# adding those on
data <- mat |>
  split(rep(colnames(mat), each = nrow(mat)))

# That is more what you'd expect, since you are just sharding a matrix.
# The extra ~0.2 is from the memory overhead of the list
unclass(object.size(data) / object.size(mat))
#> [1] 1.196199

Created on 2023-04-05 with reprex v2.0.2.9000
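
One possible way to act on that suggestion (a sketch, not from the thread): split without rownames so each shipped element is a bare column, and reattach the gene names inside the call. rownames(mat) then travels once per worker as an ordinary global instead of once per column.

library(purrr)
library(furrr)
plan(multisession, workers = 12)

# Bare columns: no per-element rownames, so each shipped piece stays small
data <- mat |>
  split(rep(colnames(mat), each = nrow(mat)))

gene_names <- rownames(mat)  # small; exported once per worker as a global

future_walk2(
  data, names(data),
  ~ process_extracted_column(set_names(.x, gene_names), .y)
)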