Closed: asfimport closed this issue 2 years ago.
Dragoș Moldovan-Grünfeld / @dragosmg:
Hey. Thanks for submitting the issue. I reorganised your small script a bit (see below), but it seems to be working for me. I tried to keep it as close to yours as possible; I changed the paths to point to my desktop. Also, when creating df you referenced two objects (tabla1_arrow and tabla2_arrow) which don't exist, so I assumed you were referring to df1_arrow and df2_arrow.
I tried both with the latest CRAN version (‘6.0.1’) and the development version (‘6.0.1.9000’) and it works with both.
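For reference, the arrow build in use can be confirmed with, for example:
packageVersion("arrow")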
A small note: I don't think purrr::walk() is needed here; you can simply call your function directly with the desired inputs.
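For example, the walk() call towards the end of the reprex below could be replaced by a direct call using the same objects:
write_chunk_data(temp_csv_file, temp_parquet_dir, chunk_size = 50)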
library(dplyr)
library(purrr)
library(arrow)
library(fs)
library(vroom)
library(glue)
data(iris)
temp_csv_file <- tempfile(fileext = ".csv")
temp_parquet_dir <- fs::dir_create(tempdir(), "/parquet")
write.csv(iris, temp_csv_file) # write csv file
# write parquet files with the write_chunk_data() function
# (the function is defined below; run it to produce the parquet files for this example)
write_chunk_data <- function(data_path, output_dir, chunk_size = 1000000) {
  # If output_dir does not exist, create it
  if (!fs::dir_exists(output_dir)) fs::dir_create(output_dir)
  # Get the name of the file (without extension)
  data_name <- fs::path_ext_remove(fs::path_file(data_path))
  # Start counting chunks from 0
  chunk_num <- 0
  # Read the file using vroom
  data_chunk <- vroom::vroom(data_path)
  # Get the variable names
  data_names <- names(data_chunk)
  # Get the number of rows
  rows <- nrow(data_chunk)
  # The following loop writes a parquet file for every [chunk_size] rows
  repeat {
    # Check whether there are more rows beyond the current chunk
    if (rows > (chunk_num + 1) * chunk_size) {
      arrow::write_parquet(
        data_chunk[(chunk_num * chunk_size + 1):((chunk_num + 1) * chunk_size), ],
        fs::path(output_dir, glue::glue("{data_name}-{chunk_num}.parquet"))
      )
    } else {
      # Write the remaining rows and stop
      arrow::write_parquet(
        data_chunk[(chunk_num * chunk_size + 1):rows, ],
        fs::path(output_dir, glue::glue("{data_name}-{chunk_num}.parquet"))
      )
      break
    }
    chunk_num <- chunk_num + 1
  }
  # Recover some memory and disk space (remove vroom's cached temporary files)
  rm(data_chunk)
  tmp_file <- tempdir()
  files <- list.files(tmp_file, full.names = TRUE, pattern = "^vroom")
  file.remove(files)
}
walk(temp_csv_file, write_chunk_data, temp_parquet_dir, chunk_size = 50)
iris_arrow <- open_dataset(temp_parquet_dir)
df1_arrow <- iris_arrow %>%
  select(`...1`, Sepal.Length, Sepal.Width, Petal.Length)
df2_arrow <- iris_arrow %>%
  select(`...1`, Petal.Width, Species)
df <- df1_arrow %>%
  inner_join(df2_arrow, by = "...1") %>%
  group_by(Species) %>%
  summarise(prom = mean(Sepal.Length)) %>%
  collect()
print(df)
Output:
# A tibble: 3 × 2
  Species     prom
  <chr>      <dbl>
1 setosa      5.01
2 virginica   6.59
3 versicolor  5.94
Dewey Dunnington / @paleolimbot: I also tried to replicate this using Dragoș's reprex and couldn't (on macOS, with arrow C++ and R from the master branch). I wonder if this is a duplicate of ARROW-14908?
Dragoș Moldovan-Grünfeld / @dragosmg:
It could be. I was also testing on macOS. [~Zea], would you be able to provide the output of utils::sessionInfo() or devtools::session_info()? My devtools one is below:
─ Session info ──────────────────────────────────────────────────────────────────────────────────
setting value
version R version 4.1.2 (2021-11-01)
os macOS Monterey 12.1
system aarch64, darwin20
ui RStudio
language (EN)
collate en_US.UTF-8
ctype en_US.UTF-8
tz Europe/London
date 2022-02-01
rstudio 2021.09.0+351 Ghost Orchid (desktop)
pandoc NA
─ Packages ───────────────────────────────────────────────────────────────────────────────────────────────────────
package * version date (UTC) lib source
arrow * 6.0.1 2021-11-20 [1] CRAN (R 4.1.1)
assertthat 0.2.1 2019-03-21 [1] CRAN (R 4.1.0)
backports 1.4.0 2021-11-23 [1] CRAN (R 4.1.1)
bit 4.0.4 2020-08-04 [1] CRAN (R 4.1.1)
bit64 4.0.5 2020-08-30 [1] CRAN (R 4.1.0)
broom 0.7.10 2021-10-31 [1] CRAN (R 4.1.1)
cachem 1.0.6 2021-08-19 [1] CRAN (R 4.1.1)
callr 3.7.0 2021-04-20 [1] CRAN (R 4.1.0)
cellranger 1.1.0 2016-07-27 [1] CRAN (R 4.1.0)
cli 3.1.0 2021-10-27 [1] CRAN (R 4.1.1)
codetools 0.2-18 2020-11-04 [1] CRAN (R 4.1.0)
colorspace 2.0-2 2021-06-24 [1] CRAN (R 4.1.1)
crayon 1.4.2 2021-10-29 [1] CRAN (R 4.1.1)
DBI 1.1.2 2021-12-20 [1] CRAN (R 4.1.1)
dbplyr 2.1.1 2021-04-06 [1] CRAN (R 4.1.0)
desc 1.4.0 2021-09-28 [1] CRAN (R 4.1.1)
devtools * 2.4.3 2021-11-30 [1] CRAN (R 4.1.1)
dplyr * 1.0.7 2021-06-18 [1] CRAN (R 4.1.0)
ellipsis 0.3.2 2021-04-29 [1] CRAN (R 4.1.0)
fansi 1.0.0 2022-01-10 [1] CRAN (R 4.1.2)
fastmap 1.1.0 2021-01-25 [1] CRAN (R 4.1.0)
flow * 0.0.2 2021-08-13 [1] CRAN (R 4.1.1)
forcats * 0.5.1 2021-01-27 [1] CRAN (R 4.1.1)
fs 1.5.2 2021-12-08 [1] CRAN (R 4.1.1)
generics 0.1.1 2021-10-25 [1] CRAN (R 4.1.1)
ggplot2 * 3.3.5 2021-06-25 [1] CRAN (R 4.1.1)
glue 1.6.0 2021-12-17 [1] CRAN (R 4.1.1)
gtable 0.3.0 2019-03-25 [1] CRAN (R 4.1.1)
haven 2.4.3 2021-08-04 [1] CRAN (R 4.1.1)
here * 1.0.1 2020-12-13 [1] CRAN (R 4.1.0)
highlite 0.0.0.9000 2021-12-10 [1] Github (jimhester/highlite@767b122)
hms 1.1.1 2021-09-26 [1] CRAN (R 4.1.1)
httr 1.4.2 2020-07-20 [1] CRAN (R 4.1.0)
jsonlite 1.7.2 2020-12-09 [1] CRAN (R 4.1.0)
lifecycle 1.0.1 2021-09-24 [1] CRAN (R 4.1.1)
lookup * 0.0.0.9000 2021-12-10 [1] Github (jimhester/lookup@eba63db)
lubridate * 1.8.0 2021-10-07 [1] CRAN (R 4.1.1)
magrittr 2.0.1 2020-11-17 [1] CRAN (R 4.1.0)
memoise 2.0.1 2021-11-26 [1] CRAN (R 4.1.1)
modelr 0.1.8 2020-05-19 [1] CRAN (R 4.1.0)
munsell 0.5.0 2018-06-12 [1] CRAN (R 4.1.0)
pak * 0.2.0 2021-12-01 [1] CRAN (R 4.1.1)
pillar 1.6.4 2021-10-18 [1] CRAN (R 4.1.1)
pkgbuild 1.3.1 2021-12-20 [1] CRAN (R 4.1.2)
pkgconfig 2.0.3 2019-09-22 [1] CRAN (R 4.1.0)
pkgload 1.2.4 2021-11-30 [1] CRAN (R 4.1.1)
prettyunits 1.1.1 2020-01-24 [1] CRAN (R 4.1.0)
processx 3.5.2 2021-04-30 [1] CRAN (R 4.1.0)
prompt * 1.0.1 2021-12-10 [1] Github (gaborcsardi/prompt@7ef0f2e)
ps 1.6.0 2021-02-28 [1] CRAN (R 4.1.0)
purrr * 0.3.4 2020-04-17 [1] CRAN (R 4.1.0)
R6 2.5.1 2021-08-19 [1] CRAN (R 4.1.1)
Rcpp 1.0.7 2021-07-07 [1] CRAN (R 4.1.0)
readr * 2.1.1 2021-11-30 [1] CRAN (R 4.1.1)
readxl 1.3.1 2019-03-13 [1] CRAN (R 4.1.0)
remotes 2.4.2 2021-11-30 [1] CRAN (R 4.1.1)
reprex * 2.0.1 2021-08-05 [1] CRAN (R 4.1.1)
rlang 0.4.12 2021-10-18 [1] CRAN (R 4.1.1)
rprojroot 2.0.2 2020-11-15 [1] CRAN (R 4.1.0)
rsthemes * 0.3.1 2021-12-10 [1] Github (gadenbuie/rsthemes@bbe73ca)
rstudioapi 0.13 2020-11-12 [1] CRAN (R 4.1.0)
rvest 1.0.2 2021-10-16 [1] CRAN (R 4.1.1)
scales 1.1.1 2020-05-11 [1] CRAN (R 4.1.0)
sessioninfo 1.2.2 2021-12-06 [1] CRAN (R 4.1.1)
stringi 1.7.6 2021-11-29 [1] CRAN (R 4.1.1)
stringr * 1.4.0 2019-02-10 [1] CRAN (R 4.1.1)
testthat * 3.1.1 2021-12-03 [1] CRAN (R 4.1.1)
tibble * 3.1.6 2021-11-07 [1] CRAN (R 4.1.1)
tidyr * 1.1.4 2021-09-27 [1] CRAN (R 4.1.1)
tidyselect 1.1.1 2021-04-30 [1] CRAN (R 4.1.0)
tidyverse * 1.3.1 2021-04-15 [1] CRAN (R 4.1.0)
tzdb 0.2.0 2021-10-27 [1] CRAN (R 4.1.1)
usethis * 2.1.5 2021-12-09 [1] CRAN (R 4.1.1)
utf8 1.2.2 2021-07-24 [1] CRAN (R 4.1.0)
vctrs 0.3.8 2021-04-29 [1] CRAN (R 4.1.0)
vroom 1.5.7 2021-11-30 [1] CRAN (R 4.1.1)
withr 2.4.3 2021-11-30 [1] CRAN (R 4.1.1)
xml2 1.3.3 2021-11-30 [1] CRAN (R 4.1.1)
[1] /Library/Frameworks/R.framework/Versions/4.1-arm64/Resources/library
Nicola Crane / @thisisnic: I've tried to replicate this (Dragoș's reformulation) on Ubuntu 20.04 with R 4.1.2 and arrow 6.0.1 but had no luck: the code ran fine.
Hi dear arrow developers. I tested inner_join with the arrow R package but R crashed. This is my example with the toy dataset iris:
data(iris)
write.csv(iris, "iris.csv") # write csv file
Write parquet files with the write_chunk_data function (below):
walk("C:/Users/Stats/Desktop/ejemplo_join/iris.csv", write_chunk_data, "C:/Users/Stats/Desktop/ejemplo_join/parquet", chunk_size = 50)
iris_arrow <- open_dataset("parquet")
df1_arrow <- iris_arrow %>%
  select(...1, Sepal.Length, Sepal.Width, Petal.Length)
df2_arrow <- iris_arrow %>%
  select(...1, Petal.Width, Species,)
ddf <- tabla1_arrow %>%
  inner_join(tabla2_arrow, by = "...1") %>%
  group_by(Species) %>%
  summarise(prom = mean(Sepal.Length)) %>%
  collect()
print(df)
Run this function to write the parquet files in this example, please:
write_chunk_data <- function(data_path, output_dir, chunk_size = 1000000) {
  # If the output_dir do not exist, it is created
  if (!fs::dir_exists(output_dir)) fs::dir_create(output_dir)
  # It gets the name of the file
  data_name <- fs::path_ext_remove(fs::path_file(data_path))
  # It sets the chunk_num to 0
  chunk_num <- 0
  # Read the file using vroom
  data_chunk <- vroom::vroom(data_path)
  # It gets the variable names
  data_names <- names(data_chunk)
  # It gets the number of rows
  rows <- nrow(data_chunk)
  # The following loop creates a parquet file for every [chunk_size] rows
  repeat {
    # It checks if we are over the max rows
    if (rows > (chunk_num + 1) * chunk_size) {
      arrow::write_parquet(data_chunk[(chunk_num * chunk_size + 1):((chunk_num + 1) * chunk_size), ],
                           fs::path(output_dir, glue::glue("{data_name}-{chunk_num}.parquet")))
    } else {
      arrow::write_parquet(data_chunk[(chunk_num * chunk_size + 1):rows, ],
                           fs::path(output_dir, glue::glue("{data_name}-{chunk_num}.parquet")))
      break
    }
    chunk_num <- chunk_num + 1
  }
  # This is to recover some memory and space in the disk
  rm(data_chunk)
  tmp_file <- tempdir()
  files <- list.files(tmp_file, full.names = TRUE, pattern = "^vroom")
  file.remove(files)
}
Reporter: José F
Note: This issue was originally created as ARROW-15397. Please see the migration documentation for further details.