apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0
14.59k stars 3.54k forks source link

[R] join on dataset crashes on Windows #30433

Closed asfimport closed 2 years ago

asfimport commented 2 years ago

library(tidyverse)
library(arrow)

car_info <- rownames_to_column(mtcars, "car_info") 

cars_arrow_table <- arrow_table(car_info)

other_mtcars_data <- select(car_info, 1) %>% 
  mutate(main_color = sample( c("red", "blue", "white", "black"), size = n(), replace = TRUE)) %>% 
  arrow::arrow_table()

temp <- tempdir()
par_temp <- paste0(temp, "\\parquet")

car_info %>% arrow::write_dataset(par_temp)
cars_arrow <- arrow::open_dataset(par_temp) 

# using arrow tables works ------------------------------------------------------
cars_arrow_table %>% left_join(other_mtcars_data) %>% count(main_color) %>% collect()

# using open dataset crashes R ------------------------------------------------------------------
other_mtcars_data %>% 
  left_join(cars_arrow) %>% 
  count(main_color) %>% 
  collect()

#other variation also crash
cars_arrow %>% 
  left_join(other_mtcars_data) %>% 
  count(main_color) %>% 
  collect()

cars_arrow %>% 
  left_join(other_mtcars_data) %>% 
  group_by(main_color) %>% 
  summarise(n = n()) %>% 
  collect()

#compute also crashes
cars_arrow %>% 
  left_join(other_mtcars_data) %>% 
  count(main_color) %>% 
  compute()

# workaround with duckdb ------------------------------------------------------
##this works
cars_duck <- to_duckdb(cars_arrow, auto_disconnect = TRUE)
other_cars_duck <- to_duckdb(other_mtcars_data, auto_disconnect = TRUE)
    
cars_duck %>% 
  left_join(other_cars_duck) %>%
  count(main_color) %>%
  collect()

##this doesn't (don't know if expected to work actually)
cars_arrow %>% 
  left_join(other_mtcars_data) %>% 
  to_duckdb() 

Environment: R version 4.0.4 Reporter: Will Jones / @wjones127 Assignee: Will Jones / @wjones127

Related issues:

Note: This issue was originally created as ARROW-14908. Please see the migration documentation for further details.

asfimport commented 2 years ago

Neal Richardson / @nealrichardson: Can you install the latest patch release (6.0.1)? These all work for me on that version.

asfimport commented 2 years ago

Fabio Machado: I installed the CRAN 6.0.1 version but I'm still getting crashes.

asfimport commented 2 years ago

Neal Richardson / @nealrichardson: Ok, can you provide sessionInfo() and any other system details?

asfimport commented 2 years ago

Fabio Machado: Here's the sessionInfo()


R version 4.0.4 (2021-02-15)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 10 x64 (build 18363)
Matrix products: default
locale:
[1] LC_COLLATE=English_United States.1252  LC_CTYPE=English_United States.1252   
[3] LC_MONETARY=English_United States.1252 LC_NUMERIC=C                          
[5] LC_TIME=English_United States.1252    
attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     
other attached packages:
 [1] arrow_6.0.1     forcats_0.5.1   stringr_1.4.0   dplyr_1.0.7    
 [5] purrr_0.3.4     readr_1.4.0     tidyr_1.1.3     tibble_3.1.3   
 [9] ggplot2_3.3.3   tidyverse_1.3.0
loaded via a namespace (and not attached):
 [1] Rcpp_1.0.7        lubridate_1.7.9.2 assertthat_0.2.1  digest_0.6.27    
 [5] utf8_1.1.4        R6_2.5.0          cellranger_1.1.0  odbc_1.3.0       
 [9] backports_1.2.1   reprex_1.0.0      httr_1.4.2        pillar_1.6.2     
[13] tictoc_1.0        rlang_0.4.11      curl_4.3          readxl_1.3.1     
[17] rstudioapi_0.13   data.table_1.14.0 blob_1.2.1        DT_0.17          
[21] foreign_0.8-81    htmlwidgets_1.5.3 bit_4.0.4         munsell_0.5.0    
[25] tinytex_0.29      broom_0.7.5       compiler_4.0.4    modelr_0.1.8     
[29] xfun_0.21         pkgconfig_2.0.3   htmltools_0.5.1.1 tidyselect_1.1.1 
[33] rio_0.5.26        fansi_0.4.2       crayon_1.4.1      dbplyr_2.1.1     
[37] withr_2.4.1       grid_4.0.4        jsonlite_1.7.2    gtable_0.3.0     
[41] lifecycle_1.0.0   DBI_1.1.1         magrittr_2.0.1    scales_1.1.1     
[45] zip_2.1.1         cli_2.3.1         stringi_1.5.3     fs_1.5.0         
[49] xml2_1.3.2        ellipsis_0.3.2    generics_0.1.0    vctrs_0.3.8      
[53] openxlsx_4.2.3    tools_4.0.4       bit64_4.0.5       glue_1.4.2       
[57] hms_1.0.0         colorspace_2.0-0  sessioninfo_1.1.1 rvest_1.0.0      
[61] haven_2.3.1      

 

asfimport commented 2 years ago

Jeanette Clark: I am seeing similar issues (but without the summarize). The crash only occurs after the join, filtering, and even collecting the entire (large) table works fine. Example I was working off of and session info are below. 

 


library(arrow)
library(tidyverse)
library(janitor)

dir_out <- tempdir()
iris_clean <- clean_names(iris)

write_dataset(iris_clean, file.path(dir_out, "iris"))

species_codes <- tribble(~species, ~code,
"setosa", "SET",
"versicolor", "VER",
"virginica", "VIR")

iris <- open_dataset(file.path(dir_out, "iris"))

left_join(iris, species_codes) %>% collect()

 


R version 4.1.2 (2021-11-01)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 10 x64 (build 19044)

Matrix products: default

locale:
[1] LC_COLLATE=English_United States.1252 LC_CTYPE=English_United States.1252 LC_MONETARY=English_United States.1252 LC_NUMERIC=C 
[5] LC_TIME=English_United States.1252

attached base packages:
[1] stats graphics grDevices utils datasets methods base

other attached packages:
[1] janitor_2.1.0 forcats_0.5.1 stringr_1.4.0 dplyr_1.0.7 purrr_0.3.4 readr_2.1.1 tidyr_1.1.4 tibble_3.1.6 ggplot2_3.3.5 tidyverse_1.3.1 arrow_6.0.1

loaded via a namespace (and not attached):
[1] tidyselect_1.1.1 xfun_0.29 haven_2.4.3 snakecase_0.11.0 colorspace_2.0-2 vctrs_0.3.8 generics_0.1.1 htmltools_0.5.2 yaml_2.2.1 utf8_1.2.2 
[11] rlang_0.4.12 pillar_1.6.4 glue_1.6.0 withr_2.4.3 DBI_1.1.2 bit64_4.0.5 dbplyr_2.1.1 modelr_0.1.8 readxl_1.3.1 lifecycle_1.0.1 
[21] munsell_0.5.0 gtable_0.3.0 cellranger_1.1.0 rvest_1.0.2 evaluate_0.14 knitr_1.37 tzdb_0.2.0 fastmap_1.1.0 fansi_1.0.0 broom_0.7.11 
[31] Rcpp_1.0.7 scales_1.1.1 backports_1.4.1 jsonlite_1.7.2 fs_1.5.2 bit_4.0.4 hms_1.1.1 digest_0.6.29 stringi_1.7.6 grid_4.1.2 
[41] cli_3.1.0 tools_4.1.2 magrittr_2.0.1 crayon_1.4.2 pkgconfig_2.0.3 ellipsis_0.3.2 xml2_1.3.3 reprex_2.0.1 lubridate_1.8.0 rstudioapi_0.13 
[51] assertthat_0.2.1 rmarkdown_2.11 httr_1.4.2 R6_2.5.1 compiler_4.1.2

 

 

asfimport commented 2 years ago

Neal Richardson / @nealrichardson: Other angles [~jclark] and I explored:

asfimport commented 2 years ago

Will Jones / @wjones127: [~jclark] thanks for that example repro. I get a crash just like you said in Arrow 6.0.0 (but don't on my Mac). I also tried in the dev version of Arrow, and instead get the following error:


> left_join(iris, species_codes) %>% collect()
Error: Invalid: ReplaceFieldsWithKnownValues called on an unbound Expression
C:/Users/voltron/arrow/cpp/src/arrow/compute/exec/expression.cc:957  ReplaceFieldsWithKnownValues(known_values, std::move(expr))
C:/Users/voltron/arrow/cpp/src/arrow/dataset/dataset.cc:158  SimplifyWithGuarantee(std::move(predicate), partition_expression_)
C:/Users/voltron/arrow/cpp/src/arrow/dataset/scanner.cc:792  dataset->GetFragments(scan_options->filter)

Looking at the traceback from the Arrow 6 crash it's not immediately obvious that it's the same error as above, but I will continue to look deeper.

asfimport commented 2 years ago

Will Jones / @wjones127: I've made some progress, but not yet found a fix. This issue isn't strictly linked to the Windows platform; instead it has do to with the setting options(arrow.use_threads = FALSE), which it doesn't seem the join implementation anticipated. That setting is FALSE on Windows by default, but TRUE on other platforms. I can reproduce this issue on Mac OS by setting it to FALSE.

Note that I can get the example to work on Windows by setting options(arrow.use_threads = TRUE), so that may be a viable workaround in some cases. However, other issues have been reported on Windows with the use_threads option, which is why it has been turned off for now on Windows.

asfimport commented 2 years ago

Sam Albers / @boshek: Some additional break crumbs. The examples (run on windows 10, with arrow 7.0.0) are wrapped in reprex::reprex because it catches the R crash nicely.

works by specifying join column


reprex::reprex(
  {
    library(arrow)
    library(dplyr)
    
    tf1 <- tempfile()
    dir.create(tf1)
    
    quakes %>% 
      select(stations, lat,long) %>% 
      write_dataset(tf1)
    
    tf2 <- tempfile()
    dir.create(tf2)
    
    quakes |> 
      select(stations, mag, depth) %>% 
      write_dataset(tf2)
    
    open_dataset(tf1) |> 
      left_join(open_dataset(tf2), by = "stations") |> 
      collect()
  }, session_info = TRUE, 
  std_out_err = TRUE
)

doesn't work when one of the joins dfs is in memory

 


reprex::reprex(
  {
    library(arrow)
    library(dplyr)
    
    tf1 <- tempfile()
    dir.create(tf1)
    
    quakes %>% 
      select(stations, lat,long) %>% 
      write_dataset(tf1)
    
    tf2 <- tempfile()
    dir.create(tf2)
    
    quakes |> 
      select(stations, mag, depth) %>% 
      write_dataset(tf2)
    
    b <- open_dataset(tf2) %>% collect()
    
    open_dataset(tf1) |> 
      left_join(b, by = "stations") |> 
      collect()
  }, session_info = TRUE, 
  std_out_err = TRUE
)

doesn't work when join column in not specified


reprex::reprex(
  {
    library(arrow)
    library(dplyr)
    
    tf1 <- tempfile()
    dir.create(tf1)
    
    quakes %>% 
      select(stations, lat,long) %>% 
      write_dataset(tf1)
    
    tf2 <- tempfile()
    dir.create(tf2)
    
    quakes |> 
      select(stations, mag, depth) %>% 
      write_dataset(tf2)
    
    open_dataset(tf1) |> 
      left_join(open_dataset(tf2)) |> 
      collect()
  }, session_info = TRUE, 
  std_out_err = TRUE
)

doesn't work join column is specified by that column is the partition


reprex::reprex(
  {
    library(arrow)
    library(dplyr)
    
    tf1 <- tempfile()
    dir.create(tf1)
    
    quakes %>% 
      select(stations, lat,long) %>% 
      group_by(stations) %>% 
      write_dataset(tf1)
    
    tf2 <- tempfile()
    dir.create(tf2)
    
    quakes |> 
      select(stations, mag, depth) %>% 
      group_by(stations) %>% 
      write_dataset(tf2)
    
    open_dataset(tf1) |> 
      left_join(open_dataset(tf2), by = "stations") |> 
      collect()
  }, session_info = TRUE, 
  std_out_err = TRUE
)

 

asfimport commented 2 years ago

Will Jones / @wjones127: Thanks @boshek .

To provide an update: We have identified the issue now and my fix is currently being reviewed. It will likely be included in a 7.0.1 release.

You may be able to workaround the issue by setting options(arrow.use_threads = TRUE), which will avoid this bug. (However, there are some cases where turning on threads on Windows cause other issues right now, hence why it's off by default on Windows.)

asfimport commented 2 years ago

Sam Albers / @boshek: @wjones127 that's great! Does that catch instances where the join columns are partitions? I don't see any tests for that and the reprexes above suggest that might be relevant.

asfimport commented 2 years ago

Weston Pace / @westonpace: Issue resolved by pull request 12339 https://github.com/apache/arrow/pull/12339

asfimport commented 2 years ago

Weston Pace / @westonpace: I went ahead and marked this closed with Will's change but feel free to reopen if either of you decide this doesn't cover all cases.

asfimport commented 2 years ago

Will Jones / @wjones127: Ah it actually does not. I don't think the problem is the partition columns per se; I think the fix i provided works for joining a dataset to an in-memory table, but not a dataset to a dataset. So need to add an additional test for that. Thanks for pointing that out! (y)

@westonpace With a dataset to dataset join we are getting a thread index of 2 now. I guess we have an IO thread per scanner, right? Probably need to modify our approach.

asfimport commented 2 years ago

Weston Pace / @westonpace: Yes, we would have an I/O thread per scanner in that case.

asfimport commented 2 years ago

Weston Pace / @westonpace: Well, we have 8 I/O threads (by default) and each scanner would use a different one of those 8.

asfimport commented 2 years ago

Sam Albers / @boshek: @wjones127 just to close the loop, your fix appears to have sorted out all the cases i outlined. Nice work! For the test, I was thinking something like this:

 


test_that("arrow dplyr query can join on a partitioned Dataset", {
  dir_out <- tempdir()
  iris_grouped <- group_by(iris, Species)
  write_dataset(iris_grouped, file.path(dir_out, "iris"))
  species_codes <- data.frame(
    Species = c("setosa", "versicolor", "virginica"),
    code = c("SET", "VER", "VIR")
  )

  iris <- open_dataset(file.path(dir_out, "iris"))
  res <- left_join(iris, species_codes) %>% collect() # We should not segfault here.
  expect_equal(nrow(res), 150)
})