apache / parquet-format

Apache Parquet Format
https://parquet.apache.org/
Apache License 2.0

Close decompression stream to free off-heap memory in time #398

Closed asfimport closed 1 year ago

asfimport commented 2 years ago

The decompressed stream in HeapBytesDecompressor$decompress currently relies on JVM GC to be closed. When reading zstd-compressed Parquet, I sometimes ran into OOMs caused by high off-heap usage. I think the reason is that GC is not timely, which leads to off-heap memory fragmentation; I had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the system quickly. There is a thread about this zstd Parquet issue in the Iceberg community Slack (https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4): some people had the same problem.

I think we can use a ByteArrayBytesInput as the decompressed bytes input and close the decompression stream promptly to solve this problem:


// current: the BytesInput reads lazily from the stream, which is only closed when GC collects it
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
decompressed = BytesInput.from(is, uncompressedSize);

->

// proposed: eagerly read the bytes into a ByteArrayBytesInput, then close the stream right away
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
is.close();

After I made this change to decompress, I found that off-heap memory usage is significantly reduced (with the same query on the same data).
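
For context, a minimal sketch of how the proposed change could look inside parquet-mr's CodecFactory.HeapBytesDecompressor.decompress. This assumes the 1.12.x structure of that method (the codec and decompressor fields belong to the surrounding class); it is a sketch of the idea, not the committed patch:

import java.io.IOException;
import java.io.InputStream;
import org.apache.parquet.bytes.BytesInput;

@Override
public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
  if (codec == null) {
    // uncompressed pages pass through unchanged
    return bytes;
  }
  if (decompressor != null) {
    decompressor.reset();
  }
  // try-with-resources closes the (possibly native) decompression stream as soon as
  // the page bytes have been materialized on-heap, instead of waiting for GC
  try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) {
    return BytesInput.copy(BytesInput.from(is, uncompressedSize));
  }
}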

Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 1.4.9.1 + glibc

Reporter: Yujiang Zhong / @zhongyujiang

Note: This issue was originally created as PARQUET-2160. Please see the migration documentation for further details.

asfimport commented 2 years ago

Yujiang Zhong / @zhongyujiang: @shangxinli @dongjoon-hyun Can you please take a look at this?

asfimport commented 2 years ago

Adam Binford: I think we are running into the same issue; this has been driving me crazy. I can recreate it with a pure Spark Parquet table: a single read of a table can easily eat up a few GB of native heap, and it doesn't really get reclaimed. I've hit up to 40 GB of native heap usage with a 30 GB JVM heap executor. I finally narrowed it down to reading zstd Parquet.

asfimport commented 2 years ago

Chao Sun / @sunchao:

After I made this change to decompress, I found off-heap memory is significantly reduced (with same query on same data).

What about query performance? Was it affected too, since it now incurs an extra copy?

asfimport commented 2 years ago

Yujiang Zhong / @zhongyujiang: [~Kimahriman] I didn't recreate this with a pure Spark Parquet table. Which Parquet version are you using? There are some fix patches (https://github.com/apache/parquet-mr/pull/903 and https://github.com/apache/parquet-mr/pull/889) released in 1.12.3.

asfimport commented 2 years ago

Yujiang Zhong / @zhongyujiang:

What about query performance? Was it affected too, since it now incurs an extra copy?

@sunchao It does not actually add an extra copy: BytesInput#copy reads the bytes from the input stream and returns a ByteArrayBytesInput; those bytes are later wrapped in a ByteBuffer to construct a ByteBufferInputStream. This is the same as the current process, except that the bytes are read in advance. I ran some queries to test (not a strict benchmark) and the results show no performance loss.
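
For reference, a rough paraphrase of the BytesInput behavior being discussed, assuming the parquet-mr 1.12.x API (the method names are real; the body and comments below are a simplification, not the actual source):

// BytesInput.from(is, n) is lazy: the stream is only read when the page bytes are
// actually consumed, so the decompression stream stays open until GC reclaims it.
// BytesInput.copy(...) is roughly equivalent to:
public static BytesInput copy(BytesInput bytes) throws IOException {
  // toByteArray() drains the underlying stream into an on-heap byte[],
  // which is then wrapped as a ByteArrayBytesInput
  return BytesInput.from(bytes.toByteArray());
}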

asfimport commented 2 years ago

Chao Sun / @sunchao: Hmm, it does need to allocate extra heap memory and then read the data from the decompressor stream into it, though, so IMO that's some extra overhead. However, I do agree this can fix the issue where off-heap memory usage spikes. There seems to be no other easy workaround, since the BytesInput is passed to clients as pages after the decompress call, which makes it hard to determine when they can be closed.

asfimport commented 2 years ago

Yujiang Zhong / @zhongyujiang:

Hmm it does need to allocate extra heap memory and then read the data from the decompressor stream into it though

But I see this happens in the current way too, only after the decompress call; may I ask what the difference is between these two ways?

asfimport commented 2 years ago

Adam Binford:

Which Parquet version are you using? There are some fix patches (https://github.com/apache/parquet-mr/pull/903 and https://github.com/apache/parquet-mr/pull/889) released in 1.12.3.

Yeah, this is in Spark 3.3.0, so Parquet 1.12.2. It looks like https://github.com/apache/parquet-mr/pull/889 made it into 1.12.2, so the buffer pool is the only main difference. I tried dropping in 1.12.3, and enabling the buffer pool in 1.12.2, and both still exhibit the same issue. The reason I can generate so much off-heap usage (> 1 GB in a few seconds) is that I have a very wide table (1k+ columns), mostly strings (not sure if that makes a difference), so it's probably creating a lot of ZstdInputStreams when reading all of the columns. Selecting only some of the columns isn't as noticeable, but usage still slowly grows over time.

I compiled this suggested fix myself and tested it out, and it did in fact completely fix my problem. What was generating GBs of off-heap memory that never got cleaned up (and dozens of GB of virtual memory) now consistently stays around ~100 MB. I also agree, looking at BytesInput, that no extra copy of the actual data is made using BytesInput.copy, because either way the data will be loaded into a single byte[] at some point, albeit a little earlier with the copy method. The only overhead is creating the additional BytesInput Java object.

asfimport commented 2 years ago

Yujiang Zhong / @zhongyujiang:

The reason I can generate so much off heap usage (> 1GB in a few seconds), is because I have a very wide table (1k+ columns)

Thanks for verifying this. I also used a wide table for testing previously, but it's far from this wide. I haven't reproduced this issue on Parquet tables before, so I doubt whether this should be a Parquet problem to fix.

asfimport commented 2 years ago

Chao Sun / @sunchao:

... only it happens after the decompress call, may I ask what's the difference between these two ways?

Where is the decompress call? I was only comparing the code you posted in the JIRA description, so I'm not sure what happens afterwards.

I think you should open a PR for this, and we can discuss more there.

asfimport commented 2 years ago

Yujiang Zhong / @zhongyujiang:

I was only comparing the code you posted in the JIRA description

I see, there will indeed be an extra copy when looking at it that way.

I think you should open a PR for this, and we can discuss more there.

I'll open a PR later.

asfimport commented 1 year ago

Alexey Kudinkin / @alexeykudinkin: Corresponding Spark issue: https://issues.apache.org/jira/browse/SPARK-41952