Open paleolimbot opened 1 year ago
I spent some time thinking about the current conversion system in #279 and #285, and I think the best way to go about this would be to make convert_array_stream() generic to match convert_array(). ALTREP is not the only conversion that benefits from having all chunks available at once: the addition of run-end encoded arrays to the C data spec (and nanoarrow) will make the chunk resolver somewhat essential, and the existing ALTREP string array can then be made generic around a simple equivalent of a chunked array.
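To make that concrete, here is a hypothetical sketch of what the dispatch could look like, mirroring how convert_array() dispatches on its to argument; convert_chunks() is invented for illustration and is not part of nanoarrow.
library(nanoarrow)
# Hypothetical sketch only: convert_chunks() does not exist in nanoarrow.
convert_array_stream_generic <- function(array_stream, to = NULL, ...) {
  UseMethod("convert_array_stream_generic", to)
}
convert_array_stream_generic.default <- function(array_stream, to = NULL, ...) {
  # Collect every chunk up front so the converter can see all of them at once
  # (collect_array_stream(), or a loop over array_stream$get_next()).
  chunks <- collect_array_stream(array_stream)
  convert_chunks(chunks, to)  # hypothetical helper that builds the final vector
}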
Hi,
Not sure if related, but just in case the debug information below is of interest. As a side note, not sure where rbind is used but dplyr::bind_rows should be strictly superior in terms of computation time.
What I am finding is that bind_rows is normally extremely fast, but when used inside convert_array_stream with multiple batches (R debug browser code not shown), bind_rows slows down dramatically (by a factor of 10). But when I make a deep copy first, bind_rows is fast again.
The reason for my interest in bind_rows is that I was trying to speed up convert_array_stream with multiple batches by first converting each batch to a data.frame (super fast across all batches) and then calling bind_rows in R, which again should have been very fast, but turned out to be slow without a deep copy.
Any idea what's causing the slow bind_rows, and whether this strategy is a viable way of speeding up convert_array_stream with multiple batches? Currently convert_array_stream is atrociously slow with a high number of batches.
library(nanoarrow)
library(tictoc)
# generate some data
gen_df <- function(n) {
data.frame(w = uuid::UUIDgenerate(n = n))
}
N <- 50000
df_list <- lapply(seq(300), function(i) gen_df(N))
# bind_rows on list of tibbles is very fast, median time 200ms. rbind is hopeless 4 seconds.
microbenchmark::microbenchmark(times = 20, dplyr::bind_rows(df_list))
binded_df <- dplyr::bind_rows(df_list)
tic(); do.call(rbind, df_list); toc()
# single stream (instant)
single_stream <- as_nanoarrow_array_stream(binded_df)
tic()
single_converted <- convert_array_stream(single_stream)
toc()
single_stream$release()
# batched streams (3 seconds)
batches <- lapply(df_list, as_nanoarrow_array)
batched_stream <- basic_array_stream(batches)
tic()
batch_converted <- convert_array_stream(batched_stream)
toc()
batched_stream$release()
# diy convert (batch conversion is super fast, but the bind_rows now takes 3 seconds)
# code taken from convert_array_stream
tic()
to <- dplyr::slice(gen_df(1), 0)
tmp <- lapply(batches, function(b) .Call(nanoarrow:::nanoarrow_c_convert_array, b, to))
toc()
for(b in batches) {nanoarrow_pointer_release(b)}
tic()
diy_converted <- dplyr::bind_rows(tmp)
toc()
# create deep copy of the data frame list, where the vectors inside each tibble are deep copies
# then bind_rows becomes fast again (median 200 ms)
tmp_deepcopy <- tmp
for (i in seq(length(tmp_deepcopy))) {
tmp_deepcopy[[i]]$w <- c(tmp_deepcopy[[i]]$w)
}
stopifnot(lobstr::obj_addr(tmp[[1]]$w) != lobstr::obj_addr(tmp_deepcopy[[1]]$w))
microbenchmark::microbenchmark(times = 10, dplyr::bind_rows(tmp_deepcopy))
sessionInfo()
R version 4.3.2 (2023-10-31 ucrt)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 11 x64 (build 22621)
Matrix products: default
locale:
[1] LC_COLLATE=English_Australia.utf8 LC_CTYPE=English_Australia.utf8 LC_MONETARY=English_Australia.utf8 LC_NUMERIC=C
[5] LC_TIME=English_Australia.utf8
time zone: Australia/Sydney
tzcode source: internal
attached base packages:
[1] stats graphics grDevices datasets utils methods base
other attached packages:
[1] tictoc_1.2 nanoarrow_0.4.0
loaded via a namespace (and not attached):
[1] utf8_1.2.4 R6_2.5.1 microbenchmark_1.4.10 tidyselect_1.2.0 magrittr_2.0.3 glue_1.6.2
[7] tibble_3.2.1 pkgconfig_2.0.3 dplyr_1.1.4 generics_0.1.3 lifecycle_1.0.4 cli_3.6.1
[13] fansi_1.0.5 vctrs_0.6.4 renv_1.0.3 compiler_4.3.2 rstudioapi_0.15.0 tools_4.3.2
[19] pillar_1.9.0 lobstr_1.1.2 rlang_1.1.2 uuid_1.1-1
I'm not sure I can solve your problem; however, I'll explain what I think is happening and perhaps that will help.
# bind_rows on list of tibbles is very fast, median time 200ms. rbind is hopeless 4 seconds.
The reason that bind_rows() is faster than convert_array_stream() here is that the strings are already R strings. R has a global string pool and all of the strings are already R objects, so the "bind" operation is "just" copying pointers to existing strings. The "slow" operation -- and the reason why ALTREP strings are used so frequently -- is turning a C string (a series of bytes) into an R string, because that involves checking the global string pool to see if one already exists and, if it does not, inserting it.
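As a rough illustration of the string pool (this assumes lobstr, which is loaded in the session above): copying or combining character vectors only copies pointers to CHARSXPs that already exist.
x <- c("apple", "banana")
y <- c(x, "apple")
# ref(..., character = TRUE) prints the address of each underlying CHARSXP;
# the same addresses appear in both vectors because the strings are shared.
lobstr::ref(x, character = TRUE)
lobstr::ref(y, character = TRUE)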
# single stream (instant)
The reason this is instantaneous is that it functionally just wraps the ArrowArray in an R object via ALTREP. This is very good if you're pretty sure you won't need all of those strings; it's somewhat slower if you know that you are going to turn all of those objects into R strings anyway. To get a sense of how long it takes to materialize all of those strings into R strings, you can run nanoarrow:::nanoarrow_altrep_force_materialize(single_converted, recursive = TRUE). For me that took about 4 seconds.
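Continuing from the script above (the helper is internal, so its name may change between versions), that looks like:
tic()
nanoarrow:::nanoarrow_altrep_force_materialize(single_converted, recursive = TRUE)
toc()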
# batched streams (3 seconds)
The way that convert_array_stream() currently handles this is to (1) collect all of the arrays into memory, (2) allocate one big long character(), and (3) fill it in batch by batch. I would expect this to take the exact same amount of time as nanoarrow:::nanoarrow_altrep_force_materialize(single_converted, recursive = TRUE) since I think it uses the same code to materialize the strings into R land.
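For intuition, here is a rough R-level approximation of that collect/preallocate/fill strategy using standalone string arrays; the real implementation is in C, so this is only a sketch.
chunks <- lapply(c("a", "b", "c"), function(x) as_nanoarrow_array(rep(x, 3)))
total <- sum(vapply(chunks, function(x) as.double(x$length), double(1)))
out <- character(total)  # one big preallocated character()
offset <- 0L
for (chunk in chunks) {
  vals <- convert_array(chunk, character())  # materializes this chunk's strings
  out[offset + seq_along(vals)] <- vals
  offset <- offset + length(vals)
}
out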
# diy convert (batch conversion is super fast, but the bind_rows now takes 3 seconds)
Here, each individual batch is very fast because each individual character() vector is converted via ALTREP, which does not attempt to insert any strings into the R global string pool. When bind_rows() is called, all of those strings get inserted into the R global string pool in pretty much the exact same way as they would have been in your # batched streams example.
# create deep copy of the data frame list, where the vectors inside each tibble are deep copies
# then bind_rows becomes fast again (median 200 ms)
This is not an accurate comparison: your bind_rows() call above resulted in all of the values in tmp being "materialized": they are no longer ALTREP and are just thin wrappers around a regular R character vector. This is why your performance appears to improve; however, it's just because you're dealing with regular R vectors containing strings that have already been inserted into the global R string pool.
I hope that helps!
Thank you very much for the detailed response. I apologise for not learning about ALTREP before posting. What you said makes perfect sense.
I originally went down this path because I was using DBI, ADBI, and adbcdrivermanager to read data from Snowflake. Previously I used DBI::dbReadTable, which converted the nanoarrow batched stream to a data.frame, taking hours to read 1 GB tables. I've now realised the proper solution is to use DBI::dbReadTableArrow and keep the data in Arrow format, which is perfectly fine with me (actually preferable, since all I was doing was calling arrow::write_parquet). Now my data fetching plus parquet writing via ADBC is 6 times faster than via ODBC. Perfect.
No problem! Depending on the day I'm not sure if I understand ALTREP either...
I'm glad ADBC is helping you out!
First noted in #66, it's fairly common to attempt a conversion of a stream with more than one batch to a data.frame. Currently this will convert one chunk at a time and rbind() or c() everything together. This is slow and requires at least twice the memory.
Related is the "fixed size" converter path, which does a "preallocate + fill"; however, this requires knowing the exact size before starting to pull batches, which is almost never the case. The first bit could be solved by implementing the requisite copying functions to allow the pre-allocated vectors to be growable; however, that wouldn't allow the individual components to be ALTREP...everything would be fully materialized.
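For reference, a sketch of the fixed-size path when the size does happen to be known up front (this uses the size argument of convert_array_stream(); check the signature in your installed version):
# Assumes df_list and N from a reprex like the one above; with size supplied
# the converter can preallocate the full-length output before pulling batches.
stream <- basic_array_stream(lapply(df_list, as_nanoarrow_array))
converted <- convert_array_stream(stream, size = N * length(df_list))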
The Arrow package handles this with a rather complicated implementation that has excellent type coverage: most chunked arrays can be wrapped in an ALTREP vector. Because we don't have Arrow C++ at our disposal, this is not practical here.
Somewhere in the middle is implementing a generic ALTREP vector of a concatenation: the "data" would be a list() of type-checked vectors (that could themselves be ALTREP); the ALTREP class would implement element access using something like the ChunkResolver sitting in Arrow C++ (see the sketch below).
Independently of that, implementing ALTREP conversion for a single ArrowArray -- particularly the ones that can share memory like int32/double with no nulls -- would reduce another copy. For types that can't share memory, lazily converting via the ArrowArrayViewGet() functions is also an option.
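To illustrate the chunk resolver idea (just the concept, not Arrow C++'s actual ChunkResolver), resolving a flat element index into a chunk index plus an offset within that chunk can be done from the cumulative chunk lengths:
make_chunk_resolver <- function(chunk_lengths) {
  ends <- cumsum(chunk_lengths)
  starts <- c(0L, ends[-length(ends)])
  function(i) {
    # the number of chunk ends strictly less than i is the 0-based chunk index
    chunk <- findInterval(i, ends, left.open = TRUE) + 1L
    list(chunk = chunk, offset = i - starts[chunk])
  }
}
resolve <- make_chunk_resolver(c(3L, 5L, 2L))
resolve(4)  # element 4 of the concatenation lives in chunk 2 at offset 1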