apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[R] Error passing data to/from DuckDB - "NotImplemented: Call to R (SafeRecordBatchReader::ReadNext()) from a non-R thread from an unsupported context" #37302

Open thisisnic opened 1 year ago

thisisnic commented 1 year ago

Describe the bug, including details regarding any error messages, version, and platform.

I'm testing out some code for a workshop on a fresh install (all packages just downloaded, Arrow built from source from the 13.0.0 release candidate 3 branch) and get the following error:

library(dplyr)
devtools::load_all("~/arrow/r")

# data downloaded from the Voltron Data S3 bucket
nyc_taxi <- open_dataset(here::here("data/nyc-taxi/year=2019/"))
nyc_taxi %>%
  filter(month == 1) %>%
  to_duckdb() %>%
  mutate(mean_distance = mean(trip_distance)) %>%
  to_arrow() %>%
  filter(trip_distance < mean_distance) %>%
  nrow()

Error: NotImplemented: Call to R (SafeRecordBatchReader::ReadNext()) from a non-R thread from an unsupported context
/home/nic/arrow/cpp/src/arrow/record_batch.h:257  ReadNext(&batch)

sessionInfo() output:

> sessionInfo()
R version 4.2.2 Patched (2022-11-10 r83330)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 23.04

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.11.0
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.11.0

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=C              
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8    LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] duckdb_0.8.1-1  DBI_1.1.3       arrow_13.0.0    tictoc_1.2      testthat_3.1.10 dplyr_1.1.2    

loaded via a namespace (and not attached):
 [1] tidyselect_1.2.0  remotes_2.4.2.1   purrr_1.0.2       vctrs_0.6.3       generics_0.1.3    miniUI_0.1.1.1    htmltools_0.5.6  
 [8] usethis_2.2.2     blob_1.2.4        utf8_1.2.3        rlang_1.1.1       pkgbuild_1.4.2    later_1.3.1       pillar_1.9.0     
[15] urlchecker_1.0.1  glue_1.6.2        withr_2.5.0       dbplyr_2.3.3      bit64_4.0.5       sessioninfo_1.2.2 lifecycle_1.0.3  
[22] stringr_1.5.0     devtools_2.4.5    htmlwidgets_1.6.2 memoise_2.0.1     callr_3.7.3       fastmap_1.1.1     httpuv_1.6.11    
[29] ps_1.7.5          fansi_1.0.4       Rcpp_1.0.11       xtable_1.8-4      promises_1.2.1    cachem_1.0.8      desc_1.4.2       
[36] pkgload_1.3.2.1   mime_0.12         fs_1.6.3          bit_4.0.5         brio_1.1.3        digest_0.6.33     stringi_1.7.12   
[43] processx_3.8.2    shiny_1.7.4.1     rprojroot_2.0.3   here_1.0.1        cli_3.6.1         tools_4.2.2       magrittr_2.0.3   
[50] tibble_3.2.1      profvis_0.3.8     crayon_1.5.2      pkgconfig_2.0.3   ellipsis_0.3.2    prettyunits_1.1.1 timechange_0.2.0 
[57] lubridate_1.9.2   assertthat_0.2.1  rstudioapi_0.15.0 R6_2.5.1          compiler_4.2.2   

Component(s)

R

thisisnic commented 1 year ago

A much simpler reprex:

library(arrow)
library(dplyr)
tf <- tempfile()
dir.create(tf)
write_dataset(group_by(mtcars, am), tf)
open_dataset(tf) |>
  filter(cyl == 6) %>%
  to_duckdb() %>%
  mutate(mean_hp = mean(hp)) %>%
  to_arrow() %>%
  filter(hp < mean_hp) %>%
  nrow()
paleolimbot commented 1 year ago

I think this is a peculiarity of our dim.arrow_dplyr_query() implementation, which uses Scanner$CountRows(). For example, a regular collect() works even though dim() doesn't:

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

tf <- tempfile()
dir.create(tf)
write_dataset(group_by(mtcars, am), tf)

# fine?
open_dataset(tf) |>
  filter(cyl == 6) %>%
  to_duckdb() %>%
  mutate(mean_hp = mean(hp)) %>%
  to_arrow() %>%
  filter(hp < mean_hp) %>%
  collect()
#> Warning: Missing values are always removed in SQL aggregation functions.
#> Use `na.rm = TRUE` to silence this warning
#> This warning is displayed once every 8 hours.
#> # A tibble: 4 × 12
#>     mpg   cyl  disp    hp  drat    wt  qsec    vs  gear  carb    am mean_hp
#>   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <int>   <dbl>
#> 1  21.4     6   258   110  3.08  3.22  19.4     1     3     1     0    122.
#> 2  18.1     6   225   105  2.76  3.46  20.2     1     3     1     0    122.
#> 3  21       6   160   110  3.9   2.62  16.5     0     4     4     1    122.
#> 4  21       6   160   110  3.9   2.88  17.0     0     4     4     1    122.

# fine
open_dataset(tf) |>
  filter(cyl == 6) %>%
  to_duckdb() %>%
  mutate(mean_hp = mean(hp)) %>%
  to_arrow() %>%
  filter(hp < mean_hp) %>%
  collect()
#> # A tibble: 4 × 12
#>     mpg   cyl  disp    hp  drat    wt  qsec    vs  gear  carb    am mean_hp
#>   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <int>   <dbl>
#> 1  21.4     6   258   110  3.08  3.22  19.4     1     3     1     0    122.
#> 2  18.1     6   225   105  2.76  3.46  20.2     1     3     1     0    122.
#> 3  21       6   160   110  3.9   2.62  16.5     0     4     4     1    122.
#> 4  21       6   160   110  3.9   2.88  17.0     0     4     4     1    122.

# error
open_dataset(tf) |>
  filter(cyl == 6) %>%
  to_duckdb() %>%
  mutate(mean_hp = mean(hp)) %>%
  to_arrow() %>%
  filter(hp < mean_hp) %>%
  dim()
#> Error: NotImplemented: Call to R (SafeRecordBatchReader::ReadNext()) from a non-R thread from an unsupported context

The error traceback:

Error: NotImplemented: Call to R (SafeRecordBatchReader::ReadNext()) from a non-R thread from an unsupported context
dataset___Scanner__CountRows(self) at dataset-scan.R#85
Scanner$create(x)$CountRows() at dplyr.R#186
dim.arrow_dplyr_query(x)
dim(x)
nrow(.)

The workaround is to use count() followed by pull(n). This works because executing an exec plan is one of the "supported contexts" for SafeCallIntoR(). Calling a Scanner method is not, and probably shouldn't be, since as far as I know all of the Scanner methods can be implemented using an exec plan.

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

tf <- tempfile()
dir.create(tf)
write_dataset(group_by(mtcars, am), tf)

open_dataset(tf) |>
  filter(cyl == 6) %>%
  to_duckdb() %>%
  mutate(mean_hp = mean(hp)) %>%
  to_arrow() %>%
  filter(hp < mean_hp) %>%
  count() |> 
  pull(n)
#> Warning: Missing values are always removed in SQL aggregation functions.
#> Use `na.rm = TRUE` to silence this warning
#> This warning is displayed once every 8 hours.
#> Warning: Default behavior of `pull()` on Arrow data is changing. Current behavior of returning an R vector is deprecated, and in a future release, it will return an Arrow `ChunkedArray`. To control this:
#> ℹ Specify `as_vector = TRUE` (the current default) or `FALSE` (what it will change to) in `pull()`
#> ℹ Or, set `options(arrow.pull_as_vector)` globally
#> This warning is displayed once every 8 hours.
#> [1] 4

A more permanent solution would be to reimplement dim() for a dplyr query using an exec plan.
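Until dim() is reimplemented, the count() workaround can be wrapped in a small function. This is a minimal sketch: `query_nrow()` is a hypothetical helper, not part of arrow. It adds a collect() before pull(), on the assumption that collect() runs the one-row count through an exec plan (collect() works on this query, as shown above) and so leaves pull() operating on an ordinary data frame.

```r
library(dplyr, warn.conflicts = FALSE)

# Hypothetical helper (not part of arrow): count the rows of a query without
# calling dim()/nrow(), which for an arrow_dplyr_query goes through
# Scanner$CountRows(). count() summarises the query to a single row with a
# column `n`, collect() materialises that row, and pull(n) extracts the value.
# Load arrow (library(arrow)) so that Arrow queries use arrow's methods.
query_nrow <- function(query) {
  query |>
    count() |>
    collect() |>
    pull(n)
}
```

With plain data frames the helper uses dplyr's own methods, so `query_nrow(mtcars)` gives 32. Replacing the final `nrow()` in the reprex with `query_nrow()` should give 4, matching the `count() |> pull(n)` result, though that substitution is untested here.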