apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[R] Dataset re-partitioning consumes considerable amount of memory #31705

Open asfimport opened 2 years ago

asfimport commented 2 years ago

A short background: I was trying to create a dataset from a big pile of CSV files (a couple of hundred). In a first step, the CSV files were parsed and saved to parquet files because there were many inconsistencies between them. In a subsequent step, the dataset was re-partitioned using one column (code_key).

 


library(arrow)
library(dplyr)

# Open the intermediate parquet files (written in the first step) as one dataset
new_dataset <- open_dataset(
  temp_parquet_folder, 
  format = "parquet",
  unify_schemas = TRUE
)

# Re-partition by code_key and write out the new dataset
new_dataset |> 
  group_by(code_key) |> 
  write_dataset(
    folder_repartitioned_dataset, 
    format = "parquet"
  )

 

This re-partitioning consumed a considerable amount of memory (5 GB). 

Reporter: Zsolt Kegyes-Brassai / @kbzsl


Note: This issue was originally created as ARROW-16320. Please see the migration documentation for further details.

asfimport commented 2 years ago

Weston Pace / @westonpace: How are you measuring memory used?

There is a known issue where scanning parquet uses more RAM than expected. 8.0.0 should behave a little more reliably. Exactly how much RAM is expected depends on the structure of your input files. I'm working on documenting that this week. However, in general, I would estimate that a few GB of process RAM are needed for this operation.

I would not expect any process memory (e.g. RSS assigned to the process) to remain after the operation. If you are on Linux, we use jemalloc by default, and it is configured so you might need to wait up to 1 second for all the memory to be returned to the OS.

If you are measuring RAM with a tool like Linux's free then I would also expect you would see a large (potentially all) chunk of RAM move from the free column and into the buf/cache column. That would persist even after the repartitioning is done. However, that RAM should be "available" RAM and this is just kind of how the Linux disk cache works. I'd like to add a solution to do writes with direct I/O at some point which would avoid this.
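One way to tell Arrow-held memory apart from OS-level caching is to ask Arrow's own allocator what it is holding. A minimal sketch from R, assuming the default_memory_pool() bindings exposed by the arrow package; bytes_allocated reports what Arrow itself still references, independent of RSS or the disk cache:


library(arrow)

# Ask Arrow's allocator (jemalloc by default on Linux) what it currently holds.
# If bytes_allocated drops back to ~0 once the write finishes, any RSS that
# remains is allocator/OS caching rather than live Arrow data.
pool <- default_memory_pool()
pool$backend_name     # e.g. "jemalloc"
pool$bytes_allocated  # bytes currently allocated through this pool
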

asfimport commented 2 years ago

Zsolt Kegyes-Brassai / @kbzsl: Hi @westonpace. Thank you for your prompt answer.

Sorry, I forgot to describe the environment: I am using a laptop with 64-bit Windows 10, R 4.1.2, and fairly up-to-date R packages (arrow 7.0.0). I am running my scripts from the RStudio IDE.

I was checking the memory utilization both in the RStudio Environment pane and in the Windows Task Manager. Both show an increase of around 5.6 GB: in RStudio, from 300 MB to 5.9 GB (Task Manager shows about 250 MB more – most probably the memory occupied by the IDE itself). There is no (new) visible object in the RStudio Environment pane that can be associated with this re-partitioning activity.

And this memory remained occupied until the RStudio session (or the R project) was closed. I waited for 15 minutes before closing the IDE.

asfimport commented 2 years ago

Zsolt Kegyes-Brassai / @kbzsl: I tried to find what is causing this issue.

First, I saved my data as a non-partitioned dataset to simplify things a bit. This resulted in one 617 MB parquet file (containing strings, timestamps, and integers).

Then I tried to read it back and check the memory utilization. I ran the code both in the RStudio IDE and in the R console (RGui).

The results were the same: a large amount of memory was occupied and not freed up until the IDE or console was shut down (even after waiting 15+ minutes).

The occupied memory was much larger (11 GB) than the R object size (700 MB) shown by obj_size().

 


fs::file_size(here::here("db", "large_parquet", "part-0.parquet"))
#> 617M
a = arrow::read_parquet(here::here("db", "large_parquet", "part-0.parquet"))
lobstr::obj_size(a)
#> 721,474,112 B 

Here are some screenshots using the IDE

Rstudio_env.jpg

Rstudio_mem.jpg

and the RGui

Rgui_mem.jpg

asfimport commented 2 years ago

Weston Pace / @westonpace: What do you get from the following?


a = arrow::read_parquet(here::here("db", "large_parquet", "part-0.parquet"))
a$nbytes()

The nbytes function should print a pretty decent approximation of the C memory referenced by a.

lobstr::obj_size prints only the R memory used (I think).

fs::file_size is going to give you the size of the file, which is possibly encoded and compressed. Some parquet files can be much larger in memory than they are on disk. So it is not unheard of for a 620 MB parquet file to end up occupying gigabytes in memory (11 GB seems a little extreme, but within the realm of possibility).

asfimport commented 2 years ago

Zsolt Kegyes-Brassai / @kbzsl: It returns an error message:


a$nbytes()
#> Error: attempt to apply non-function
#> In addition: Warning message:
#> Unknown or uninitialised column: `nbytes`
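
The error is consistent with a being a plain tibble here: read_parquet() materializes an R data frame by default, so no Arrow-side methods are attached to the returned object. A minimal sketch that keeps the data as an Arrow Table instead (as_data_frame = FALSE), assuming the nbytes() method suggested above is available for Table objects in this arrow version, with the allocator total as a fallback:


library(arrow)

# Read as an Arrow Table rather than an R data frame so Arrow-side
# methods are available on the returned object.
tbl <- read_parquet(
  here::here("db", "large_parquet", "part-0.parquet"),
  as_data_frame = FALSE
)

tbl$nbytes()                            # assumed available on Table objects here
default_memory_pool()$bytes_allocated   # fallback: total bytes held by Arrow's allocator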

 

asfimport commented 2 years ago

Zsolt Kegyes-Brassai / @kbzsl: Hi @westonpace 

I tried to create a reproducible example.

In the first step, I created a dummy dataset with nearly 100 M rows, with different column types and missing data. When writing this dataset to a parquet file, I realized that even write_parquet() consumes a large amount of memory which is not returned.

Here is the data generation part:

 


library(tidyverse)
n = 99e6 + as.integer(1e6 * runif(n = 1))
# n = 1000
a = 
  tibble(
    key1 = sample(datasets::state.abb, size = n, replace = TRUE),
    key2 = sample(datasets::state.name, size = n, replace = TRUE),
    subkey1 = sample(LETTERS, size = n, replace = TRUE),
    subkey2 = sample(letters, size = n, replace = TRUE),
    value1 = runif(n = n),
    value2 = as.integer(1000 * runif(n = n)),
    time = as.POSIXct(1e8 * runif(n = n), tz = "UTC", origin = "2020-01-01")
  ) |> 
  mutate(
    subkey1 = if_else(key1 %in% c("WA", "WV", "WI", "WY"), 
                      subkey1, NA_character_),
    subkey2 = if_else(key2 %in% c("Washington", "West Virginia", "Wisconsin", "Wyoming"), 
                      subkey2, NA_character_),
  )
lobstr::obj_size(a)
#> 5,177,583,640 B

and the memory utilization after the dataset creation

100m_1_create.jpg

and writing to an rds file


readr::write_rds(a, here::here("db", "test100m.rds"))

no visible memory utilization increase

100m_2_rds.jpg

and writing to a parquet file


arrow::write_parquet(a, here::here("db", "test100m.parquet"))

there is a drastic increase in memory utilization, from 10.6 GB to 15 GB, just for writing the file

100m_3_parquet.jpg

It looks like the memory consumed while writing the parquet file was not returned even after 15 minutes.

My biggest concern is that, with this behaviour, the ability to handle datasets larger than the available memory seems increasingly remote. I consider this a critical bug, but it might be affecting only me… as I don't have the possibility to test elsewhere.
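
For reference, the Task Manager numbers can also be captured programmatically, so the before/after difference is recorded in the session itself. A minimal sketch using the ps package (an addition for illustration, not part of the original report); ps_memory_info() reports the resident set size of the current R process on Windows as well as Linux:


library(ps)

# Resident set size of the current R process, in MB.
rss_mb <- function() as.numeric(ps_memory_info()[["rss"]]) / 1024^2

before <- rss_mb()
arrow::write_parquet(a, here::here("db", "test100m.parquet"))
after <- rss_mb()
c(before = before, after = after, increase = after - before)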

asfimport commented 2 years ago

Zsolt Kegyes-Brassai / @kbzsl: And here are the results of reading these files back.

 


a = readr::read_rds(here::here("db", "test100m.rds"))
lobstr::obj_size(a)
#> 5,177,583,640 B

 

100m_4_read_rds.jpg

 


a = arrow::read_parquet(here::here("db", "test100m.parquet"))
lobstr::obj_size(a)
#> 796,553,696 B

 

100m_5_read-parquet.jpg

This time there is no considerable difference in the memory utilization.

It’s a bit hard for me to understand when additional memory is used for parquet activities, and, more importantly, when that memory is returned and when it is not (and what triggers this).

Sorry, I am a bit puzzled. It might be that this is not a bug, just a gap in my understanding.

asfimport commented 2 years ago

Weston Pace / @westonpace: The writing behavior you described seemed odd so I modified your script a little (and added a memory print which, sadly, will only work on Linux):


> 
> print_rss <- function() {
+   print(grep("vmrss", readLines("/proc/self/status"), ignore.case=TRUE, value=TRUE))
+ }
> 
> n = 99e6 + as.integer(1e6 * runif(n = 1))
> a = 
+   tibble(
+     key1 = sample(datasets::state.abb, size = n, replace = TRUE),
+     key2 = sample(datasets::state.name, size = n, replace = TRUE),
+     subkey1 = sample(LETTERS, size = n, replace = TRUE),
+     subkey2 = sample(letters, size = n, replace = TRUE),
+     value1 = runif(n = n),
+     value2 = as.integer(1000 * runif(n = n)),
+     time = as.POSIXct(1e8 * runif(n = n), tz = "UTC", origin = "2020-01-01")
+   ) |> 
+   mutate(
+     subkey1 = if_else(key1 %in% c("WA", "WV", "WI", "WY"), 
+                       subkey1, NA_character_),
+     subkey2 = if_else(key2 %in% c("Washington", "West Virginia", "Wisconsin", "Wyoming"), 
+                       subkey2, NA_character_),
+   )
> lobstr::obj_size(a)
5,171,792,240 B
> print("Memory usage after creating the tibble")
[1] "Memory usage after creating the tibble"
> print_rss()
[1] "VmRSS:\t 5159276 kB"
> 
> 
> readr::write_rds(a, here::here("db", "test100m.rds"))
> print("Memory usage after writing rds")
[1] "Memory usage after writing rds"
> print_rss()
[1] "VmRSS:\t 5161776 kB"
> 
> 
> arrow::write_parquet(a, here::here("db", "test100m.parquet"))
> print("Memory usage after writing parquet")
[1] "Memory usage after writing parquet"
> print_rss()
[1] "VmRSS:\t 8990620 kB"
> Sys.sleep(5)
> print("And after sleeping 5 seconds")
[1] "And after sleeping 5 seconds"
> print_rss()
[1] "VmRSS:\t 8990620 kB"
> print(gc())
            used   (Mb) gc trigger    (Mb)   max used   (Mb)
Ncells    892040   47.7    1749524    93.5    1265150   67.6
Vcells 647980229 4943.7 1392905158 10627.1 1240800333 9466.6
> Sys.sleep(5)
> print("And again after a garbage collection and 5 more seconds")
[1] "And again after a garbage collection and 5 more seconds"
> print_rss()
[1] "VmRSS:\t 5377900 kB"

Summarizing...


Create table: ~5.15 GB RAM used
Write RDS: ~5.16 GB RAM used
Write Parquet: ~9 GB RAM used
Wait 5 seconds: ~9 GB RAM used
Run garbage collection, wait 5 more seconds: ~5.38 GB RAM used

This doesn't seem terribly ideal. I think that, after writing, some R objects are holding references (possibly transitively) to shared pointers to record batches in C++. When the garbage collection runs, those R objects are destroyed and the shared pointers (and their buffers) can be freed.
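
If that explanation is right, an explicit garbage collection immediately after the write should release the retained buffers without waiting for R to collect on its own. A minimal sketch of that workaround, reusing the objects from the script above (print_rss() is the Linux-only helper defined there):


# `a` is the generated tibble from the script above.
arrow::write_parquet(a, here::here("db", "test100m.parquet"))
print_rss()       # RSS is elevated right after the write

invisible(gc())   # drops the R objects still referencing Arrow buffers
Sys.sleep(1)      # jemalloc may take up to ~1 second to return pages to the OS
print_rss()       # RSS should fall back toward the pre-write level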