apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0

Out of Memory when reading large parquet file #2201

Open asfimport opened 6 years ago

asfimport commented 6 years ago

Hi,

We read Parquet files block by block (one row group at a time). This works in general, but in one edge case the JVM runs out of memory. Consider the following scenario:

The Parquet file has one column and one block, and is 10 GB

Our JVM heap is 5 GB

Is there any way to read such a file? Our stack trace and implementation are below.


Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:778)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:511)

try {
  ParquetMetadata readFooter = ParquetFileReader.readFooter(hfsConfig, path,
                               ParquetMetadataConverter.NO_FILTER);
  MessageType schema = readFooter.getFileMetaData().getSchema();
  // Size in bytes of the largest block (row group) listed in the footer
  long maxBlockBytes = readFooter.getBlocks().stream()
    .mapToLong(BlockMetaData::getTotalByteSize)
    .max()
    .orElse(0L);

  for (BlockMetaData block : readFooter.getBlocks()) {
    try {
      fileReader = new ParquetFileReader(hfsConfig,
                   readFooter.getFileMetaData(), path,
                   Collections.singletonList(block), schema.getColumns());
      PageReadStore pages;

      // readNextRowGroup() throws the OutOfMemoryError on blocks larger than the JVM heap
      while (null != (pages = fileReader.readNextRowGroup())) {
        final long rows = pages.getRowCount();
        final MessageColumnIO columnIO =
              new ColumnIOFactory().getColumnIO(schema);
        final RecordReader<Group> recordReader =
              columnIO.getRecordReader(pages, new GroupRecordConverter(schema));

        // long counter: getRowCount() returns a long, which an int index could overflow
        for (long i = 0; i < rows; i++) {
          final Group group = recordReader.read();
          int fieldCount = group.getType().getFieldCount();

          for (int field = 0; field < fieldCount; field++) {
            int valueCount = group.getFieldRepetitionCount(field);
            Type fieldType = group.getType().getType(field);
            String fieldName = fieldType.getName();

            for (int index = 0; index < valueCount; index++) {
              // Process data
            }
          }
        }
      }
    } catch (IOException e) {
      ...
    } finally {
      ...
    }
  }
} catch (IOException e) {
  ...
}
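
The largest-block computation at the top of the snippet could also serve as a pre-flight check, so that an oversized block is reported before readNextRowGroup() is called on it. The following is only a minimal sketch in plain Java with no Parquet dependency. The class name RowGroupHeapCheck, the method oversizedRowGroups, and the 50% safety margin are assumptions made for illustration and are not part of parquet-java. The check detects the problem early but does not make such a file readable, and the size reported in the footer may differ from the memory actually needed.

```java
import java.util.ArrayList;
import java.util.List;

public class RowGroupHeapCheck {

    /** Returns the indexes of row groups whose byte size exceeds the given budget. */
    public static List<Integer> oversizedRowGroups(long[] rowGroupBytes, long budgetBytes) {
        List<Integer> oversized = new ArrayList<>();
        for (int i = 0; i < rowGroupBytes.length; i++) {
            if (rowGroupBytes[i] > budgetBytes) {
                oversized.add(i);
            }
        }
        return oversized;
    }

    public static void main(String[] args) {
        final long GB = 1024L * 1024L * 1024L;
        // Hypothetical sizes; in the snippet above they would come from
        // BlockMetaData.getTotalByteSize() for each block in the footer.
        long[] rowGroupBytes = {10 * GB};
        // The scenario from this issue: a 5 GB heap. A live check could use
        // Runtime.getRuntime().maxMemory() instead of a fixed number.
        long heapBytes = 5 * GB;
        // Assumed safety margin: at most half the heap for a single row group.
        long budgetBytes = heapBytes / 2;

        for (int index : oversizedRowGroups(rowGroupBytes, budgetBytes)) {
            System.out.println("Row group " + index + " needs " + rowGroupBytes[index]
                + " bytes but the budget is " + budgetBytes + " bytes");
        }
    }
}
```

With the numbers from this issue, the single 10 GB block is flagged, so the caller can skip it or fail with a clear message instead of hitting an OutOfMemoryError inside the reader.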

Reporter: Ryan Sachs

Note: This issue was originally created as PARQUET-1359. Please see the migration documentation for further details.

asfimport commented 6 years ago

Yuming Wang / @wangyum: Is this a duplicate of PARQUET-980?