fingltd / 4mc

4mc - splittable lz4 and zstd in hadoop/spark/flink

DirectBuffer grows larger than available space. #2

Closed pbutler closed 9 years ago

pbutler commented 9 years ago

When using this decompressor to decompress the map outputs, I run into problems with large files: it looks like the DirectBuffer grows too large. I have tried raising the maximum direct buffer size, but it seems to require an impractically large value (-XX:MaxDirectMemorySize=2g didn't work and I stopped there). The following exception is thrown:

2015-01-21 16:54:39,137 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: Error while doing final merge 
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:160)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
    at com.hadoop.compression.fourmc.Lz4Decompressor.<init>(Lz4Decompressor.java:101)
    at com.hadoop.compression.fourmc.Lz4MediumCodec.createDecompressor(Lz4MediumCodec.java:156)
    at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:178)
    at org.apache.hadoop.mapred.IFile$Reader.<init>(IFile.java:345)
    at org.apache.hadoop.mapred.Merger$Segment.init(Merger.java:302)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:634)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:191)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:796)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.close(MergeManagerImpl.java:363)
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:158)
    ... 6 more

I am not sure if there is something that can be done on decompressor side to fix this or if I am just not setting the options correctly. Any help would be appreciated.
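
For context, -XX:MaxDirectMemorySize is a per-JVM flag, so it typically has to reach the reduce-task JVMs themselves (via the standard mapreduce.reduce.java.opts property) rather than only the client JVM. A sketch of that setup is below; the heap size and job name are illustrative, not values from this report:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class JobSetupSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Pass the direct-memory limit to the reduce-task JVMs; the heap
            // value here is illustrative and must fit the YARN container size.
            conf.set("mapreduce.reduce.java.opts",
                     "-Xmx2048m -XX:MaxDirectMemorySize=2g");
            Job job = Job.getInstance(conf, "fourmc-job-sketch"); // name is illustrative
            // ... remaining job configuration as usual ...
        }
    }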

carlomedas commented 9 years ago

In 4mc the LZ4 compression/decompression block size is bounded to 4MB (4 * 1024 * 1024), which is a hard, fixed limit. When a compressor or decompressor is created, two direct buffers are allocated (one for the compressed data, one for the uncompressed output data), so each decompressor instance needs no more than 2x 4MB of direct buffers.
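
As a rough illustration of that allocation pattern (class and field names below are made up, not the actual 4mc source), each decompressor instance reserves its two direct buffers up front:

    import java.nio.ByteBuffer;

    // Illustrative sketch only: each decompressor instance reserves two direct
    // buffers up front, one for compressed input and one for uncompressed output.
    // With the 4 MB block bound described above, that is at most ~8 MB per instance.
    public class SketchLz4Decompressor {
        private static final int MAX_BLOCK_SIZE = 4 * 1024 * 1024; // 4 MB hard limit

        private final ByteBuffer compressedDirectBuf;
        private final ByteBuffer uncompressedDirectBuf;

        public SketchLz4Decompressor() {
            // These allocations are what fail with "Direct buffer memory" when
            // the JVM's direct-memory budget is exhausted.
            this.compressedDirectBuf = ByteBuffer.allocateDirect(MAX_BLOCK_SIZE);
            this.uncompressedDirectBuf = ByteBuffer.allocateDirect(MAX_BLOCK_SIZE);
        }
    }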

If you can share a simple test case or directions to reproduce the issue, I can try it.

carlomedas commented 9 years ago

This should in any case be fixed now by the direct-buffer caching that has been introduced; it is working well both in distributed cluster jobs and in HDFS clients using the 4mc codecs.
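
A minimal sketch of what such direct-buffer caching can look like, assuming a shared pool keyed by buffer size; all names are illustrative and the real 4mc implementation may differ:

    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Illustrative sketch: instead of each (de)compressor calling allocateDirect,
    // buffers of a given size are borrowed from and returned to a shared pool,
    // rather than repeatedly allocated and left for the garbage collector to
    // reclaim, which keeps the direct-memory footprint stable across many
    // codec instances.
    public final class DirectBufferPool {
        private static final Map<Integer, Queue<ByteBuffer>> POOL = new ConcurrentHashMap<>();

        private DirectBufferPool() {}

        // Reuse a cached buffer of the requested size if one is available,
        // otherwise allocate a new direct buffer.
        public static ByteBuffer borrow(int size) {
            Queue<ByteBuffer> q = POOL.computeIfAbsent(size, s -> new ConcurrentLinkedQueue<>());
            ByteBuffer buf = q.poll();
            return buf != null ? buf : ByteBuffer.allocateDirect(size);
        }

        // Return a buffer to the pool so later codec instances can reuse it.
        public static void release(ByteBuffer buf) {
            buf.clear();
            POOL.computeIfAbsent(buf.capacity(), s -> new ConcurrentLinkedQueue<>()).offer(buf);
        }
    }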