lz4 / lz4-java

LZ4 compression for Java
Apache License 2.0

Fail to decompress a stream #89

Closed davies closed 8 years ago

davies commented 8 years ago

When LZ4 is used to compress the shuffle files in Spark, we see this failure:

java.io.IOException: Stream is corrupted
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
    at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
    at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
    at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:78)
    at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 3212 of input buffer
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:206)
    ... 36 more

Spark's org.apache.spark.io.LZ4BlockInputStream is a fork of the LZ4BlockInputStream in this repository, modified to support concatenated streams: https://github.com/davies/lz4-java/commit/cc1fa940ac57cc66a0b937300f805d37e2bf8411

Both versions have the same bug:

    case COMPRESSION_METHOD_LZ4:
      if (compressedBuffer.length < originalLen) {
        compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
      }
      readFully(compressedBuffer, compressedLen);

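I think the check compressedBuffer.length < originalLen should be compressedBuffer.length < compressedLen, since compressedLen is the number of bytes that readFully() is about to write into the buffer.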
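A minimal sketch of the proposed comparison, pulled out into a standalone helper so it can be run on its own. The class and method names (CompressedBufferSizing, ensureCapacity) are hypothetical and do not exist in lz4-java; only the growth expression is taken from the snippet above.

```java
// Hypothetical helper, not part of lz4-java: illustrates sizing the
// compressed-data buffer against compressedLen instead of originalLen.
final class CompressedBufferSizing {
    private CompressedBufferSizing() {}

    /**
     * Returns a buffer that can hold compressedLen bytes. The existing
     * buffer is reused when it is already large enough; otherwise a new
     * one is allocated, growing by at least 1.5x as in the original code.
     */
    static byte[] ensureCapacity(byte[] compressedBuffer, int compressedLen) {
        if (compressedLen < 0) {
            throw new IllegalArgumentException("negative length: " + compressedLen);
        }
        // Compare against the number of bytes that will actually be read.
        if (compressedBuffer.length < compressedLen) {
            return new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
        }
        return compressedBuffer;
    }
}
```

With this comparison the buffer grows only when the incoming compressed block does not fit, regardless of how large the decompressed block is.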

davies commented 8 years ago

Since LZ4BlockOutputStream makes sure that originalLen is always larger than compressedLen, this is not actually a bug.

JoshRosen commented 7 years ago

I'm continuing to investigate a stream corruption bug, so here are some notes based on me revisiting this PR (to hopefully save time for anyone else who comes across this):

The bug that @davies identified here looks like a duplicate of #70, where @mtopolnik described it as a performance issue (due to possible overallocation of more buffer space than is really needed).

The current code would be incorrect if the compressed data were longer than the original data, in which case we might write past the end of the buffer. It turns out, however, that LZ4BlockOutputStream has checks which switch from COMPRESSION_METHOD_LZ4 to COMPRESSION_METHOD_RAW in case the LZ4-compressed data is longer than the original data (see discussion at https://github.com/apache/spark/pull/15632#discussion_r85047248).
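To make the invariant concrete, here is a rough sketch of the writer-side decision as I understand it. The names (BlockMethodChoice, Method, choose, storedLength) are hypothetical, and using >= rather than > for the comparison is my assumption; with either choice, the stored length never exceeds the original length.

```java
// Hypothetical model of the writer-side fallback, not the lz4-java source.
final class BlockMethodChoice {
    private BlockMethodChoice() {}

    /** Stand-ins for COMPRESSION_METHOD_RAW and COMPRESSION_METHOD_LZ4. */
    enum Method { RAW, LZ4 }

    /** Falls back to RAW when compression did not shrink the block (assumed >=). */
    static Method choose(int originalLen, int compressedLen) {
        return compressedLen >= originalLen ? Method.RAW : Method.LZ4;
    }

    /** Number of payload bytes written for the block under the chosen method. */
    static int storedLength(int originalLen, int compressedLen) {
        return choose(originalLen, compressedLen) == Method.RAW
                ? originalLen
                : compressedLen;
    }
}
```

Under this model a block tagged LZ4 always has a stored length below originalLen, which is the only reason a reader that sizes its buffer from originalLen does not overflow.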

I agree with other commenters on the linked Spark PR who pointed out that the existing code is somewhat dodgy / hard-to-understand because it's implicitly relying on guarantees from a distant part of the code. If the current behavior is intended then there should at least be a comment explaining the invariants that make it safe. Given the potential perf. impact mentioned in #70, though, I'd rather see this fixed completely via a code change to use the safer comparison.

odaira commented 7 years ago

Yes, I agree. I'll fix the code.

Do you still see the corrupted stream error in Spark? I think the problem is specific to your copied version of lz4-java, which supports concatenated streams, because the original version does not call LZ4BlockInputStream.refill() from LZ4BlockInputStream.available(). It looks odd to me that available() calls refill(): available() is supposed to report what can be read without blocking, so it should not call a blocking method like refill().
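For illustration, a small sketch of the pattern I would expect, where available() only reports what is already buffered and refill() is reached only from read(). This is a simplified stand-in with a hypothetical class name (BufferedBlockStream); it does no decompression and is not the lz4-java or Spark implementation.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for a block-oriented stream; no LZ4 involved.
final class BufferedBlockStream extends InputStream {
    private final InputStream in;
    private final byte[] buffer;
    private int pos;    // next byte to hand out
    private int limit;  // number of valid bytes in buffer

    BufferedBlockStream(InputStream in, int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        this.in = in;
        this.buffer = new byte[blockSize];
    }

    /** Reports only already-buffered bytes; never touches the underlying stream. */
    @Override
    public int available() {
        return limit - pos;
    }

    @Override
    public int read() throws IOException {
        if (pos == limit && !refill()) {
            return -1; // end of stream
        }
        return buffer[pos++] & 0xFF;
    }

    /** May block: loads the next block from the underlying stream. */
    private boolean refill() throws IOException {
        int n = in.read(buffer, 0, buffer.length);
        if (n <= 0) {
            return false;
        }
        pos = 0;
        limit = n;
        return true;
    }
}
```

Returning 0 from available() when the current block is exhausted is allowed by the InputStream contract; callers that need more data call read(), which is where blocking belongs.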

odaira commented 7 years ago

Fixed by c8f4371931a356036a201f01475df99e4be7726c.