fingltd / 4mc

4mc - splittable lz4 and zstd in hadoop/spark/flink

Warning in pyspark: NullPointerException in RecordReader.close() #20

Open surjikal opened 7 years ago

surjikal commented 7 years ago

I compressed a file (~30mb, just for testing) using the 4mc tool:

$ 4mc data.txt 
Compressed filename will be : data.txt.4mc 
Compression: LZ4
Compressed (fast) 30288541 bytes into 2865501 bytes ==> 9.46% (Ratio=10.570)   

Then I tried to open the compressed file in (py)spark:

$ pyspark --master local[1]
>>> sc.textFile('file:///data.txt.4mc').count()
17/01/24 09:28:04 WARN org.apache.spark.rdd.HadoopRDD: Exception in RecordReader.close()
java.lang.NullPointerException
        at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
        at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:224)
        at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:288)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:276)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1953)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
4634

I got the result, but it warned me about a null pointer exception. Thought you guys might want to know!

gtinjr commented 1 year ago

Similar issue with pyspark. It seems the decompressor has already been destroyed/finalized by the time reset() is called, so the buffer no longer exists and Lz4Decompressor throws a NullPointerException.

23/03/20 12:12:24 INFO CodecPool: Got brand-new decompressor [.4mc]
23/03/20 12:12:24 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
        at com.fing.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
        at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:224)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:243)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.close(RecordReaderIterator.scala:62)
        at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.close(HadoopFileLinesReader.scala:73)
        at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1$$anonfun$apply$1$$anonfun$apply$2.apply(TextFileFormat.scala:123)
        at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1$$anonfun$apply$1$$anonfun$apply$2.apply(TextFileFormat.scala:123)
        at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
        at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
        at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.org$apache$spark$executor$Executor$TaskRunner$$anonfun$$res$1(Executor.scala:412)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:419)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1359)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:430)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
23/03/20 12:12:24 ERROR Executor: Exception in task 0.3 in stage 0.0 (TID 3)
org.apache.spark.util.TaskCompletionListenerException: null
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.org$apache$spark$executor$Executor$TaskRunner$$anonfun$$res$1(Executor.scala:412)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:419)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1359)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:430)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
23/03/20 12:13:25 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
23/03/20 12:13:25 INFO DiskBlockManager: Shutdown hook called
23/03/20 12:13:25 INFO ShutdownHookManager: Shutdown hook called

gtinjr commented 1 year ago

The problem is in the FourMcInputStream.close() method, which calls Lz4Decompressor.releaseDirectBuffers(). By the time reset() is called, the buffers have already been set to null, which causes the NullPointerException. Commenting out this line in FourMcInputStream.close() fixes the issue:

//((Lz4Decompressor)decompressor).releaseDirectBuffers();

I don't know why this call is made here. I looked at a similar project, Twitter's hadoop-lzo, and it does not release direct buffers in its input stream's close() method. Making this call here is very destructive: the decompressor is later returned to the Hadoop CodecPool, but since its internal buffers have been destroyed, the pool ends up holding a broken decompressor.
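The lifecycle described above can be sketched with a minimal, self-contained Java example (the class and method names below are illustrative, not the actual 4mc sources): the stream's close() releases the direct buffers, the codec pool then calls reset() on the same object, and reset() dereferences a null buffer. A null guard in reset(), or simply not releasing the buffers in close(), avoids the NPE.

```java
import java.nio.ByteBuffer;

// Minimal sketch (hypothetical names) of the pooled-decompressor lifecycle bug.
public class PooledDecompressorSketch {

    static class SketchDecompressor {
        private ByteBuffer compressedBuf = ByteBuffer.allocateDirect(64 * 1024);
        private ByteBuffer uncompressedBuf = ByteBuffer.allocateDirect(64 * 1024);

        // Stands in for what FourMcInputStream.close() does via
        // releaseDirectBuffers(): the buffers are dropped, but the
        // object itself is still handed back to the pool afterwards.
        void releaseDirectBuffers() {
            compressedBuf = null;
            uncompressedBuf = null;
        }

        // Stands in for the pool's reset-on-return: it dereferences the
        // buffers, so it throws NPE once they have been released.
        void reset() {
            compressedBuf.clear();
            uncompressedBuf.clear();
        }

        // Defensive variant: a null guard makes reset() safe after release.
        void resetGuarded() {
            if (compressedBuf != null) compressedBuf.clear();
            if (uncompressedBuf != null) uncompressedBuf.clear();
        }
    }

    public static void main(String[] args) {
        SketchDecompressor d = new SketchDecompressor();
        d.releaseDirectBuffers();          // stream close releases buffers
        try {
            d.reset();                     // pool return then resets -> NPE
            System.out.println("reset ok");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException in reset()");
        }
        d.resetGuarded();                  // guarded reset survives
        System.out.println("guarded reset ok");
    }
}
```

Either fix restores the invariant the pool relies on: an object returned to the pool must still be usable.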

The following Scala code can be used to reproduce the issue, using Spark 2.4.4 (Scala 2.11) and Hadoop 2.7.0. The failure is triggered by val raw_df = spark.read.option("header", value = true).csv("file:///c:/lift/V1_sep_test_900.psv.4mc"): that line uses the input stream to do a partial read to build the DataFrame and infer its schema, then returns the decompressor to the pool so it can be reused later by raw_df.show(5). The return fails because reset() is called on the decompressor after its direct buffers were released in the close() method.

import org.apache.spark.sql.SparkSession

object Decompressor {
  def createSparkSession(): SparkSession = {
    System.setProperty("hadoop.home.dir", "C:\\opt\\mapr\\hadoop\\hadoop-2.7.0")
    System.setProperty("SPARK_HOME", "C:\\opt\\mapr\\spark\\spark-2.4.4")
    SparkSession.builder()
      .master("local[1]")
      .appName("Decompressor - 4mc debug")
      .config("spark.some.config.option", "config-value")
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession()
    val raw_df = spark.read.option("header", value = true)
      .csv("file:///c:/lift/V1_sep_test_900.psv.4mc")
    raw_df.show(5)
  }
}