apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0

ParquetReader not using FileSystem cache effectively? #1847

Open asfimport opened 9 years ago

asfimport commented 9 years ago

We've seen a Spark job get stuck with the following trace:

java.util.HashMap.put(HashMap.java:494)
org.apache.hadoop.conf.Configuration.set(Configuration.java:1065)
org.apache.hadoop.conf.Configuration.set(Configuration.java:1035)
org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem.mergeViewFsHdfsMountPoints(HDFSCompatibleViewFileSystem.java:491)
org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem.mergeConfFromDirectory(HDFSCompatibleViewFileSystem.java:413)
org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem.mergeViewFsAndHdfs(HDFSCompatibleViewFileSystem.java:273)
org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem.initialize(HDFSCompatibleViewFileSystem.java:190)
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2438)
org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90)
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2472)
org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2454)
org.apache.hadoop.fs.FileSystem.get(FileSystem.java:384)
org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:384)
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:157)
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:133)
org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:244)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:64)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Reporter: Tim / @tsdeng

Note: This issue was originally created as PARQUET-328. Please see the migration documentation for further details.

asfimport commented 9 years ago

Jacques Nadeau / @jacques-n: I'd be much more inclined to simply allow an external tool to cache and pass in the FileSystem object. In general, I prefer to avoid, as much as possible, holding mutable state in static members.

asfimport commented 9 years ago

Ryan Blue / @rdblue: File system handles are cached by the Hadoop FS API, so it is safe to call FileSystem.get every time. I agree with Jacques that we shouldn't cache in Parquet, though I don't like the idea of relying on a user to pass in the correct file system. That causes errors when they pass a FS that doesn't match.
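For illustration, a minimal sketch (assuming the Hadoop 2.x FileSystem API; the path is made up) of the caching behavior Ryan describes: Path.getFileSystem() delegates to FileSystem.get(), which goes through FileSystem.Cache, so repeated lookups for the same scheme, authority, and user normally return the same handle.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsCacheDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical path; any URI with the same scheme/authority hits the same cache entry.
    Path p = new Path("hdfs://namenode/data/table/part-00000.parquet");

    FileSystem fs1 = p.getFileSystem(conf);           // delegates to FileSystem.get(p.toUri(), conf)
    FileSystem fs2 = FileSystem.get(p.toUri(), conf); // served from FileSystem.Cache

    // When the cache is effective, both calls return the same shared instance.
    System.out.println(fs1 == fs2); // expected: true
  }
}
```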

asfimport commented 9 years ago

Jacques Nadeau / @jacques-n: I'm not sure about how other frameworks use Parquet... but in the case of Drill, we already have the FS object since we're doing a bunch of stuff. (As such, this method requires us to convert back into a conf so that it can be converted into an FS again.)
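Purely as a hypothetical sketch of the kind of overload being discussed (readFooterWith is an invented name, not an actual Parquet API): letting callers like Drill hand in the FileSystem they already hold would avoid the FS -> Configuration -> FS round trip Jacques describes.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class FooterReading {
  // Current style of entry point: derives the FileSystem from the Path and Configuration.
  static void readFooterFromPath(Configuration conf, Path file) throws IOException {
    FileSystem fs = file.getFileSystem(conf); // goes through FileSystem.Cache on every call
    readFooterWith(fs, file);
  }

  // Hypothetical overload: the caller supplies the FileSystem it already has open.
  static void readFooterWith(FileSystem fs, Path file) throws IOException {
    // ... fs.open(file), seek to the footer, parse the metadata ...
  }
}
```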

asfimport commented 9 years ago

Tim / @tsdeng: Actually the original ticket does not show the issue clearly. I created this ticket because we've seen a Spark job get stuck with the following trace; it seems the caching is not being effective. Is anyone else seeing this problem?

java.util.HashMap.put(HashMap.java:494)
org.apache.hadoop.conf.Configuration.set(Configuration.java:1065)
org.apache.hadoop.conf.Configuration.set(Configuration.java:1035)
org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem.mergeViewFsHdfsMountPoints(HDFSCompatibleViewFileSystem.java:491)
org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem.mergeConfFromDirectory(HDFSCompatibleViewFileSystem.java:413)
org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem.mergeViewFsAndHdfs(HDFSCompatibleViewFileSystem.java:273)
org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem.initialize(HDFSCompatibleViewFileSystem.java:190)
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2438)
org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90)
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2472)
org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2454)
org.apache.hadoop.fs.FileSystem.get(FileSystem.java:384)
org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:384)
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:157)
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:133)
org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:244)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:64)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

asfimport commented 9 years ago

Ryan Blue / @rdblue: I see what you guys are saying, and I could be convinced to add an API where you can pass in a FileSystem to avoid this. I also have code that holds a copy of the FileSystem, though I would generally discourage users from passing the FS in because it can throw exceptions in cases where the path and FS don't match.
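As an illustration of the mismatch Ryan mentions (checkFileSystemMatches is a hypothetical helper, not Parquet code), a caller-supplied FileSystem could at least be sanity-checked against the Path before use:

```java
import java.net.URI;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class FsPathCheck {
  // Throws if the path's scheme clearly does not belong to the supplied file system.
  static void checkFileSystemMatches(FileSystem fs, Path path) {
    URI fsUri = fs.getUri();
    String pathScheme = path.toUri().getScheme();
    if (pathScheme != null && !pathScheme.equalsIgnoreCase(fsUri.getScheme())) {
      throw new IllegalArgumentException(
          "Path " + path + " does not match file system " + fsUri);
    }
  }
}
```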

asfimport commented 9 years ago

Alex Levenson / @isnotinvain: Have you seen PARQUET-284?

Concurrent access to a HashMap can cause deadlock; I wonder if that's what's happening here?

Hadoop has a caching mechanism for FileSystem.get() (which is synchronized); however, I don't think the returned FileSystem objects are themselves thread safe (maybe FileSystem.get() uses a thread local? not sure). So either way, as far as I know, care has to be taken not to pass FS instances across threads. It is important to cache the FS instances, though: in the past we tried using FileSystem.newInstance() instead of the cache, but it turns out that constructing an FS instance is very expensive (similar to constructing a new Configuration object, it involves parsing XML off disk and such).
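A minimal sketch of the trade-off Alex describes (assuming Hadoop 2.x APIs; the URI is made up): FileSystem.get() returns a shared, cached handle, while FileSystem.newInstance() builds a fresh one on every call, which is expensive because it re-reads configuration and re-initializes the client.

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CachedVsNewInstance {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    URI uri = URI.create("hdfs://namenode/"); // hypothetical cluster URI

    FileSystem cached = FileSystem.get(uri, conf);          // shared, cached handle
    FileSystem fresh  = FileSystem.newInstance(uri, conf);  // new handle on every call

    System.out.println(cached == fresh); // false: newInstance bypasses the cache

    // A fresh instance is owned by the caller and should be closed; the cached
    // instance is shared, so closing it would affect other users of the handle.
    fresh.close();
  }
}
```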

asfimport commented 9 years ago

Ryan Blue / @rdblue: @isnotinvain, didn't we address this? Can we close it?