apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0
2.61k stars 1.41k forks source link

Cannot read row group larger than 2GB #2057

Open asfimport opened 7 years ago

asfimport commented 7 years ago

Parquet MR 1.8.2 does not support reading row groups which are larger than 2 GB. See:https://github.com/apache/parquet-mr/blob/parquet-1.8.x/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1064

We are seeing this when writing skewed records. This throws off the estimation of the memory check interval in the InternalParquetRecordWriter. The following spark code illustrates this:

/**
 * Create a data frame that will make parquet write a file with a row group larger than 2 GB. Parquet
 * only checks the size of the row group after writing a number of records. This number is based on
 * average row size of the already written records. This is problematic in the following scenario:
 * - The initial (100) records in the record group are relatively small.
 * - The InternalParquetRecordWriter checks if it needs to write to disk (it should not), it assumes
 *   that the remaining records have a similar size, and (greatly) increases the check interval (usually
 *   to 10000).
 * - The remaining records are much larger then expected, making the row group larger than 2 GB (which
 *   makes reading the row group impossible).
 *
 * The data frame below illustrates such a scenario. This creates a row group of approximately 4GB.
 */
val badDf = spark.range(0, 2200, 1, 1).mapPartitions { iterator =>
  var i = 0
  val random = new scala.util.Random(42)
  val buffer = new Array[Char](750000)
  iterator.map { id =>
    // the first 200 records have a length of 1K and the remaining 2000 have a length of 750K.
    val numChars = if (i < 200) 1000 else 750000
    i += 1

    // create a random array
    var j = 0
    while (j < numChars) {
      // Generate a char (borrowed from scala.util.Random)
      buffer(j) = (random.nextInt(0xD800 - 1) + 1).toChar
      j += 1
    }

    // create a string: the string constructor will copy the buffer.
    new String(buffer, 0, numChars)
  }
}
badDf.write.parquet("somefile")
val corruptedDf = spark.read.parquet("somefile")
corruptedDf.select(count(lit(1)), max(length($"value"))).show()

The latter fails with the following exception:

java.lang.NegativeArraySizeException
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1064)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:698)
...

-This seems to be fixed by commit https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8 in parquet 1.9.x. Is there any chance that we can fix this in 1.8.x?-

Reporter: Herman van Hövell

Note: This issue was originally created as PARQUET-980. Please see the migration documentation for further details.

asfimport commented 7 years ago

Cheng Lian / @liancheng: The current write path ensures that it never writes a page that is larger than 2GB, but the read path may read 1 or more column chunks consisting of multiple pages into a single byte array (or ByteBuffer) no larger than 2GB.

We hit this issue in production because the data distribution happened to be similar to the situation mentioned in the JIRA description and produced a skewed row group containing a column chunk larger than 2GB.

I think there are two separate issues to fix:

  1. On the write path, the strategy that dynamically adjusts memory check intervals needs some tweaking. The assumption that sizes of adjacent records are similar can be easily broken.
  2. On the read path, the ConsecutiveChunkList.readAll() method should support reading data larger than 2GB, probably by using multiple buffers.

Another option is to ensure that no row groups larger than 2GB can be ever written. Thoughts?

BTW, the parquet-python library can read this kind of malformed Parquet files successfully with this patch. We used it to recover our data from the malformed Parquet file.

KunZhou-at commented 2 months ago

I don't think this was fixed in 1.9.x, having the same issue on 1.14.1