OHDSI / Andromeda

AsynchroNous Disk-based Representation of MassivE DAta: An R package aimed at replacing ff for storing large data objects.
https://ohdsi.github.io/Andromeda/

Error: Out of memory when using arrow #57

Open schuemie opened 1 year ago

schuemie commented 1 year ago

It seems arrow doesn't play very nice with dplyr as it relates to memory. When I call batchApply() on the results of a dplyr query, this currently causes the result of the dplyr query to be written to a temp Andromeda object. However, for large data objects this causes my system (with 64GB of RAM) to run out of memory:

Error: Out of memory: malloc of size 4564638464 failed
17. Table__from_ExecPlanReader(self)
16. x$read_table()
15. as_arrow_table.RecordBatchReader(out)
14. as_arrow_table(out)
13. plan$Run(final_node)
12. as_adq(plan$Run(final_node))
11. arrow::write_dataset(value, file.path(attr(x, "path"), i), format = "feather")
10. .local(x, i, ..., value)
9. `[[<-`(`*tmp*`, name, value = structure(list(.data = <environment>,
   selected_columns = list(rowId = <environment>, stratumId = <environment>,
   covariateId = <environment>, covariateValue = <environment>),
   filtered_rows = TRUE, group_by_vars = character(0), drop_empty_groups = NULL, ...
8. `[[<-`(`*tmp*`, name, value = structure(list(.data = <environment>,
   selected_columns = list(rowId = <environment>, stratumId = <environment>,
   covariateId = <environment>, covariateValue = <environment>),
   filtered_rows = TRUE, group_by_vars = character(0), drop_empty_groups = NULL, ...
7. `$<-`(`*tmp*`, "tbl", value = structure(list(.data = <environment>,
   selected_columns = list(rowId = <environment>, stratumId = <environment>,
   covariateId = <environment>, covariateValue = <environment>),
   filtered_rows = TRUE, group_by_vars = character(0), drop_empty_groups = NULL, ...
6. `$<-`(`*tmp*`, "tbl", value = structure(list(.data = <environment>,
   selected_columns = list(rowId = <environment>, stratumId = <environment>,
   covariateId = <environment>, covariateValue = <environment>),
   filtered_rows = TRUE, group_by_vars = character(0), drop_empty_groups = NULL, ...
5. Andromeda::batchApply(covariates, loadCovariates, batchSize = 1e+05)

This is a bit unfortunate because the whole idea of Andromeda is to handle objects that don't fit in memory. What is especially odd is that the data isn't even that big. If I collect the data, it is 17GB in memory.

I notice the same behavior when not writing to a temp Andromeda, but using as_record_batch_reader() instead, as implemented in PR https://github.com/OHDSI/Andromeda/pull/56. It seems that executing the dplyr query is what causes the massive memory use. The query in my test case is a simple sort.

schuemie commented 1 year ago

I've been able to simplify the issue to this code:

library(dplyr)
covariates <- arrow::open_dataset("d:/temp/vignetteSccsArrow/covariates", format = "feather")
x <- covariates %>%
  arrange(.data$covariateId, .data$stratumId, .data$rowId)
arrow::write_dataset(x, "d:/temp/test.arrow", format = "feather")
# R crashes

The covariates folder has 571 files, totaling 1.18GB. I would not expect this to cause me to run out of memory on a machine with 64GB of RAM.

schuemie commented 1 year ago

If I resave the covariates arrow table as a single partition, the performance is slightly better, with memory usage peaking at 50GB. That is still not OK, since I expect to do that in multiple threads.

In contrast, using RSQLite, I never exceed 0.5GB of memory usage, and it is about as fast as arrow (on this task):

# Collect covariates to RAM, then save as SQLite table:
covRam <- covariates %>% collect()
sqlite <- RSQLite::dbConnect(RSQLite::SQLite(), "d:/temp/test.sqlite")
DBI::dbWriteTable(conn = sqlite, name = "covariates", value = covRam)
DBI::dbDisconnect(sqlite)
rm(covRam)

# Open the SQLite database, sort the table, and process in batches:
sqlite <- RSQLite::dbConnect(RSQLite::SQLite(), "d:/temp/test.sqlite")
result <- DBI::dbSendQuery(sqlite, "SELECT * FROM covariates ORDER BY covariateId, stratumId, rowId;")
while (!DBI::dbHasCompleted(result)) {
  batch <- DBI::dbFetch(result, n = 100000)
  print(nrow(batch))
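}
# Release the result set and close the connection when done:
DBI::dbClearResult(result)
DBI::dbDisconnect(sqlite)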

@ablack3 : This looks like a deal-breaker for the arrow implementation at this moment. There are actual studies I need to do that I can't do with the arrow implementation that run just fine with the SQLite implementation. There's a blog post here on how they have a better sort implementation in Rust. Maybe they'll port that over to R one day?

egillax commented 1 year ago

I did look into this a bit. You are correct the sort is done in memory. It accumulates the batches in memory and then sorts the data. Seems to be single threaded as well. They have this warning for the node that does this computation in the docs:

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

There's the same warning for their aggregate node (which would affect group_by calls).
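To make the "spillover" idea concrete, here is a minimal base-R sketch of an external merge sort: each batch is sorted in memory and spilled to disk as a sorted run, and the runs are then merged while holding only one value per run in memory. This is an illustration only, not how Acero or duckdb implement spilling. The function names spillSortedRuns and mergeRuns are hypothetical, and the sketch assumes plain numeric values without NAs (sort() would silently drop them).

```r
# Phase 1: sort each batch in memory and spill it to disk as a sorted "run".
# (Hypothetical helper; assumes each batch fits in memory on its own.)
spillSortedRuns <- function(batches) {
  vapply(batches, function(batch) {
    runFile <- tempfile(fileext = ".bin")
    writeBin(sort(as.numeric(batch)), runFile)
    runFile
  }, character(1))
}

# Phase 2: k-way merge that keeps only one value per run in memory.
mergeRuns <- function(runFiles, outFile) {
  inputs <- lapply(runFiles, file, open = "rb")
  output <- file(outFile, open = "wb")
  on.exit({
    lapply(inputs, close)
    close(output)
  })
  # Read the next value of a run; NA marks an exhausted run.
  readNext <- function(con) {
    value <- readBin(con, what = "numeric", n = 1)
    if (length(value) == 0) NA_real_ else value
  }
  heads <- vapply(inputs, readNext, numeric(1))
  while (!all(is.na(heads))) {
    i <- which.min(heads) # which.min() skips NA entries
    writeBin(heads[i], output)
    heads[i] <- readNext(inputs[[i]])
  }
  invisible(outFile)
}

# Usage: ten batches of 100 values each, merged into one sorted file.
set.seed(1)
batches <- split(runif(1000), rep(1:10, each = 100))
runFiles <- spillSortedRuns(batches)
sortedFile <- mergeRuns(runFiles, tempfile(fileext = ".bin"))
sorted <- readBin(sortedFile, what = "numeric", n = 1000)
```

The element-by-element merge loop is slow in R; a real implementation would merge in blocks, but the memory profile is the point here: peak usage is bounded by the batch size, not by the size of the full dataset.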

One of their main C++ developers has an item on his personal roadmap for next year to address these "pipeline breakers":

One significant remaining challenge is getting some better tools in place for reducing runtime memory usage. This mostly equates to being smarter about scanning (in particular how we scan large row groups) and adding support for spilling to pipeline breakers (there is a promising PR for this that I have not yet been able to get around to). I would like to find time to address these things over the next year.

I also explored a few other libraries that do spilling, among them the Rust implementation that is based on duckdb's approach. The only one I got to work, though, was duckdb, with something like:

con <- DBI::dbConnect(duckdb::duckdb(), read_only = FALSE)
# Spill to ./tmp/ once the 10GB memory limit is reached, then sort the
# parquet files and write the sorted result back out as parquet:
DBI::dbExecute(con, "
          PRAGMA temp_directory='./tmp/';
          SET memory_limit='10GB';
          COPY (SELECT * FROM './parquetTestHuge/*.parquet' ORDER BY rowId, stratumId, covariateId)
          TO 'arrowTestResults/' (FORMAT PARQUET, PER_THREAD_OUTPUT 1,
          ALLOW_OVERWRITE 1);")
DBI::dbDisconnect(con, shutdown = TRUE)

I think we should at least open an issue in the arrow repo and explain our need for computations on larger-than-memory data, and that currently the sorting (and maybe other operations) is stopping us from adopting arrow. I think from their response we can see whether this is coming in the near future or not.

ablack3 commented 1 year ago

@egillax Thanks for looking into this issue. Would you be willing to open that issue on arrow and link it here?

@schuemie I agree this does seem like a deal breaker for Andromeda-arrow. So I guess we stick with SQLite for now and keep the arrow branch alive as experimental.

Almost everything I do with arrow on my mac crashes R so it's not working great for me right now.

ablack3 commented 1 year ago

I'm thinking we move the arrow backend work to "exploratory" given all the issues we've uncovered. Seems too risky. People can still use the arrow branch if they want to try it. What do you all think @schuemie @egillax ?

egillax commented 1 year ago

@ablack3 I think that's a good idea, at least with regard to this issue, which is a blocker for the transition.

I opened an issue in the arrow repo a month ago. The response was that this is one of the top priorities for Acero, the C++ query engine built on top of arrow, but they couldn't promise when they would have time for it. Recently someone assigned himself the issue and seems to have started working on it.

schuemie commented 1 year ago

Agreed @ablack3 . It is a pity given all the work we put into this, but I don't see us moving forward with arrow until this is resolved.

ablack3 commented 1 year ago

Ok sounds good. Hopefully it will get resolved at some point. I'll keep the arrow branch alive. The issue with using arrow on Mac is also a major challenge for me.

ablack3 commented 1 year ago

@schuemie @egillax I'm most of the way through the arrow course and my initial thought is that we don't even need Andromeda anymore. We can just use the arrow package directly. Works quite well. Out-of-memory sorting is a difficult problem, but I would like to suggest that we use arrow as a general-purpose tool, abandon the Andromeda wrapper, and implement the specific operations needed in each of the other packages that depend on Andromeda (e.g. find alternatives to a general-purpose out-of-memory sort). Is that crazy?

Another idea (orthogonal to/independent of the suggestion above) is to use both arrow and duckdb and pass data back and forth to work around the limitations of each. Both support different subsets of dplyr and moving data between them is almost zero cost.

schuemie commented 1 year ago

I certainly like arrow a lot, but there are two reasons why I prefer keeping the Andromeda wrapper:

  1. It would require quite a lot of work to modify all HADES packages to use arrow directly instead of via the Andromeda wrapper.
  2. I deliberately made it so Andromeda objects behave like in-memory objects, in that when you save them, load them, and modify them, the saved object is not affected. I did that based on my experience with ff, where this was not the case, and that caused a lot of very hard to debug problems.

Do you have a suggestion for how to solve the out-of-memory problem using an alternative approach?

ablack3 commented 1 year ago

I'm fine with continuing development on Andromeda if you think the abstraction adds value. I think it would help if I spent some time really understanding exactly which operations we need.

For out-of-memory sorting, I think duckdb can do this, so I might just add a special function that converts the data to duckdb, sorts it, then converts it back to arrow.

So maybe try something like this?

library(arrow)
library(duckdb)
library(dplyr)

nyc_taxi <- open_dataset(here::here("data/nyc-taxi")) # a large parquet file

con <- DBI::dbConnect(duckdb::duckdb(), here::here("tempdb.duckdb"))

result <- nyc_taxi %>% 
  to_duckdb(con = con) %>% 
  arrange(dropoff_datetime) %>% 
  # to_arrow() %>%  # not sure if you need to go back to arrow here
  head(10) %>% 
  collect()

result

# Close the duckdb connection when done:
DBI::dbDisconnect(con, shutdown = TRUE)

It seems to work but takes a long time. My suggestion is to examine exactly where we need to sort, and to question whether we really need to sort a very large unpartitioned dataset. If we can partition or filter before the sort, that would speed up the operation. In other words, we should be more specific about the exact operations we need and try to avoid sorting a gigantic table. I think sorting will require at least one full pass through the data.
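As a rough illustration of partitioning before the sort, here is a base-R sketch that produces a global ordering on (covariateId, stratumId, rowId) without ever sorting the full table at once: rows are first split into one file per covariateId (the leading sort key), and each partition is then sorted on its own, visiting partitions in key order. The toy data, file layout, and variable names are all assumptions made for this example; it is not Andromeda or arrow code, and it only helps when no single partition is itself too large.

```r
# Toy stand-in for the covariates table (assumed columns, small on purpose).
set.seed(42)
covariates <- data.frame(
  covariateId = sample(1:5, 200, replace = TRUE),
  stratumId = sample(1:20, 200, replace = TRUE),
  rowId = sample(1:1000, 200)
)

# Pass 1: split rows into one file per covariateId. Zero-padded file names
# make alphabetical file order match numeric key order.
partitionDir <- tempfile("covariates")
dir.create(partitionDir)
for (id in unique(covariates$covariateId)) {
  saveRDS(
    covariates[covariates$covariateId == id, ],
    file.path(partitionDir, sprintf("%010d.rds", id))
  )
}

# Pass 2: visit partitions in key order and sort each one on the remaining
# keys. Only one partition is held in memory at a time.
partitionFiles <- sort(list.files(partitionDir, full.names = TRUE))
sortedRowIds <- integer(0)
for (partitionFile in partitionFiles) {
  part <- readRDS(partitionFile)
  part <- part[order(part$stratumId, part$rowId), ]
  # Stand-in for real batch processing of the sorted partition:
  sortedRowIds <- c(sortedRowIds, part$rowId)
}
unlink(partitionDir, recursive = TRUE)
```

The rows come out in the same order as a full sort on all three columns, because every row in an earlier partition has a smaller covariateId than every row in a later one.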

What about replacing sqlite with duckdb? That might be an easier change than moving from sqlite to arrow.

ablack3 commented 1 year ago

The Andromeda abstraction/interface is closely tied to the database backend and does not map so well onto arrow. For example, you cannot "append rows" to a parquet file, so appendToTable will need to be dropped from Andromeda. You have to rewrite the entire file. You also cannot add indexes. So if we switch to arrow, all the HADES code that uses these Andromeda functions will need to change anyway. There is no notion of closing a connection to an Arrow file system dataset. I think if we want to use arrow, Andromeda's exported functions will need to change. Duckdb, on the other hand, would be very similar to sqlite.

All andromeda references: https://github.com/search?q=org%3AOHDSI%20Andromeda%3A%3A&type=code

I still think it is worth exploring deprecating Andromeda, but it requires a careful review of all the larger-than-memory operations required by HADES. Leaving Andromeda alone (sqlite) is also a good option, since it requires no additional work.

ablack3 commented 1 year ago

Out of memory sort code locations:

Trying to identify all instances where out of memory sorting is used in HADES.

https://github.com/OHDSI/CohortDiagnostics/blob/85bae994c71aafade7598c7a2a5bfb3eb619e5b1/R/CohortRelationship.R#L161
https://github.com/OHDSI/CaseControl/blob/f491adbfb209c0159ea5ba84baeb5d423f5645da/R/SelectControls.R#L198

Well, this search is not so quick and easy to do, since I'm looking for places where arrange is called on an Andromeda object. But that's what I found so far.