Closed: hujunfei closed this issue 9 years ago
Do you have files (possibly hidden files) in that directory that are not Lzo?
No. To rule out hidden files, I created a new directory containing just two LZO files. It didn't work either.
I debugged the code and found that the error occurs when running multithreaded with local[n], but everything is fine when running with local[1] on a single thread.
The code at LzopDecompressor line 46:
chkDMap.put(flag, flag.getChecksumClass().newInstance());
I think the new instance of the checksum class (actually Adler32) is being affected by something related to multithreading. That is all I know so far.
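That diagnosis is plausible: java.util.zip.Adler32 is not thread-safe, so if two tasks update one instance concurrently, the running checksum state gets interleaved and the verification fails with "Corrupted uncompressed block" even though the file is fine. As a rough sketch of one possible fix (this is not the actual patch in #94; the PerThreadChecksum class and its method names are made up for illustration), each thread could be given its own Checksum instance via ThreadLocal:

```java
import java.util.zip.Adler32;
import java.util.zip.Checksum;

// Sketch only: isolate checksum state per thread so that concurrent
// Spark tasks never mutate the same Adler32 instance.
public class PerThreadChecksum {

    // Each thread lazily gets its own private Adler32. The class itself
    // is not thread-safe, but a per-thread instance never sees
    // concurrent updates.
    private static final ThreadLocal<Checksum> ADLER =
            ThreadLocal.withInitial(Adler32::new);

    public static long checksum(byte[] data, int off, int len) {
        Checksum c = ADLER.get();
        c.reset();                  // safe to reuse: only this thread touches it
        c.update(data, off, len);
        return c.getValue();
    }
}
```

A ThreadLocal also avoids paying the reflective newInstance() cost on every block, while still keeping each thread's running checksum state isolated.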
Thanks for the report @junfeihu. Would you provide a patch for thread safety in this case?
@junfeihu I put some info about what's going on in #94
I have met this problem too. How can I solve it?
Hey @suiwenfeng, you could apply my patch in #94, or ensure you only use "local[1]" as your spark master url.
Thanks, I will try this solution.
I use Spark to process LZO files, e.g. with a.lzo and b.lzo in "/input/". When processing each file separately, passing "/input/a.lzo" or "/input/b.lzo" as the path, I get the correct result. When processing them together, passing "/input" or "/input/", I get errors like this:
14/09/05 13:01:32 WARN LzopInputStream: IOException in getCompressedData; likely LZO corruption.
java.io.IOException: Corrupted uncompressed block
    at com.hadoop.compression.lzo.LzopInputStream.verifyChecksums(LzopInputStream.java:219)
    at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:284)
    at com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:261)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:206)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:45)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:201)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:184)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:133)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:130)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:571)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:571)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/05 13:01:32 WARN LzopInputStream: Incorrect LZO file format: file did not end with four trailing zeroes.
java.io.IOException: Corrupted uncompressed block
    at com.hadoop.compression.lzo.LzopInputStream.verifyChecksums(LzopInputStream.java:219)
    at com.hadoop.compression.lzo.LzopInputStream.close(LzopInputStream.java:342)
    at org.apache.hadoop.util.LineReader.close(LineReader.java:150)
    at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:241)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:211)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.rdd.HadoopRDD$$anon$1$$anonfun$1.apply$mcV$sp(HadoopRDD.scala:196)
    at org.apache.spark.TaskContext$$anonfun$executeOnCompleteCallbacks$1.apply(TaskContext.scala:63)
    at org.apache.spark.TaskContext$$anonfun$executeOnCompleteCallbacks$1.apply(TaskContext.scala:63)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContext.executeOnCompleteCallbacks(TaskContext.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:204)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Can anyone help?