tidyverse / multidplyr

A dplyr backend that partitions a data frame over multiple processes
https://multidplyr.tidyverse.org

multidplyr runs significantly slower when wrapped in user-defined function #87

Open dzhang32 opened 4 years ago

dzhang32 commented 4 years ago

I would like to wrap multidplyr code into a function, since I am applying the parallelised multidplyr code to multiple datasets. However, this significantly slows the processing speed, to a point where the parallelisation of the code is not advantageous anymore in terms of run time.

Is there a fix for this? Am I doing something incorrectly? I believe that this may have something to do with the global environment vs local environment to the user-defined function.

Below is an example to illustrate the effect of wrapping multidplyr functions into a user-defined function in terms of run time.

**Note:** I understand that for this particular example it is not advantageous to use multidplyr rather than dplyr, since the overhead outweighs the benefit of parallelising. However, the issue I want to highlight is the doubling of the run time when the identical multidplyr code is wrapped in a user-defined function.

```r
library(tidyverse)
library(multidplyr)
library(stringr)

df <- tibble(index = rep(1:100000, 3),
             to_concat = (rep(1:100000, 3)))

cluster <- new_cluster(2)

system.time(expr = {
  df %>%
    group_by(index) %>%
    partition(cluster) %>%
    summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>%
    collect()
})

rm(cluster)
```

`user system elapsed 0.444 0.068 2.753`

```r
user_defined_func <- function(df, cluster_num){
  cluster <- new_cluster(cluster_num)

  df <- df %>%
    group_by(index) %>%
    partition(cluster) %>%
    summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>%
    collect()

  rm(cluster)
  return(df)
}

system.time(expr = {
  user_defined_func(df, cluster_num = 2)
})
```

`user system elapsed 1.432 0.288 4.353`

hadley commented 4 years ago

Don't create and destroy the cluster in your function; instead, pass it in as an argument so you can re-use it.

dzhang32 commented 4 years ago

Thanks a lot for the quick response and tip! This makes a lot of sense and yes, the run time comparison was unfair since the first didn't take into account cluster creation/destruction.

However, even after removing cluster creation/destruction, the increase in run time when wrapping multidplyr code in a user-defined function persists. The slowdown also appears to grow with the number of workers. See the example below for 4 workers:

library(tidyverse)
library(multidplyr)
library(stringr)

df <- tibble(index = rep(1:100000, 3), 
             to_concat = (rep(1:100000, 3)))

cluster <- new_cluster(4)

system.time(expr = {

  df %>% 
    group_by(index) %>% 
    partition(cluster) %>% 
    summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>% 
    collect()

})

rm(cluster)

#>    user  system elapsed 
#>   0.466   0.046   2.110


user_defined_func <- function(df, cluster){

  df <- 
    df %>% 
    group_by(index) %>% 
    partition(cluster) %>% 
    summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>% 
    collect()

  return(df)

}

cluster <- new_cluster(4)

system.time(expr = {

  user_defined_func(df, cluster)

})

rm(cluster)

#>    user  system elapsed 
#>   1.847   0.061   3.484

hadley commented 4 years ago

I can't replicate your results:

library(tidyverse)
library(multidplyr)
library(stringr)

df <- tibble(
  index = rep(1:100000, 3),
  to_concat = (rep(1:100000, 3))
)

cluster <- new_cluster(2)

system.time(expr = {
  df %>%
    group_by(index) %>%
    partition(cluster) %>%
    summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>%
    collect()
})
#>    user  system elapsed 
#>   0.614   0.058   2.807

f <- function(df, cluster) {
  df %>% 
    group_by(index) %>% 
    partition(cluster) %>% 
    summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>% 
    collect()
}

system.time(f(df, cluster))
#>    user  system elapsed 
#>   1.258   0.055   2.697

Created on 2019-11-16 by the reprex package (v0.3.0)

dzhang32 commented 4 years ago

I think this is because the lag when using only 2 cores, given the scale of the example, is minimal.

See below where I've increased the size of input data by 5x and run across 2-5 cores to emphasise the lag time. Hopefully this should be reproducible.

PS. Thanks for introducing me to reprex - really useful!


library(tidyverse)
library(multidplyr)
library(stringr)

df <- tibble(
  index = rep(1:500000, 3),
  to_concat = (rep(1:500000, 3))
)

for(i in 2:5){

  print(str_c("Number of cores: ", i))

  cluster <- new_cluster(i)

  print("Outside f:")

  print(system.time(expr = {
    df %>%
      group_by(index) %>%
      partition(cluster) %>%
      summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>%
      collect()
  }))

  f <- function(df, cluster) {
    df %>% 
      group_by(index) %>% 
      partition(cluster) %>% 
      summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>% 
      collect()
  }

  print("Inside f:")

  print(system.time(f(df, cluster)))

}
#> [1] "Number of cores: 2"
#> [1] "Outside f:"
#>    user  system elapsed 
#>   2.557   0.235   9.804 
#> [1] "Inside f:"
#>    user  system elapsed 
#>   5.937   0.230  12.450 
#> [1] "Number of cores: 3"
#> [1] "Outside f:"
#>    user  system elapsed 
#>   2.077   0.187   7.281 
#> [1] "Inside f:"
#>    user  system elapsed 
#>   7.429   0.231  12.218 
#> [1] "Number of cores: 4"
#> [1] "Outside f:"
#>    user  system elapsed 
#>   2.013   0.165   6.352 
#> [1] "Inside f:"
#>    user  system elapsed 
#>   9.242   0.247  13.461 
#> [1] "Number of cores: 5"
#> [1] "Outside f:"
#>    user  system elapsed 
#>   2.066   0.166   6.359 
#> [1] "Inside f:"
#>    user  system elapsed 
#>  11.060   0.278  14.354

Created on 2019-11-16 by the reprex package (v0.3.0)

hadley commented 4 years ago

Ok, now I see what you mean:

library(tidyverse)
library(multidplyr)

df <- tibble(
  index = rep(1:500000, 3),
  to_concat = (rep(1:500000, 3))
)

cluster <- new_cluster(5)

print(system.time({
  df %>%
    group_by(index) %>%
    partition(cluster) %>%
    summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>%
    collect()
}))
#>    user  system elapsed 
#>   3.175   0.313   8.864

f <- function(df, cluster) {
  df %>% 
    group_by(index) %>% 
    partition(cluster) %>% 
    summarise(concat = stringr::str_c(to_concat, collapse = "_")) %>% 
    collect()
}
print(system.time(f(df, cluster)))
#>    user  system elapsed 
#>  11.328   0.352  14.678

Created on 2019-11-18 by the reprex package (v0.3.0)

hadley commented 4 years ago

The performance difference comes from within summarise.multidplyr_party_df(), because the generated call is 12 MB rather than 400 kB. This is because the environment of the ... contains a bunch of stuff that we don't care about (most importantly, in the wrapped function case, a full copy of the dataset). Even in the case that's currently fast, it looks like we're still copying stuff, so this is worth some further exploration. The easiest approach would probably be to take the same approach as dbplyr and dtplyr, and walk the quosure to embed needed data (building from dtplyr:::dt_squash()).
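
As a rough illustration of the mechanism described above (this is base R only, not multidplyr's or dtplyr's actual code), the sketch below first shows that an expression created inside a function drags that function's environment along when serialized, and then shows a heavily simplified version of the "walk the expression and embed needed data" idea. The names `make_formula`, `payload`, and `inline_locals` are hypothetical and exist only for this example; the walker ignores edge cases such as empty arguments, and a real implementation would need to handle much more.

```r
# Part 1: an expression created inside a function remembers that function's
# environment, and serialize() writes the whole environment out with it.
make_formula <- function(n) {
  payload <- numeric(n)  # large local object that the formula never uses
  ~ x + 1                # the formula records the calling environment
}

size_small <- length(serialize(make_formula(0), NULL))
size_large <- length(serialize(make_formula(1e6), NULL))
print(c(small = size_small, large = size_large))  # large is over 8e6 bytes

# Part 2 (assumption: a simplified stand-in for "walking the quosure"):
# replace symbols that are local variables with their values, and leave
# column names alone, so the environment no longer needs to be shipped.
inline_locals <- function(expr, env, columns) {
  if (is.symbol(expr)) {
    name <- as.character(expr)
    is_local <- !(name %in% columns) &&
      exists(name, envir = env, inherits = FALSE)
    if (is_local) get(name, envir = env, inherits = FALSE) else expr
  } else if (is.call(expr)) {
    # Keep the function being called as-is and recurse into its arguments.
    args <- lapply(as.list(expr)[-1], inline_locals,
                   env = env, columns = columns)
    as.call(c(list(expr[[1]]), args))
  } else {
    expr  # constants pass through unchanged
  }
}

env <- new.env()
env$sep <- "_"                     # small value the expression needs
env$df <- data.frame(a = 1:3)      # unrelated object that should stay behind

expr <- quote(stringr::str_c(to_concat, collapse = sep))
squashed <- inline_locals(expr, env, columns = "to_concat")
print(squashed)
```

After squashing, the expression contains the value of `sep` directly and no longer refers to `env`, so only the expression itself would need to be sent to a worker.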

adamthrash commented 3 years ago

Has there been any progress on this issue? I'm in the process of updating a package of mine to use multidplyr, and I just ran into the same problem.

In addition, if I call the function and pass in the number of cores to use (8, in this case), only four of those display heavy activity in my CPU history. If I run the same code without running it as a function, all 8 cores display heavy activity.

ldanai commented 3 years ago

I ran into the same problem.

For now I need to go back and use the beta version, which works OK: remotes::install_github("tidyverse/multidplyr#10")

avsdev-cw commented 2 years ago

> In addition, if I call the function and pass in the number of cores to use (8, in this case), only four of those display heavy activity in my CPU history. If I run the same code without running it as a function, all 8 cores display heavy activity.

Also having this problem, opened a new issue: #123