apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[R][C++] Arrow crashes (OOM) on R client with large remote parquet files #30594

Open asfimport opened 2 years ago

asfimport commented 2 years ago

The code below should reproduce the crash:


library(arrow)
library(dplyr)
server <- arrow::s3_bucket("ebird",endpoint_override = "minio.cirrus.carlboettiger.info")

path <- server$path("Oct-2021/observations")
obs <- arrow::open_dataset(path)

path$ls() # observe -- 1 parquet file

obs %>% count() # CRASH

obs %>% to_duckdb() # also crash

I have attempted to split this large (~100 GB) parquet file into smaller files, which helps:


path <- server$path("partitioned")
obs <- arrow::open_dataset(path)
path$ls() # observe, multiple parquet files now
obs %>% count() 

(These parquet files were also created by arrow, btw, from a single large csv file provided by the original data provider (eBird). Unfortunately, generating the partitioned versions is cumbersome: the data is very unevenly distributed, there are few columns that avoid creating thousands of parquet partition files, and even so the bulk of the ~1 billion rows fall within the same group. All the same, I think this is a bug, as there's no indication why arrow cannot handle a single 100 GB parquet file.)
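For reference, the sort of repartitioning described above can be done with something roughly like the following (a sketch; the source path and the choice of partitioning column are illustrative):


library(arrow)

# Lazily open the single large parquet file and write it back out split
# by a column. With very unevenly distributed data this can still produce
# a few huge partitions.
big <- open_dataset("Oct-2021/observations", format = "parquet")
write_dataset(big, "partitioned", format = "parquet",
              partitioning = "country_code")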

 

Let me know if I can provide more info! I'm testing in R with the latest CRAN version of arrow on a machine with 200 GB RAM.

Reporter: Carl Boettiger / @cboettig


Note: This issue was originally created as ARROW-15081. Please see the migration documentation for further details.

asfimport commented 2 years ago

Weston Pace / @westonpace: I agree this should work. I'll have to look at how we have count implemented; I believe we shouldn't even have to look at the data in that case, and I thought we had some special paths in place for this.

asfimport commented 2 years ago

Dewey Dunnington / @paleolimbot: There was another user who reported an issue with count on a parquet file that seems to have been fixed in the development version (which is about to be released to CRAN). Perhaps ARROW-15201 is the same issue?

If it is not, when I try to reproduce the above I get an error (see below). Is there a more recent bucket with the files we can use to reproduce?


library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

server <- arrow::s3_bucket(
  "ebird",
  endpoint_override = "minio.cirrus.carlboettiger.info"
)

path <- server$path("Oct-2021/observations")
path$ls()
#> Error: IOError: Path does not exist 'ebird/Oct-2021/observations'
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/filesystem/s3fs.cc:1913  collector.Finish(this)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/filesystem/s3fs.cc:2275  impl_->Walk(select, base_path.bucket, base_path.key, &results)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/filesystem/filesystem.cc:341  base_fs_->GetFileInfo(selector)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/filesystem/filesystem.cc:341  base_fs_->GetFileInfo(selector)

path <- server$path("partitioned")
path$ls()
#> Error: IOError: Path does not exist 'ebird/partitioned'
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/filesystem/s3fs.cc:1913  collector.Finish(this)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/filesystem/s3fs.cc:2275  impl_->Walk(select, base_path.bucket, base_path.key, &results)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/filesystem/filesystem.cc:341  base_fs_->GetFileInfo(selector)
#> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/filesystem/filesystem.cc:341  base_fs_->GetFileInfo(selector)

asfimport commented 2 years ago

Carl Boettiger / @cboettig: Sorry about that, you caught me at a time when I was doing a few server upgrades. Should be back up in a few hours.

asfimport commented 2 years ago

Carl Boettiger / @cboettig: Should be back up now. Just a note that I need to use collect() after count() to cause the OOM crash (on arrow 6.0.1).
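In other words, the call that triggers it (reusing obs from the first example) is:


obs %>% count() %>% collect()  # OOM crash on arrow 6.0.1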

asfimport commented 2 years ago

Carl Boettiger / @cboettig: Just tested with the nightly build, '7.0.0.20220208', and arrow no longer crashes on the above example. Yay! I do note that RStudio reports it is already using 12 GB after running open_dataset, which goes up to 24 GB during/after running count() %>% compute(). Not sure if that's expected (i.e. maybe arrow knows 24 GB is okay on this system?). I'm running inside a Docker container that is capped at 35 GB, on a machine that has a total of 65 GB. I haven't tested in a more restricted container, but I'm curious what the minimum RAM requirements would be for working with data like this.
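As an aside, one way to separate Arrow's own allocations from what RStudio attributes to the whole R process is to inspect Arrow's memory pool. A minimal sketch, assuming the default_memory_pool() binding in the R package (field names may differ by version):


library(arrow)

pool <- default_memory_pool()
pool$bytes_allocated  # bytes currently held by Arrow's allocator
pool$max_memory       # high-water mark for this session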

asfimport commented 2 years ago

Carl Boettiger / @cboettig: One more note: even with the nightly arrow, I can't go very far without OOM errors still crashing my system when working with this dataset. For instance:


obs %>% count(country_code, sort=TRUE) %>% collect()

crashes my system in the example shown above. 

asfimport commented 2 years ago

Weston Pace / @westonpace: There is definitely something going on here. When I run this on a 1GB dataset stored on local disk I can verify that we only read ~29MB out of 1GB (which still seems a little high). Extrapolating I would expect to see something on the order of ~300MB for a 100GB file. Instead, when running against Carl's dataset, I see RAM hit 24GB before hitting the RAM limit on my system and crashing.

Even just opening the dataset seems to take quite a while. I verified that the execution plan is the same whether we are using S3 or local disk. It is:


ExecPlan with 5 nodes:
4:SinkNode{}
  3:ProjectNode{projection=[n]}
    2:ScalarAggregateNode{aggregates=[
    sum(n, {skip_nulls=true, min_count=1}),
]}
      1:ProjectNode{projection=["n": 1]}
        0:SourceNode{}

Earlier I mentioned:

I thought we had some special paths in place for this.

We do, but the R bindings are not using them. The special paths are in the scanner's "lightweight consumer API" and the dplyr bindings build and execute an exec plan directly, using the scanner only as a scanner.
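For reference, a sketch of reaching that scanner-level path from R directly, bypassing the dplyr exec-plan route (this assumes the Scanner R6 bindings expose CountRows(), which may vary by version):


library(arrow)

obs <- open_dataset(path)
# Ask the scanner for the row count directly; on the C++ side this is the
# lightweight consumer API that can avoid materializing any columns
Scanner$create(obs)$CountRows()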

I'm guessing the solution will be to move this fast path from the scanner into the scan node as a scan option. This still doesn't really explain why we are reading so efficiently from the local filesystem, so I think I don't yet have the full picture.

I'll try and find some time to do some more instrumentation & debugging soon, probably Friday if I can't get to it any earlier.

asfimport commented 2 years ago

Weston Pace / @westonpace: So the measurement I was doing that claimed only ~29MB out of 1GB of the parquet file was read was incorrect. It turns out we simply read the entire file when a count query is issued from R, regardless of filesystem.

Long term, I think this will be addressed by ARROW-12311. In the newly proposed ScanOptions, if no fields are selected for projection, then no data will be returned. Today, as a convenience, we read all fields. However, that convenience should live higher up than ScanOptions.

In the short term I think we could address this by allowing R to use the same kind of hack that C++/python is using. If we get closer to 8.0.0 and it seems that ARROW-12311 is not going to be addressed then I will try and remember to put in the short-term fix.

asfimport commented 2 years ago

Carl Boettiger / @cboettig: Any news on this? I'll keep an eye on https://issues.apache.org/jira/browse/ARROW-12311 as well. 

Still having examples of simple arrow+dplyr operations crash on the above dataset. 

asfimport commented 2 years ago

Weston Pace / @westonpace: 8.0.0 should behave better when it comes to reading large parquet datasets and OOM (ARROW-15410). If you have a chance to test out one of the RCs and see if it prevents the crash I'd be grateful.
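For anyone wanting to test from R, the package's own installer can pull a development build (a sketch; the repository used for release candidates specifically may differ):


# install a nightly/dev build of arrow from R
arrow::install_arrow(nightly = TRUE)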

ARROW-12311 is on my personal development roadmap for 9.0.0 but I can't promise anything.

I unfortunately did not put in a short term fix for count() queries in 8.0.0.

asfimport commented 2 years ago

Carl Boettiger / @cboettig: Thanks Weston, I'll try that. Just to make sure I'm testing the right thing: it will suffice to test the nightlies, e.g. arrow-7.0.0.20220501? With that version I still see high RAM use that leads to a crash (i.e. after it exceeds the 50 GB RAM I allocate to my container), which should be reproducible with this example:


library(arrow)
library(dplyr)
packageVersion("arrow")
path <- arrow::s3_bucket("ebird/Mar-2022/observations",
                         endpoint_override = "minio.carlboettiger.info",
                         anonymous=TRUE)
obs <- arrow::open_dataset(path) 

tmp <- obs |> 
  group_by(sampling_event_identifier, scientific_name) |>
  summarize(count = sum(observation_count, na.rm=TRUE),
            .groups = "drop") 
tmp <- tmp |> compute() # crashes

asfimport commented 2 years ago

Weston Pace / @westonpace: That should be new enough. I'll try and reproduce and see what's going on.

asfimport commented 2 years ago

Weston Pace / @westonpace: I'm going to keep looking into this but this doesn't seem to be backpressure related. However, I think we are using more memory than we should be. I haven't yet pinpointed what is using the memory. It's possible there is some kind of per-batch leak that is cleaned up when the plan is destroyed. I'm going to try and play more with this tomorrow.

asfimport commented 2 years ago

Weston Pace / @westonpace: One mystery solved, a few more remain. I managed to pinpoint the memory usage in my reproduction. When we scan parquet files we store the metadata in the parquet fragment. At one time this was added to make it slightly quicker to re-read the dataset, because all of the metadata information is cached. However:

asfimport commented 2 years ago

Carl Boettiger / @cboettig: Thanks Weston, sounds promising! Hmm... in this particular case I have control of the serialization (the official eBird data is distributed as a single giant tab-separated-values file inside a tarball), so I wonder if this suggests tweaks I can make on my end. Previously I was actually serializing into a single parquet file, which would suggest lower memory use given the per-file metadata cache you mention.

Would it be possible to serialize the metadata cache to a parquet file in tempdir rather than turning it off? (Not sure if it would really improve things, but it seems impossibly magical that arrow could do these operations in small memory without any write operations.)

Not sure if it is relevant, but I have identical OOM issues doing this in pure duckdb (with local copies of the parquet), see https://github.com/duckdb/duckdb/issues/3554. Hannes suggests that on the duckdb side this is expected, since some operations are not yet done "out-of-core" (and suggests a similar issue affects Arrow, I think?).

 

Anyway, thanks again for your sleuthing here!

asfimport commented 2 years ago

Weston Pace / @westonpace: Moving to one file instead of many files will save you ~10-11GB of RAM today. I don't know if that is enough to prevent a crash.

Serializing the metadata cache should be possible. I think there is a bigger question around whether we should invent a serialization format for the datasets API or adopt some kind of existing format (there is some discussion of this at ARROW-15317).

I would guess the out-of-core part Hannes is referring to is keeping track of the group identities. The group_by(sampling_event_identifier, scientific_name) operation is the culprit here. If there are millions / billions of combinations of (sampling_event_identifier, scientific_name) then the output table is going to be very large. That is where the bulk of the memory usage is in your query.

We also cannot stream the output because the very last row of your data might just happen to be in the same group as the very first row of your data. So we don't know that our count is correct until we've seen every single row.

There are a few ways around this inability to stream but these are longer term goals.

We can spill the groupings to disk as we perform the query. If the last row does happen to belong in the first group we saw then we have to load it back from disk, update it, and put it back on disk. Once we've seen all the rows we can stream the result from our temporary disk storage. This is what I assume Hannes is referring to when he describes an out-of-core operator.

Another approach we could take involves sorting/partitioning. If the incoming data is sorted or partitioned by one of those two columns then we could emit results early. For example, if we know that the incoming data is sorted on sampling_event_identifier then as soon as we stop seeing some sampling event (e.g. we've finished reading S10054205) we know that we can emit all combinations of (S10054205, scientific_name).

Of course, if you are doing |> collect() to collect the entire result into memory it won't make much difference. However, if you're able to process results incrementally, then either of these two approaches could allow you to run this query with much less RAM.

asfimport commented 2 years ago

Carl Boettiger / @cboettig: Thanks, this is super. Right, I have no need for collect() on this; in general it is pretty easy to postpone any collect() until later filter/summarize operations have further shrunk the data down to size, so this example is just an illustrative initial step. (This one is actually a standard pre-processing step for the eBird data: it merely aggregates subspecies counts into species-level counts, since most analyses focus on the species level and would then do further subsetting/aggregation.) I am used to using compute() with dplyr+duckdb to force the computation to occur without reading results into RAM in R; not sure if that's meaningful in arrow's streaming context though? I have tried instead calling head() before collect() to limit the size of the R object, but this still quickly exceeds my 50 GB allocation and crashes. I rebuilt the parquet file collection with larger files (i.e. 5e6 lines per file, 5x larger), but even asking for head() on the above summarize operation still exceeds my 50 GB RAM and crashes the R session.

I'm very intrigued by the ability to spill to disk, since that seems like the most general strategy for these operations. Naively, I would think it should not be too rate-limiting in examples like this, since most users would have faster disk I/O than network I/O? In general, taking a performance hit (and giving up the required free disk space) for some disk operations would be acceptable in order to still be able to work with data like this that is much larger than available RAM. (For a one-off case like the above calculation I guess I could manually iterate over each parquet file separately, but that seems very cumbersome compared to arrow...)
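For what it's worth, the manual per-file iteration mentioned above might look roughly like this (a sketch with illustrative paths; note it only helps when the combined grouped result itself fits in memory, since groups can span files and need a final re-aggregation):


library(arrow)
library(dplyr)
library(purrr)

files <- list.files("partitioned", pattern = "\\.parquet$",
                    recursive = TRUE, full.names = TRUE)

# Aggregate each file separately, then combine: partial sums of sums are
# still sums, so re-aggregating the per-file results is safe.
per_file <- map(files, function(f) {
  open_dataset(f) |>
    group_by(sampling_event_identifier, scientific_name) |>
    summarize(count = sum(observation_count, na.rm = TRUE), .groups = "drop") |>
    collect()
})

result <- bind_rows(per_file) |>
  group_by(sampling_event_identifier, scientific_name) |>
  summarize(count = sum(count), .groups = "drop")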

Not sure if it should make a difference, but I tried the above with a local set of parquet files rather than S3 network access, and experienced the same issue.

asfimport commented 2 years ago

Carl Boettiger / @cboettig: Just a note that even with a single parquet file I see the same crash after exceeding my 50 GB RAM, so I don't think the parquet per-file metadata is the main culprit here? Probably the group-identity tracking, like you say. Is it possible for arrow to use the local disk for this instead of attempting to keep it all in RAM?

 

asfimport commented 2 years ago

Weston Pace / @westonpace: Yes, the per-file metadata was only accounting for around 10GB. There is another chunk potentially being wasted in ARROW-16452; it is a bit more difficult to tell exactly how much, but it won't be that significant. So you are correct, the group identities themselves are the main culprit here.

Yes, it is possible to use the local disk, but not trivial :)

At the moment I think someone is looking into ARROW-14163 which adds this to the join node. My hope is that adding disk spilling to join will cover a lot of the ground needed to add disk spilling to group-by.

CC @save-buffer

asfimport commented 2 years ago

Sasha Krassovsky / @save-buffer: I'm not actually sure how applicable Join's spilling is to GroupBy's, since GroupBy spilling involves spilling the hash table, while Join spills batches before building the hash table.

asfimport commented 2 years ago

Todd Farmer / @toddfarmer: This issue was last updated over 90 days ago, which may be an indication it is no longer being actively worked. To better reflect the current state, the issue is being unassigned per project policy. Please feel free to re-take assignment of the issue if it is being actively worked, or if you plan to start that work soon.