apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[R] `as_arrow_table()` fails when called in `future_lapply()` with `plan(multisession)` #40231

Closed. etiennebacher closed this issue 8 months ago

etiennebacher commented 8 months ago

Describe the bug, including details regarding any error messages, version, and platform.

arrow returns an invalid Table when as_arrow_table() is called under a multisession plan created by future (or related packages such as future.apply). In the examples below, future_lapply() works when the plan is sequential but fails when the plan is multisession.

library(arrow, warn.conflicts = FALSE)
library(future.apply)
#> Loading required package: future

plan(sequential)

future_lapply(1, \(x) as_arrow_table(iris))
#> [[1]]
#> Table
#> 150 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <dictionary<values=string, indices=int8>>
#> 
#> See $metadata for additional Schema metadata

library(arrow, warn.conflicts = FALSE)
library(future.apply)
#> Loading required package: future

plan(multisession)

future_lapply(1, \(x) as_arrow_table(iris))
#> [[1]]
#> Table
#> Error: Invalid <Table>, external pointer to null

This failure with future_lapply() was originally found in polars: https://github.com/pola-rs/r-polars/issues/851

cc @eitsupi

Component(s)

R

Session info

``` r
sessioninfo::session_info()
#> ─ Session info ───────────────────────────────────────────────────────────────
#>  setting  value
#>  version  R version 4.3.2 (2023-10-31 ucrt)
#>  os       Windows 10 x64 (build 19044)
#>  system   x86_64, mingw32
#>  ui       RTerm
#>  language (EN)
#>  collate  English_Europe.utf8
#>  ctype    English_Europe.utf8
#>  tz       Europe/Paris
#>  date     2024-02-26
#>  pandoc   3.1.1 @ C:/Program Files/RStudio/resources/app/bin/quarto/bin/tools/ (via rmarkdown)
#>
#> ─ Packages ───────────────────────────────────────────────────────────────────
#>  package      * version    date (UTC) lib source
#>  arrow        * 14.0.0.2   2023-12-02 [1] CRAN (R 4.3.2)
#>  assertthat     0.2.1      2019-03-21 [1] CRAN (R 4.3.0)
#>  bit            4.0.5      2022-11-15 [1] CRAN (R 4.3.0)
#>  bit64          4.0.5      2020-08-30 [1] CRAN (R 4.3.0)
#>  cli            3.6.2      2023-12-11 [1] CRAN (R 4.3.2)
#>  codetools      0.2-19     2023-02-01 [1] CRAN (R 4.3.2)
#>  digest         0.6.34     2024-01-11 [1] CRAN (R 4.3.2)
#>  evaluate       0.23       2023-11-01 [1] CRAN (R 4.3.2)
#>  fastmap        1.1.1      2023-02-24 [1] CRAN (R 4.3.0)
#>  fs             1.6.3      2023-07-20 [1] CRAN (R 4.3.1)
#>  future       * 1.33.1     2023-12-22 [1] CRAN (R 4.3.2)
#>  future.apply * 1.11.1     2023-12-21 [1] CRAN (R 4.3.2)
#>  globals        0.16.2     2022-11-21 [1] CRAN (R 4.3.0)
#>  glue           1.7.0      2024-01-09 [1] CRAN (R 4.3.2)
#>  htmltools      0.5.7      2023-11-03 [1] CRAN (R 4.3.2)
#>  knitr          1.45       2023-10-30 [1] CRAN (R 4.3.1)
#>  lifecycle      1.0.4      2023-11-07 [1] CRAN (R 4.3.2)
#>  listenv        0.9.1      2024-01-29 [1] CRAN (R 4.3.2)
#>  magrittr       2.0.3      2022-03-30 [1] CRAN (R 4.3.0)
#>  parallelly     1.37.0     2024-02-14 [1] CRAN (R 4.3.2)
#>  purrr          1.0.2      2023-08-10 [1] CRAN (R 4.3.1)
#>  R.cache        0.16.0     2022-07-21 [1] CRAN (R 4.3.0)
#>  R.methodsS3    1.8.2      2022-06-13 [1] CRAN (R 4.3.0)
#>  R.oo           1.26.0     2024-01-24 [1] CRAN (R 4.3.2)
#>  R.utils        2.12.3     2023-11-18 [1] CRAN (R 4.3.2)
#>  R6             2.5.1      2021-08-19 [1] CRAN (R 4.3.0)
#>  reprex         2.1.0.9000 2024-01-12 [1] Github (tidyverse/reprex@33ccedf)
#>  rlang          1.1.3      2024-01-10 [1] CRAN (R 4.3.2)
#>  rmarkdown      2.25.3     2024-02-07 [1] Github (rstudio/rmarkdown@07e2a99)
#>  rstudioapi     0.15.0     2023-07-07 [1] CRAN (R 4.3.1)
#>  sessioninfo    1.2.2      2021-12-06 [1] CRAN (R 4.3.0)
#>  styler         1.10.2     2023-08-29 [1] CRAN (R 4.3.1)
#>  tidyselect     1.2.0      2022-10-10 [1] CRAN (R 4.3.0)
#>  tzdb           0.4.0      2023-05-12 [1] CRAN (R 4.3.0)
#>  vctrs          0.6.5.9000 2023-12-14 [1] Github (r-lib/vctrs@8bf5ba5)
#>  withr          3.0.0      2024-01-16 [1] CRAN (R 4.3.2)
#>  xfun           0.42       2024-02-08 [1] CRAN (R 4.3.2)
#>  yaml           2.3.8      2023-12-11 [1] CRAN (R 4.3.2)
#>
#>  [1] C:/Users/etienne/AppData/Local/Programs/R/R-4.3.2/library
#>
#> ──────────────────────────────────────────────────────────────────────────────
```

etiennebacher commented 8 months ago

Actually, this looks like a documented limitation of future about exporting pointers: https://future.futureverse.org/articles/future-4-non-exportable-objects.html?q=pointer#packages-that-rely-on-external-pointers
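The same effect can probably be reproduced without any parallel backend, because a value returned from a multisession worker travels through R's own serialization. The following is a minimal sketch, assuming arrow is installed; the error text is captured at run time rather than assumed, and the `copy` and `msg` names are only illustrative.

```r
library(arrow, warn.conflicts = FALSE)

tbl <- as_arrow_table(head(iris))

# Round-trip through R's native serialization, roughly what happens when a
# value is sent from a worker back to the main session.
copy <- unserialize(serialize(tbl, connection = NULL))

# The R wrapper object survives, but the external pointer it holds is not
# expected to, so using the copy should fail. Capture whatever happens.
msg <- tryCatch(
  {
    nrow(copy)
    "no error"
  },
  error = function(e) conditionMessage(e)
)
print(msg)
```

If the printed message mentions a null external pointer, that matches the failure in the multisession example above.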

Setting options(future.globals.onReference = "error") shows the error:

library(arrow, warn.conflicts = FALSE)
library(future.apply)
#> Loading required package: future

plan(multisession)

options(future.globals.onReference = "error")  
future_lapply(1, \(x) as_arrow_table(iris))
#> Error: Detected a non-exportable reference ('externalptr') in the value (of class 'list') of the resolved future

shikokuchuo commented 8 months ago

Hi @etiennebacher @eitsupi

It is true that objects backed by external pointers cannot be straightforwardly exported to parallel processes.

However {mirai} has devised a novel method, utilising the low level 'refhook' capability of R serialization itself, to allow such objects to be used transparently in the same way as other R objects.
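For readers unfamiliar with the 'refhook' argument, the sketch below illustrates the general mechanism using only base R's `serialize()` and `unserialize()`. It is not mirai's implementation: an environment stands in for an external pointer (base R cannot create one without compiled code), and `handle`, `store`, `out_hook` and `in_hook` are hypothetical names chosen for illustration.

```r
# Stand-in for a non-exportable reference object.
handle <- new.env()
handle$value <- 42

# Hypothetical side store mapping a text key to the marshalled payload.
store <- new.env()

# Called for each reference object during serialization. Returning a
# character vector replaces the object with that key in the byte stream;
# returning NULL falls back to the default behaviour.
out_hook <- function(ref) {
  if (is.environment(ref) && identical(ref, handle)) {
    assign("handle-1", ref$value, envir = store)
    return("handle-1")
  }
  NULL
}

# Called with the stored key during unserialization; rebuilds the object.
in_hook <- function(key) {
  rebuilt <- new.env()
  rebuilt$value <- get(key, envir = store)
  rebuilt
}

bytes <- serialize(list(a = handle, b = "some text"),
                   connection = NULL, refhook = out_hook)
restored <- unserialize(bytes, refhook = in_hook)

restored$a$value
restored$b
```

Here the payload lives in a same-process store, which is only enough to show where the hooks fire. A real cross-process approach has to carry the payload to the other process, which is what the Arrow functions are used for further down in this comment.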

This was originally devised for torch tensors: https://shikokuchuo.net/mirai/articles/torch.html

This has just been extended in the development version to support other object types such as Arrow.

library(arrow, warn.conflicts = FALSE)
library(mirai)
serialization(refhook = list(arrow::write_to_raw, arrow::read_ipc_stream), class = "ArrowTabular")
cl <- make_cluster(1)
parallel::parLapply(cl, 1, \(x) arrow::as_arrow_table(head(iris)))
#> [[1]]
#>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#> 1          5.1         3.5          1.4         0.2  setosa
#> 2          4.9         3.0          1.4         0.2  setosa
#> 3          4.7         3.2          1.3         0.2  setosa
#> 4          4.6         3.1          1.5         0.2  setosa
#> 5          5.0         3.6          1.4         0.2  setosa
#> 6          5.4         3.9          1.7         0.4  setosa

This aims to demonstrate something close to what you were attempting.

It relies on registering custom serialization and unserialization functions. Here the return value is a 'tibble', as this is what round-tripping with read_ipc_stream(write_to_raw(x)) produces, but as in the torch case, it would be possible to get back the same type if the round trip preserved it.
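The round trip can be tried in a single session. This is a minimal sketch, assuming arrow is installed; it relies on the `as_data_frame` argument of `read_ipc_stream()`, and the variable names are illustrative.

```r
library(arrow, warn.conflicts = FALSE)

tbl <- as_arrow_table(head(iris))

# Marshal: an ordinary raw vector in Arrow IPC stream format
bytes <- write_to_raw(tbl)

# Unmarshal with the default settings: a data frame comes back
as_df <- read_ipc_stream(bytes)

# Unmarshal while keeping the Arrow type
as_tbl <- read_ipc_stream(bytes, as_data_frame = FALSE)

class(as_df)
class(as_tbl)
```

Whether a wrapper such as `function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)` can be registered as the unserialization function depends on the mirai version in use and is not confirmed here.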

Using the native 'mirai' interface better demonstrates the possibilities, such as seamlessly moving Arrow objects in deeply nested structures:

library(arrow, warn.conflicts = FALSE)
library(mirai)
serialization(refhook = list(arrow::write_to_raw, arrow::read_ipc_stream), class = "ArrowTabular")
daemons(2)
#> [1] 2
m <- mirai(list(a = arrow::as_arrow_table(x), b = "some text"), x = arrow::as_arrow_table(head(iris)))
call_mirai(m)$data
#> $a
#>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#> 1          5.1         3.5          1.4         0.2  setosa
#> 2          4.9         3.0          1.4         0.2  setosa
#> 3          4.7         3.2          1.3         0.2  setosa
#> 4          4.6         3.1          1.5         0.2  setosa
#> 5          5.0         3.6          1.4         0.2  setosa
#> 6          5.4         3.9          1.7         0.4  setosa
#> 
#> $b
#> [1] "some text"

Hope you find this useful. If you believe there is a better way to integrate with Arrow, please do let me know.

p.s. you'll need the dev versions installable from:

install.packages(c("nanonext", "mirai"), repos = "shikokuchuo.r-universe.dev")

etiennebacher commented 8 months ago

Thanks for the info @shikokuchuo. I'm unlikely to try this soon since I don't use future or mirai (the issue above was triggered by a user report), but that's good to know.

HenrikBengtsson commented 7 months ago

FYI, you can apply those marshalling and unmarshalling operations manually, e.g.

library(parallel)
cl <- makeCluster(1)

.y <- parallel::parLapply(cl, X = 1, fun = function(x) {
  ## Create arrow table on parallel worker
  z <- arrow::as_arrow_table(head(iris))

  ## Marshal non-exportable objects
  arrow::write_to_raw(z)
})

## Unmarshal marshalled objects
y <- lapply(.y, FUN = arrow::read_ipc_stream)

print(y)
#> [[1]]
#> # A tibble: 6 × 5
#>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#>          <dbl>       <dbl>        <dbl>       <dbl> <fct>  
#> 1          5.1         3.5          1.4         0.2 setosa 
#> 2          4.9         3            1.4         0.2 setosa 
#> 3          4.7         3.2          1.3         0.2 setosa 
#> 4          4.6         3.1          1.5         0.2 setosa 
#> 5          5           3.6          1.4         0.2 setosa 
#> 6          5.4         3.9          1.7         0.4 setosa 

FWIW, I've added this to https://future.futureverse.org/articles/future-4-non-exportable-objects.html#package-arrow
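Applied to the original report, the same manual marshalling might look like the sketch below. It assumes arrow and future.apply are installed on the main session and the workers, and reuses the `.y` / `y` naming from the example above.

```r
library(arrow, warn.conflicts = FALSE)
library(future.apply)

plan(multisession)

# Marshal on the worker: return raw bytes instead of the Table itself
.y <- future_lapply(1, \(x) {
  arrow::write_to_raw(arrow::as_arrow_table(iris))
})

# Unmarshal in the main session, keeping Arrow Tables
y <- lapply(.y, arrow::read_ipc_stream, as_data_frame = FALSE)

# Return to the sequential plan, which also releases the workers
plan(sequential)

y[[1]]
```

Because only raw vectors cross the process boundary, no external pointer has to be exported.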