apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0
14.6k stars 3.55k forks source link

[R] Filtering based on str_detect character columns with more than 4000 characters and occasional empty cells not working correctly when reading from disk with arrow #41175

Open giocomai opened 7 months ago

giocomai commented 7 months ago

Describe the bug, including details regarding any error messages, version, and platform.

I've spent a few hours trying to pinpoint exactly when this issue appears. The reprex below should make this clear.

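The type of dataset that triggers this issue is a data frame with a character column holding long strings (more than about 4,000 characters) and occasional empty cells; the reprex below also adds a second column used to test grouping.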

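The issue appears only if the data are read back from disk with `open_dataset()` and filtered (with `str_detect()` or `grepl()`) before being collected; the same filter applied to an in-memory Arrow table, or to the plain data frame, returns the correct rows.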

Under these conditions, the filter returns an incomplete set of rows. If the same arrow connection is collected before filtering, then it returns the expected result.

Even though it returns an incomplete set of rows, it throws no errors or warnings: the user will not notice unless they run additional checks.

In my real-world case, this happens with textual corpora; it seems to happen more frequently (i.e. even with shorter strings) in corpora with non-Latin characters, but I haven't found the exact threshold.

Tested with both current version on CRAN as well as current development version, details in reprex below.

library("tibble")
library("dplyr")
library("stringr")
library("purrr")
library("arrow")

# Reprex 1 setup: build a `rows`-row data frame of random-length letter
# strings, blank out a few of them, and write it to disk partitioned by
# `category`.
# Fixed seed so the row counts printed below are reproducible. Do not reorder
# the sample() calls: that changes the random draws and therefore the counts.
set.seed(1)

### Create a data frame with a column with long string, and another for testing grouping
rows <- 100
data_df <- purrr::map(.x = 1:rows,
                      .progress = TRUE,
                      .f = function(x) {
  # `x` is unused: map() only serves to build `rows` independent one-row
  # tibbles. Each holds a string of 0-10000 random ASCII letters and a random
  # category in 1:10.
  tibble::tibble(text = paste(sample(x = c(letters, LETTERS),
                                     size = sample(0:10000, size = 1), replace = TRUE), collapse = ""),
                 category = sample(x = 1:10, size = 1))
}) |> 
  purrr::list_rbind()

### Add a few empty cells
# Each row is blanked with probability 0.05; blanked cells are empty strings,
# not NA.
data_df[["text"]][sample(c(TRUE, FALSE), size = rows, prob = c(0.05, 0.95), replace = TRUE)] <- ""

### Store in a temp folder
test_arrow_path <- file.path(tempdir(), "test_arrow")
# The data frame is grouped before writing, so the dataset on disk is
# partitioned by `category` (compare with the ungrouped write further below).
write_dataset(dataset = data_df |> 
                dplyr::group_by(category),
              path = test_arrow_path)

# Two Arrow views of the same data: a Dataset read back from the partitioned
# files, and an in-memory Arrow Table built from the same data frame.
### Read from temp folder
arrow_from_disk <- open_dataset(test_arrow_path)
### Read from memory
arrow_from_memory <- arrow_table(data_df |> 
                                   dplyr::group_by(category))

# Lazy Arrow queries: the filter is evaluated by Arrow (not by stringr) only
# when collect() is called below.
arrow_from_disk_filtered <- arrow_from_disk |> 
  filter(str_detect(string = text, pattern = "a"))

arrow_from_memory_filtered <- arrow_from_memory |> 
  filter(str_detect(string = text, pattern = "a"))

# Baseline: plain dplyr/stringr on the original in-memory data frame.
data_df |> 
  filter(str_detect(string = text, pattern = "a")) |> 
  nrow()
#> [1] 97

arrow_from_disk_filtered_n_rows <- arrow_from_disk_filtered |> 
  dplyr::collect() |>
  nrow()

# On-disk dataset: fewer rows than the baseline, with no error or warning.
arrow_from_disk_filtered_n_rows
#> [1] 78

arrow_from_memory_filtered_n_rows <- arrow_from_memory_filtered |> 
  dplyr::collect() |> 
  nrow()

# In-memory Arrow Table: matches the baseline.
arrow_from_memory_filtered_n_rows
#> [1] 97

## different number of rows, while they should be the same
arrow_from_memory_filtered_n_rows==arrow_from_disk_filtered_n_rows
#> [1] FALSE

## filter before collecting gives wrong result
arrow_from_disk |> 
  filter(str_detect(string = text, pattern = "a")) |> 
  dplyr::collect() |> 
  nrow()
#> [1] 78

## filter after collecting gives correct result
# (after collect() the filter runs in R on a data frame, not in Arrow)
arrow_from_disk |> 
  dplyr::collect() |> 
  filter(str_detect(string = text, pattern = "a")) |> 
  nrow()
#> [1] 97

## write to disk but without grouping 
# Same data frame, written without grouping, i.e. without partitioning.

test_arrow_path_no_group <- file.path(tempdir(), "test_arrow_no_group")

write_dataset(dataset = data_df,
              path = test_arrow_path_no_group)

arrow_from_disk_no_group <- open_dataset(test_arrow_path_no_group)

# Filter before collect: every matching row is lost.
arrow_from_disk_no_group |> 
  filter(str_detect(string = text, pattern = "a")) |> 
  dplyr::collect() |> 
  nrow()
#> [1] 0

# Collect first, then filter in R: correct count.
arrow_from_disk_no_group |> 
  dplyr::collect() |> 
  filter(str_detect(string = text, pattern = "a")) |> 
  nrow()
#> [1] 97

# NOTE(review): identical to the previous query; kept as in the original report.
arrow_from_disk_no_group |> 
  dplyr::collect() |> 
  filter(str_detect(string = text, pattern = "a")) |> 
  nrow()
#> [1] 97

# The same failure with base grepl(), so it is not specific to str_detect().
arrow_from_disk_no_group |> 
  filter(grepl(x = text, "a")) |> 
  dplyr::collect() |> 
  nrow()
#> [1] 0

# Environment details for the report (arrow development build).
packageVersion("arrow")
#> [1] '15.0.2.9000'

sessionInfo()
#> R version 4.3.3 (2024-02-29)
#> Platform: x86_64-redhat-linux-gnu (64-bit)
#> Running under: Fedora Linux 38 (Workstation Edition)
#> 
#> Matrix products: default
#> BLAS/LAPACK: FlexiBLAS OPENBLAS-OPENMP;  LAPACK version 3.11.0
#> 
#> locale:
#>  [1] LC_CTYPE=en_IE.UTF-8       LC_NUMERIC=C              
#>  [3] LC_TIME=en_IE.UTF-8        LC_COLLATE=en_IE.UTF-8    
#>  [5] LC_MONETARY=en_IE.UTF-8    LC_MESSAGES=en_IE.UTF-8   
#>  [7] LC_PAPER=en_IE.UTF-8       LC_NAME=C                 
#>  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
#> [11] LC_MEASUREMENT=en_IE.UTF-8 LC_IDENTIFICATION=C       
#> 
#> time zone: Europe/Rome
#> tzcode source: system (glibc)
#> 
#> attached base packages:
#> [1] stats     graphics  grDevices utils     datasets  methods   base     
#> 
#> other attached packages:
#> [1] arrow_15.0.2.9000 purrr_1.0.2       stringr_1.5.1     dplyr_1.1.4      
#> [5] tibble_3.2.1     
#> 
#> loaded via a namespace (and not attached):
#>  [1] vctrs_0.6.5       cli_3.6.2         knitr_1.46        rlang_1.1.3      
#>  [5] xfun_0.43         stringi_1.8.3     generics_0.1.3    assertthat_0.2.1 
#>  [9] bit_4.0.5         glue_1.7.0        htmltools_0.5.8.1 fansi_1.0.6      
#> [13] rmarkdown_2.26    evaluate_0.23     fastmap_1.1.1     yaml_2.3.8       
#> [17] lifecycle_1.0.4   compiler_4.3.3    fs_1.6.3          pkgconfig_2.0.3  
#> [21] rstudioapi_0.16.0 digest_0.6.35     R6_2.5.1          reprex_2.1.0     
#> [25] tidyselect_1.2.1  utf8_1.2.4        pillar_1.9.0      magrittr_2.0.3   
#> [29] bit64_4.0.5       tools_4.3.3       withr_3.0.0

Created on 2024-04-12 with reprex v2.1.0

Component(s)

R

giocomai commented 7 months ago

Here's a more revealing reprex. By creating a data frame with strings of growing length, the issue is much clearer.

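Conditions for reproducing: the text column contains strings longer than about 4,000 (ASCII) characters as well as some empty strings, and the data are read from disk and filtered before being collected.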

If all these conditions are met, then arrow returns an empty data frame.

If the dataset is stored partitioned, partitions that contain both long strings and empty cells return zero rows, while the other partitions return data as expected.

With non-ASCII characters, e.g. Cyrillic letters such as б, г, д, the issue emerges with texts of just over 2,000 characters. (Both here and above, I suspect the actual limits are the classic 4096 and 2048; since each of these Cyrillic letters takes two bytes in UTF-8, that would be consistent with a single limit of about 4096 bytes.)

A separate issue with non-ASCII characters, which may lead to inconsistencies in testing, concerns how arrow parses regular expressions (with RE2, if I understand correctly) compared to standard `stringr::str_detect()`: the same code may give different results with and without arrow. I mention it here in case it is somehow related.

library("tibble")
library("dplyr")
library("stringr")
library("arrow")

# Reprex 2 setup: one row per string length `size` in 2:10000, so the length
# at which filtering starts to fail can be read off directly.
# Fixed seed; the order of the sample() calls determines the data.
set.seed(1)

# The first mutate() runs on an ungrouped tibble, so paste() builds a single
# string ("a" followed by 10000 random letters) that is recycled to every row.
# Grouping by `size` makes each row its own group (sizes are unique), so
# str_trunc() keeps the first `size` characters of that string per row. Every
# non-empty text therefore starts with "a", and a correct filter on "a" keeps
# exactly the non-empty rows.
data_df <- tibble::tibble(size = 2:10000) |> 
  dplyr::mutate(text = paste(c("a", 
                        sample(x = c(letters, LETTERS),
                               size = 10000,
                               replace = TRUE)),
                        collapse = "")) |> 
  dplyr::group_by(size) |> 
  dplyr::mutate(text = stringr::str_trunc(text, width = size, ellipsis = "")) |> 
  dplyr::mutate(category = round(size/10)) |> 
  dplyr::ungroup() |> 
  # Left grouped by `category`, so the write below is partitioned by it.
  dplyr::group_by(category)

# Blank each row with probability 0.1 (empty string, not NA).
data_df[["text"]][sample(c(TRUE, FALSE), size = nrow(data_df), prob = c(0.1, 0.9), replace = TRUE)] <- ""

### Store in a temp folder
test_arrow_path <- file.path(tempdir(), "test_arrow")
write_dataset(dataset = data_df,
              path = test_arrow_path)

### Read from temp folder
arrow_from_disk <- open_dataset(test_arrow_path)
### Read from memory
arrow_from_memory <- arrow_table(data_df)

# Same filter on the on-disk Dataset and on the in-memory Table. Every
# non-empty text starts with "a", so both should keep the same rows.
filtered_from_disk_df <- arrow_from_disk |> 
  dplyr::filter(stringr::str_detect(text, "a")) |> 
  dplyr::collect()
filtered_from_disk_df
#> # A tibble: 5,652 × 3
#>     size text                                                           category
#>    <int> <chr>                                                             <int>
#>  1   995 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#>  2   996 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#>  3   998 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#>  4  1000 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#>  5  1001 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#>  6  1002 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#>  7  1003 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#>  8  1004 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#>  9  1005 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      100
#> 10    95 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…       10
#> # ℹ 5,642 more rows

filtered_from_memory_df <- arrow_from_memory |> 
  dplyr::filter(stringr::str_detect(text, "a")) |> 
  dplyr::collect()
filtered_from_memory_df
#> # A tibble: 9,000 × 3
#> # Groups:   category [1,001]
#>     size text        category
#>    <int> <chr>          <dbl>
#>  1     2 ad                 0
#>  2     3 adM                0
#>  3     4 adMa               0
#>  4     5 adMaH              0
#>  5     6 adMaHw             1
#>  6     7 adMaHwQ            1
#>  7     8 adMaHwQn           1
#>  8     9 adMaHwQnr          1
#>  9    10 adMaHwQnrY         1
#> 10    11 adMaHwQnrYG        1
#> # ℹ 8,990 more rows

# Rows kept by the in-memory filter but dropped by the on-disk one, matched on
# `size` (unique per row). Sorted by size, the first missing row has size 4095.
dplyr::anti_join(filtered_from_memory_df,
                 filtered_from_disk_df,
                 by = "size") |> 
  dplyr::arrange(size)
#> # A tibble: 3,348 × 3
#> # Groups:   category [392]
#>     size text                                                           category
#>    <int> <chr>                                                             <dbl>
#>  1  4095 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#>  2  4097 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#>  3  4098 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#>  4  4099 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#>  5  4100 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#>  6  4101 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#>  7  4102 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#>  8  4103 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#>  9  4104 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#> 10  4105 adMaHwQnrYGuuPTjgiouKOyTKKHPyRoGtIfjPLUtBtRwfNRyfMYPfxFnbSrvn…      410
#> # ℹ 3,338 more rows

# Expected TRUE; given the 5,652 vs 9,000 rows printed above, this is FALSE.
nrow(filtered_from_disk_df)==nrow(filtered_from_memory_df)

Created on 2024-04-15 with reprex v2.1.0

amoeba commented 7 months ago

Hi @giocomai, thank you for the report and all the investigation. I don't have a great guess as to what's happening yet but I'll take a look this week.

paleolimbot commented 6 months ago

I did a quick pass with the reprex (thank you!) and confirmed that, even with an identical query plan (except for the source node), a different number of rows is selected. The next step would be to reproduce this in Python, since the people who know how to fix it are better at debugging it there (I may get to it in the next few minutes, but I'm leaving this here in case I don't!).

# Reprex 3 (2024-04-25): same data as reprex 2, but the in-memory table is
# materialized with compute() from the on-disk dataset. The two query plans
# printed by show_query() then differ only in their source node, yet the
# filtered row counts still differ.
# The code and its `#>` output were collapsed onto a single line when this
# page was extracted. That made everything after the first `#>` an R comment.
# One statement per line has been restored; code tokens are unchanged.
library("tibble")
library("dplyr")
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#> filter, lag
#> The following objects are masked from 'package:base':
#> 
#> intersect, setdiff, setequal, union
library("stringr")
library("arrow")
#> Warning: package 'arrow' was built under R version 4.3.3
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#> timestamp

set.seed(1)

# One row per string length `size`; every non-empty text starts with "a".
data_df <- tibble::tibble(size = 2:10000) |>
  dplyr::mutate(text = paste(c("a",
                               sample(x = c(letters, LETTERS),
                                      size = 10000,
                                      replace = TRUE)),
                             collapse = "")) |>
  dplyr::group_by(size) |>
  dplyr::mutate(text = stringr::str_trunc(text, width = size, ellipsis = "")) |>
  dplyr::mutate(category = round(size/10)) |>
  dplyr::ungroup() |>
  dplyr::group_by(category)

# Blank each row with probability 0.1 (empty string, not NA).
data_df[["text"]][sample(c(TRUE, FALSE), size = nrow(data_df), prob = c(0.1, 0.9), replace = TRUE)] <- ""

### Store in a temp folder
test_arrow_path <- file.path(tempdir(), "test_arrow")
write_dataset(dataset = data_df, path = test_arrow_path)

### Read from temp folder
arrow_from_disk <- open_dataset(test_arrow_path)
### Read from memory
arrow_from_memory <- dplyr::compute(arrow_from_disk)

# Used select(size, text) to ensure that the query plans were identical
arrow_from_disk |>
  dplyr::filter(stringr::str_detect(text, "a")) |>
  dplyr::select(size, text) |>
  show_query()
#> ExecPlan with 4 nodes:
#> 3:SinkNode{}
#> 2:ProjectNode{projection=[size, text]}
#> 1:FilterNode{filter=match_substring_regex(text, {pattern="a", ignore_case=false})}
#> 0:SourceNode{}

arrow_from_memory |>
  dplyr::filter(stringr::str_detect(text, "a")) |>
  select(size, text) |>
  show_query()
#> ExecPlan with 4 nodes:
#> 3:SinkNode{}
#> 2:ProjectNode{projection=[size, text]}
#> 1:FilterNode{filter=match_substring_regex(text, {pattern="a", ignore_case=false})}
#> 0:TableSourceNode{}

# Same plan apart from the source node, but different row counts.
arrow_from_disk |>
  dplyr::filter(stringr::str_detect(text, "a")) |>
  dplyr::select(size, text) |>
  nrow()
#> [1] 5652

arrow_from_memory |>
  dplyr::filter(stringr::str_detect(text, "a")) |>
  select(size, text) |>
  nrow()
#> [1] 9000

# Created on 2024-04-25 with reprex v2.1.0 (https://reprex.tidyverse.org)