apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0

Bug?: Writer with EnabledStatistics::Page on large arrays consumes 10x more memory #4973

Open REASY opened 9 months ago

REASY commented 9 months ago

Describe the bug

When writing Parquet using arrow-rs with large array columns, memory consumption is 10x higher when EnabledStatistics is set to EnabledStatistics::Page. The schema of the Parquet file has the following fields:

To Reproduce

Curiously, if I run the DHAT memory profiler, I do not see much difference in memory consumption: https://github.com/REASY/parquet-example-rs#memory-profiler

I ran valgrind to check for memory-related issues; it took ~4 hours, but nothing was reported:

➜  parquet-example-rs git:(main) ✗ cargo install cargo-valgrind
➜  parquet-example-rs git:(main) ✗ cargo valgrind run -- --output-parquet-folder output --rows 1000 --statistics-mode page
   Compiling parquet-example-rs v0.1.0 (/home/artavazd.balaian/work/github/REASY/parquet-example-rs)
    Finished dev [unoptimized + debuginfo] target(s) in 2.14s
     Running `/home/artavazd.balaian/.cargo/bin/cargo-valgrind target/debug/parquet-example-rs --output-parquet-folder output --rows 1000 --statistics-mode page`
Received args: AppArgs { output_parquet_folder: "output", rows: 1000, statistics_mode: Page }
Processed 500 msgs with throughout 0.073 msg/s
Processed 1000 msgs with throughout 0.073 msg/s
Wrote 1000 Lidar Point Cloud to parquet in 13759.010 seconds, average throughput 0.073 msg/s

When I trace the code, the only place where EnabledStatistics::Page is used is https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/column/writer/encoder.rs#L139-L144, and it is not clear how that can cause so much allocation.

Expected behavior

Additional context

Dependencies:

arrow = "47"
clap = { version = "4", features = ["derive"] }
dhat = "0.3.2"
once_cell = "1.18.0"
parquet = "47"
rand = "0.8"

The comparison between the three statistics modes is documented at https://github.com/REASY/parquet-example-rs#page-statistics-consume-10x-more-memory-when-write-8000-rows; for the same number of rows, see the table below:

| Statistics mode | Number of rows | Total time, seconds | CPU usage, % | Average throughput, rows/s | Maximum resident set size, Mbytes | Output Parquet size, Mbytes |
| --- | --- | --- | --- | --- | --- | --- |
| None | 8000 | 113.124 | 96 | 70.719 | 752.67 | 38148.34 |
| Chunk | 8000 | 128.318 | 97 | 62.345 | 790.96 | 38148.37 |
| Page | 8000 | 130.53 | 98 | 61.301 | 8516.36 | 38148.88 |

Even though writing with Page stats uses 10x more memory, in terms of file size (it is not compressed), the Page one is only 548.2 Kbytes larger than None.

tustvold commented 9 months ago

A brief look at the linked code suggests a max row group size of 100? This will lead to a huge number of pages, the metadata for which must be buffered before it can all be flushed when writing the file footer. This would be my guess as to what is occurring.

A more typical row group limit would be 100,000 or more, and this will lead to more reasonably sized pages.
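
For reference, these knobs live on the parquet WriterProperties builder; below is a minimal sketch of the configuration being discussed (the values mirror the reproduction and are illustrative, not the project's actual code):

```rust
use parquet::basic::Compression;
use parquet::file::properties::{EnabledStatistics, WriterProperties};

// Illustrative writer configuration for the scenario in this issue.
fn writer_props() -> WriterProperties {
    WriterProperties::builder()
        // Page-level statistics: min/max entries are buffered per data page
        // for the column index that is written into the file footer.
        .set_statistics_enabled(EnabledStatistics::Page) // or Chunk / None
        // The reproduction uses 100 rows per row group; 100_000+ is more typical.
        .set_max_row_group_size(100)
        // Rows are handed to the encoder in batches of this size.
        .set_write_batch_size(1024)
        .set_compression(Compression::UNCOMPRESSED)
        .build()
}
```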

REASY commented 9 months ago

Hi, @tustvold Thanks for your reply.

The reason I'm using a small row group size of 100 is the large data size per row group: a single row is ~5 Mbytes. If I increase the row group size, I cannot read the Parquet from the JVM world without requesting too much memory.

tustvold commented 9 months ago

What happens if you instead lower the write batch size to produce smaller pages? That way you may be able to raise the row group limit.

As an aside, 5MB rows are likely to stress most analytics systems; you may be better off storing the point clouds separately... The typical expectation is for pages to be ~1MB, let alone rows...

REASY commented 9 months ago

Let me try your suggestion, @tustvold, thank you!

Meanwhile, debugging points me to https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/column/writer/mod.rs#L680-L685, which accumulates stats every time a new page is created: https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/column/writer/mod.rs#L745

If I write 1000 rows with 100 rows per group, I see it is a Vec<Vec<_>> with 50 elements: image
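
To make the accumulation concrete, the buffered state is shaped roughly like the sketch below (illustrative only, not the parquet crate's actual code): each flushed page appends its encoded min/max bytes as a fresh small Vec<u8>, and nothing is released until the column index is serialized into the footer.

```rust
// Rough sketch, loosely modelled on ColumnIndexBuilder's
// `min_values`/`max_values: Vec<Vec<u8>>` fields; not the real implementation.
#[derive(Default)]
struct PageStatsBuffer {
    // One small heap allocation per page and per bound; with 100-row row
    // groups the number of pages (and allocations) grows quickly.
    min_values: Vec<Vec<u8>>,
    max_values: Vec<Vec<u8>>,
    null_counts: Vec<i64>,
}

impl PageStatsBuffer {
    fn on_page_flushed(&mut self, min: &[u8], max: &[u8], nulls: i64) {
        self.min_values.push(min.to_vec());
        self.max_values.push(max.to_vec());
        self.null_counts.push(nulls);
        // Retained until the footer (column index) is written.
    }
}
```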

REASY commented 9 months ago

> What happens if you instead lower the write batch size to produce smaller pages? That way you may be able to raise the row group limit.
>
> As an aside, 5MB rows are likely to stress most analytics systems; you may be better off storing the point clouds separately... The typical expectation is for pages to be ~1MB, let alone rows...

  • With set_max_row_group_size(100) and DEFAULT_WRITE_BATCH_SIZE = 1024, the maximum resident set size is 8.31 Gbytes
  • With set_max_row_group_size(100) and .set_write_batch_size(10), the maximum resident set size is 5.31 Gbytes
  • With set_max_row_group_size(1000) and .set_write_batch_size(10), the maximum resident set size is 5.65 Gbytes

tustvold commented 9 months ago

Yes, it will accumulate per page per row group. You could also try increasing the page size for that column, but I can't help feeling this data is ill-suited to a general-purpose analytics data format...

REASY commented 9 months ago

@tustvold you're right!

But one thing is not clear to me: why do chunk-level statistics not take so much memory? I assume it is because they do not need to be accumulated per page?

tustvold commented 9 months ago

I'm honestly not sure, have you tried using a memory profiler to figure out what the allocations are?

REASY commented 9 months ago

Yep, I tried DHAT, but nothing much is visible there: https://github.com/REASY/parquet-example-rs#memory-profiler

I also wanted to make sure there were no memory leaks, so I ran valgrind; it took ~4 hours, but nothing was reported:

➜  parquet-example-rs git:(main) ✗ cargo install cargo-valgrind
➜  parquet-example-rs git:(main) ✗ cargo valgrind run -- --output-parquet-folder output --rows 1000 --statistics-mode page
   Compiling parquet-example-rs v0.1.0 (/home/artavazd.balaian/work/github/REASY/parquet-example-rs)
    Finished dev [unoptimized + debuginfo] target(s) in 2.14s
     Running `/home/artavazd.balaian/.cargo/bin/cargo-valgrind target/debug/parquet-example-rs --output-parquet-folder output --rows 1000 --statistics-mode page`
Received args: AppArgs { output_parquet_folder: "output", rows: 1000, statistics_mode: Page }
Processed 500 msgs with throughout 0.073 msg/s
Processed 1000 msgs with throughout 0.073 msg/s
Wrote 1000 Lidar Point Cloud to parquet in 13759.010 seconds, average throughput 0.073 msg/s
tustvold commented 9 months ago

Perhaps try https://github.com/KDE/heaptrack

REASY commented 9 months ago

Another curious observation: even though writing with Page stats uses more memory, in terms of file size (it is not compressed), the Page one is only 548.2 Kbytes larger than None.

| Statistics mode | Number of rows | Total time, seconds | CPU usage, % | Average throughput, rows/s | Maximum resident set size, Mbytes | Output Parquet size, Mbytes |
| --- | --- | --- | --- | --- | --- | --- |
| None | 8000 | 113.124 | 96 | 70.719 | 752.67 | 38148.34 |
| Chunk | 8000 | 128.318 | 97 | 62.345 | 790.96 | 38148.37 |
| Page | 8000 | 130.53 | 98 | 61.301 | 8516.36 | 38148.88 |
➜  parquet-example-rs git:(main) ✗ ls -la output
total 117192408
drwxrwxr-x 2 artavazd.balaian artavazd.balaian        4096 Oct 22 19:47 .
drwxrwxr-x 8 artavazd.balaian artavazd.balaian        4096 Oct 22 19:50 ..
-rw-rw-r-- 1 artavazd.balaian artavazd.balaian 40001465975 Oct 22 19:47 0_Chunk.parquet
-rw-rw-r-- 1 artavazd.balaian artavazd.balaian 40001443255 Oct 22 19:44 0_None.parquet
-rw-rw-r-- 1 artavazd.balaian artavazd.balaian 40002004615 Oct 22 19:49 0_Page.parquet
tustvold commented 9 months ago

I can't seem to reproduce this using heaptrack.

$ heaptrack ./target/release/parquet-example-rs --output-parquet-folder output --rows 1500 --statistics-mode page

image

vs

$ heaptrack ./target/release/parquet-example-rs --output-parquet-folder output --rows 1500 --statistics-mode none

image

Both show the sawtooth pattern I would expect as it creates batches and then flushes them to disk.

REASY commented 9 months ago

@tustvold I'm experiencing the same behavior when profiling with heaptrack. That's probably why I didn't see anything suspicious in DHAT either.

But again, if I run it without a profiler, I see the difference in RAM:

➜  parquet-example-rs git:(main) ✗ date
Mon Oct 23 09:48:31 AM +08 2023
➜  parquet-example-rs git:(main) ✗ cargo build --release && /usr/bin/time -pv target/release/parquet-example-rs --output-parquet-folder output --rows 4000 --statistics-mode page
    Finished release [optimized + debuginfo] target(s) in 0.03s
Received args: AppArgs { output_parquet_folder: "output", rows: 4000, statistics_mode: Page }
Processed 500 msgs with throughout 71.429 msg/s
Processed 1000 msgs with throughout 66.667 msg/s
Processed 1500 msgs with throughout 65.217 msg/s
Processed 2000 msgs with throughout 64.516 msg/s
Processed 2500 msgs with throughout 64.103 msg/s
Processed 3000 msgs with throughout 62.500 msg/s
Processed 3500 msgs with throughout 63.636 msg/s
Processed 4000 msgs with throughout 62.500 msg/s
Wrote 4000 Lidar Point Cloud to parquet in 64.054 seconds, average throughput 62.447 msg/s
        Command being timed: "target/release/parquet-example-rs --output-parquet-folder output --rows 4000 --statistics-mode page"
        User time (seconds): 53.96
        System time (seconds): 9.36
        Percent of CPU this job got: 98%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 1:04.15
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 4322052
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 1490997
        Voluntary context switches: 2227
        Involuntary context switches: 835
        Swaps: 0
        File system inputs: 0
        File system outputs: 39064488
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0
➜  parquet-example-rs git:(main) ✗ date
Mon Oct 23 09:49:57 AM +08 2023
➜  parquet-example-rs git:(main) ✗ cargo build --release && /usr/bin/time -pv target/release/parquet-example-rs --output-parquet-folder output --rows 4000 --statistics-mode none
    Finished release [optimized + debuginfo] target(s) in 0.03s
Received args: AppArgs { output_parquet_folder: "output", rows: 4000, statistics_mode: None }
Processed 500 msgs with throughout 83.333 msg/s
Processed 1000 msgs with throughout 83.333 msg/s
Processed 1500 msgs with throughout 83.333 msg/s
Processed 2000 msgs with throughout 83.333 msg/s
Processed 2500 msgs with throughout 80.645 msg/s
Processed 3000 msgs with throughout 81.081 msg/s
Processed 3500 msgs with throughout 79.545 msg/s
Processed 4000 msgs with throughout 80.000 msg/s
Wrote 4000 Lidar Point Cloud to parquet in 50.245 seconds, average throughput 79.610 msg/s
        Command being timed: "target/release/parquet-example-rs --output-parquet-folder output --rows 4000 --statistics-mode none"
        User time (seconds): 40.76
        System time (seconds): 8.81
        Percent of CPU this job got: 98%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:50.26
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 744364
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 596603
        Voluntary context switches: 1764
        Involuntary context switches: 550
        Swaps: 0
        File system inputs: 0
        File system outputs: 39063928
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0
REASY commented 9 months ago

If I replace the default allocator with jemalloc, memory consumption is similar between the two modes. The changes are in https://github.com/REASY/parquet-example-rs/pull/1
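
The linked PR is not reproduced here; the usual way to swap in jemalloc behind a Cargo feature looks roughly like this (assuming the tikv-jemallocator crate, which may differ from what the PR actually uses):

```rust
// Cargo.toml (sketch):
//   [features]
//   jemalloc = ["dep:tikv-jemallocator"]
//   [dependencies]
//   tikv-jemallocator = { version = "0.5", optional = true }

// main.rs: override the global allocator only when the feature is enabled.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

fn main() {
    // ... unchanged writer code ...
}
```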

➜  parquet-example-rs git:(feature/experiment-with-jemalloc) ✗ date
Mon Oct 23 05:03:35 PM +08 2023
➜  parquet-example-rs git:(feature/experiment-with-jemalloc) ✗ cargo build --features=jemalloc --release && /usr/bin/time -pv target/release/parquet-example-rs --output-parquet-folder output --rows 4000 --statistics-mode page
    Finished release [optimized + debuginfo] target(s) in 0.03s
Received args: AppArgs { output_parquet_folder: "output", rows: 4000, statistics_mode: Page }
Processed 500 msgs with throughout 71.429 msg/s
Processed 1000 msgs with throughout 66.667 msg/s
Processed 1500 msgs with throughout 65.217 msg/s
Processed 2000 msgs with throughout 66.667 msg/s
Processed 2500 msgs with throughout 65.789 msg/s
Processed 3000 msgs with throughout 65.217 msg/s
Processed 3500 msgs with throughout 64.815 msg/s
Processed 4000 msgs with throughout 64.516 msg/s
Wrote 4000 Lidar Point Cloud to parquet in 62.136 seconds, average throughput 64.375 msg/s
        Command being timed: "target/release/parquet-example-rs --output-parquet-folder output --rows 4000 --statistics-mode page"
        User time (seconds): 52.13
        System time (seconds): 8.61
        Percent of CPU this job got: 97%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 1:02.15
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 797584
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 390983
        Voluntary context switches: 1854
        Involuntary context switches: 1227
        Swaps: 0
        File system inputs: 0
        File system outputs: 39064480
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0
➜  parquet-example-rs git:(feature/experiment-with-jemalloc) ✗ date
Mon Oct 23 05:04:46 PM +08 2023
➜  parquet-example-rs git:(feature/experiment-with-jemalloc) ✗ cargo build --features=jemalloc --release && /usr/bin/time -pv target/release/parquet-example-rs --output-parquet-folder output --rows 4000 --statistics-mode none
    Finished release [optimized + debuginfo] target(s) in 0.03s
Received args: AppArgs { output_parquet_folder: "output", rows: 4000, statistics_mode: None }
Processed 500 msgs with throughout 83.333 msg/s
Processed 1000 msgs with throughout 83.333 msg/s
Processed 1500 msgs with throughout 78.947 msg/s
Processed 2000 msgs with throughout 76.923 msg/s
Processed 2500 msgs with throughout 78.125 msg/s
Processed 3000 msgs with throughout 78.947 msg/s
Processed 3500 msgs with throughout 79.545 msg/s
Processed 4000 msgs with throughout 78.431 msg/s
Wrote 4000 Lidar Point Cloud to parquet in 51.196 seconds, average throughput 78.131 msg/s
        Command being timed: "target/release/parquet-example-rs --output-parquet-folder output --rows 4000 --statistics-mode none"
        User time (seconds): 40.25
        System time (seconds): 8.94
        Percent of CPU this job got: 96%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:51.22
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 807580
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 400793
        Voluntary context switches: 2327
        Involuntary context switches: 947
        Swaps: 0
        File system inputs: 0
        File system outputs: 39063920
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0
REASY commented 9 months ago

@zenixan and @xhumanoid suggested it could be due to high memory fragmentation with the default allocator. I assume the fragmentation is due to the vec of small vectors mentioned in https://github.com/apache/arrow-rs/issues/4973#issuecomment-1774066454?

page

image

none

image

tustvold commented 9 months ago

That seems like a stretch; memory fragmentation could perhaps account for a 2x, but a 10x would be surprising. Regardless, we should probably not be designing around degenerate cases of OS-specific allocators.

Edit: is it possible this is actually an issue with dhat? You override the system allocator with it: https://github.com/REASY/parquet-example-rs/blob/main/src/main.rs#L3

REASY commented 9 months ago

> That seems like a stretch; memory fragmentation could perhaps account for a 2x, but a 10x would be surprising. Regardless, we should probably not be designing around degenerate cases of OS-specific allocators.
>
> Edit: is it possible this is actually an issue with dhat? You override the system allocator with it: https://github.com/REASY/parquet-example-rs/blob/main/src/main.rs#L3

That line is gated behind the dhat-heap feature, so it should not affect anything.
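
For context, the standard dhat wiring is a feature-gated global allocator plus a profiler guard, roughly as below (a sketch; the repo's main.rs may differ slightly), so ordinary runs fall back to the system allocator:

```rust
// Only compiled with `--features dhat-heap`; otherwise the default
// system allocator is used and dhat is inert.
#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

fn main() {
    #[cfg(feature = "dhat-heap")]
    let _profiler = dhat::Profiler::new_heap();

    // ... writer code ...
}
```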

REASY commented 9 months ago

If I run collectl as collectl -sB --verbose -oT in parallel with the different modes, here is what I see (OS-level memory fragmentation). It looks quite similar to me; jemalloc is probably better at recycling previously allocated memory.

page with default allocator

MEMORY FRAGMENTATION SUMMARY (4K pages), default allocator (image)

page with jemalloc

MEMORY FRAGMENTATION SUMMARY (4K pages), jemalloc (image)

tustvold commented 7 months ago

Just cleaning up some tickets: is there anything actionable to come out of this, or is the conclusion just that the glibc memory allocator is not behaving as one might hope?

REASY commented 6 months ago

@tustvold if we want it to perform well with the glibc memory allocator, it seems that replacing the Vec<Vec<u8>> with a single Vec plus extra offset metadata for ColumnIndexBuilder.min_values and ColumnIndexBuilder.max_values https://github.com/apache/arrow-rs/blob/f7101ec3a2b37c436f4554c28fa2d0a05de533ff/parquet/src/file/metadata.rs#L886-L887 could help reduce the allocation of many small vectors.
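
A minimal sketch of that idea, using hypothetical names (FlatValues and its methods are not arrow-rs API): all min (or max) byte strings go into one contiguous buffer, with an offsets list marking the boundaries, so each additional page no longer costs its own small heap allocation. This is essentially the same layout Arrow itself uses for variable-length binary arrays (a values buffer plus an offsets buffer).

```rust
// Hypothetical flat layout replacing Vec<Vec<u8>>; names are illustrative.
#[derive(Default)]
struct FlatValues {
    data: Vec<u8>,       // all values concatenated back to back
    offsets: Vec<usize>, // offsets[i]..offsets[i + 1] spans the i-th value
}

impl FlatValues {
    fn push(&mut self, value: &[u8]) {
        if self.offsets.is_empty() {
            self.offsets.push(0); // initial boundary
        }
        self.data.extend_from_slice(value);
        self.offsets.push(self.data.len());
    }

    fn get(&self, i: usize) -> &[u8] {
        &self.data[self.offsets[i]..self.offsets[i + 1]]
    }
}
```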