apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0

ParquetWriter.getDataSize NullPointerException after closed #2037

Open asfimport opened 7 years ago

asfimport commented 7 years ago

When I run ParquetWriter.getDataSize(), it works normally. But after I call ParquetWriter.close(), subsequent calls to ParquetWriter.getDataSize result in a NullPointerException.

java.lang.NullPointerException
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.getDataSize(InternalParquetRecordWriter.java:132)
    at org.apache.parquet.hadoop.ParquetWriter.getDataSize(ParquetWriter.java:314)
    at FileBufferState.getFileSizeInBytes(FileBufferState.scala:83)

The reason for the NPE appears to be in InternalParquetRecordWriter.getDataSize, where it assumes that columnStore is not null.

But the close() method calls flushRowGroupToStore(), which sets columnStore = null.

I'm guessing that once the file is closed, we can just return lastRowGroupEndPos since there should be no more buffered data, but I don't fully understand how this class works.
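
For illustration, a minimal sketch of that guard (not an actual patch; it assumes getDataSize() currently returns lastRowGroupEndPos plus the column store's buffered size):

```java
// Sketch of a possible guard in InternalParquetRecordWriter.getDataSize().
// Assumption: the current implementation is
//   return lastRowGroupEndPos + columnStore.getBufferedSize();
public long getDataSize() {
  if (columnStore == null) {
    // close() -> flushRowGroupToStore() has released the column store,
    // so there is no buffered data left to add
    return lastRowGroupEndPos;
  }
  return lastRowGroupEndPos + columnStore.getBufferedSize();
}
```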

Environment: Linux prim 4.8.13-1-ARCH #1 SMP PREEMPT Fri Dec 9 07:24:34 CET 2016 x86_64 GNU/Linux

openjdk version "1.8.0_112"
OpenJDK Runtime Environment (build 1.8.0_112-b15)
OpenJDK 64-Bit Server VM (build 25.112-b15, mixed mode)

Reporter: Mike Mintz

Note: This issue was originally created as PARQUET-860. Please see the migration documentation for further details.

asfimport commented 6 years ago

e.birukov: I get the same error:

This happens when S3 is temporarily unavailable:

....FileSystemException...
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:158)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:121)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)

When I call the close() method again:

Caused by: java.lang.NullPointerException
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:162)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:109)
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)

I read data from a stream and write it using ParquetWriter, so this problem is critical: it causes data loss!

asfimport commented 6 years ago

Ryan Blue / @rdblue: [~e.birukov], this issue is not related to the problem you're hitting. If you'd like, please open another issue for that and we can look into whether it is worth fixing. Most of the time, we assume that an exception in close is not recoverable and the entire file needs to be rewritten. You're only guaranteed durability when close returns successfully, so this is not causing data loss. Data loss is only a problem if you've already discarded the input data, but that is a problem with the writing application and not with Parquet.

[~mikemintz], I hadn't seen this issue before now. We can probably fix this by adding logic that checks whether the file was closed and saves the file position just after writing the footer. We've also recently added an accessor for the footer that is available once the file is closed, so you could also use that to get stats and other info if that's what you're after.
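
Until a fix along those lines lands, one workaround (a sketch, not an official recommendation) is to read the size while the writer is still open:

```java
// Workaround sketch: getDataSize() can throw a NullPointerException once
// close() has released the column store, so capture the value beforehand.
static long closeAndGetSize(ParquetWriter<?> writer) throws IOException {
  long finalDataSize = writer.getDataSize();  // safe while the writer is open
  writer.close();
  return finalDataSize;  // reuse this instead of calling getDataSize() again
}
```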

asfimport commented 6 years ago

e.birukov: @rdblue, you wrote "Most of the time, we assume that an exception in close is not recoverable and the entire file needs to be rewritten". That is clear when writing to the local file system, where the file is opened for writing when the ParquetWriter is created. But Amazon S3 is a cloud service that provides distributed object storage: there is no permanent connection to the object being written, and any network problem or service failure can cause temporary errors or timeouts. You say my application code is responsible for retaining the original data. Is that data already buffered in memory by the objects ParquetWriter creates? Do I need to keep a duplicate copy of the input data in the application?

asfimport commented 6 years ago

Ryan Blue / @rdblue: The S3 file system implementation should retry and recover if it is a transient error. In general I'm skeptical that Parquet can reliably provide what you want.

Parquet makes no guarantee of durability until the close operation returns. As a consequence, you should not discard incoming records until then. This is why Parquet works better with systems like Kafka that have a long window of time during which records can be replayed. In general, I would not recommend Parquet as an output format for other streaming systems like Flume. It works fine with MR or Spark, where the framework itself will retry a write task when an output writer throws an exception.
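
To illustrate that contract, here is a sketch of a write loop that only acknowledges input after a successful close (Record, readBatchFromSource, createWriter, and acknowledgeSource are hypothetical placeholders for the application's own types and helpers):

```java
// Keep the source records replayable until close() returns, because
// Parquet only guarantees durability once close() has succeeded.
void writeBatch(Path outputPath) throws IOException {
  List<Record> pending = readBatchFromSource();                   // hypothetical helper
  try (ParquetWriter<Record> writer = createWriter(outputPath)) { // hypothetical helper
    for (Record record : pending) {
      writer.write(record);
    }
  } // close() runs here; if it throws, the acknowledgement below never runs
  acknowledgeSource(pending);  // only now is it safe to discard the input
}
```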

asfimport commented 6 years ago

e.birukov: @rdblue, thank you for the detailed answer. I will adapt the application's architecture so that my stream provider can replay records in case of a failure.

asfimport commented 1 month ago

Steve Loughran / @steveloughran:

But Amazon S3 is a cloud service that provides distributed object storage: there is no permanent connection to the object being written, and any network problem or service failure can cause temporary errors or timeouts. You say my application code is responsible for retaining the original data. Is that data already buffered in memory by the objects ParquetWriter creates? Do I need to keep a duplicate copy of the input data in the application?

The s3a FS puts a lot of effort into retry and recovery in close() because it is so critical. One thing to note is that too much code assumes that close() is fast; it often isn't, and if the blocked thread is the one that sends heartbeats back, those heartbeats can time out. If you set a progress callback on the FSDataOutputStream, we will actually invoke it after every queued block is uploaded.
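
For reference, a sketch of wiring in such a progress callback when creating the output stream (the bucket and object path are made up; the lambda implements Hadoop's Progressable, which the s3a client invokes as queued blocks are uploaded):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: attach a Progressable when creating the s3a stream so that a slow
// close() (completing the multipart upload) still signals liveness.
public class S3AProgressExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("s3a://example-bucket/output/data.parquet");  // made-up path
    FileSystem fs = path.getFileSystem(conf);

    try (FSDataOutputStream out = fs.create(path, () -> {
      // invoked as each queued block finishes uploading; a real application
      // would forward this to its heartbeat / keep-alive mechanism
      System.out.println("upload still in progress");
    })) {
      out.write(new byte[]{1, 2, 3});  // placeholder payload
    } // close() may block while the upload completes; progress() keeps firing
  }
}
```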