Apache Parquet Java
https://parquet.apache.org/

should not use seek() for skipping very small column chunks. better to read and ignore data. #3076

Open Arnaud-Nauwynck opened 1 day ago

Arnaud-Nauwynck commented 1 day ago

Describe the enhancement requested

When reading only some of the column chunks, Parquet builds a list of "ConsecutivePartList" objects and then tries to call the Hadoop vectored-read API, FSDataInputStream#readVectored(List ...).

Unfortunately, many implementations of FSDataInputStream do not override the readVectored() method, which ends up triggering many distinct calls to read().

For example with hadoop-azure, Azure Data Lake Storage is much slower at establishing a new HTTPS connection (using the infamous JDK 1.0-era HttpURLConnection, then performing a TLS handshake) than at fetching a few more megabytes of data on an already-open socket!

The case of small holes to skip is very frequent when a parquet file contains columns that are not read and that compress extremely well thanks to RLE encoding: typically a very sparse column with only a few values, or even all nulls within a page. Such a column may be encoded by parquet in only a few hundred bytes, so reading ~100 extra bytes is NOT a problem.
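To put rough numbers on the tradeoff (all figures below are illustrative assumptions, not measurements): the break-even gap size is simply the connection-setup cost multiplied by the throughput of an already-open connection.

// Back-of-the-envelope sketch with assumed numbers (50 ms to open a new HTTPS
// connection, 100 MiB/s on an established socket); both figures are illustrative.
public class BreakEvenGap {
  public static void main(String[] args) {
    double connectionSetupSeconds = 0.050;                  // assumed handshake + TLS cost
    double throughputBytesPerSecond = 100.0 * 1024 * 1024;  // assumed rate on an open socket
    double breakEvenGapBytes = connectionSetupSeconds * throughputBytesPerSecond;
    // ~5 MiB: any hole smaller than this is cheaper to read through than to skip
    // by opening a new request, under these assumptions.
    System.out.printf("break-even gap: %.0f bytes (~5 MiB)%n", breakEvenGapBytes);
  }
}

Even with much more conservative assumptions, the break-even point stays far above the few hundred bytes at which such columns are encoded.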

Parquet should at least honor the following method from Hadoop's PositionedReadable interface, which says that a seek of less than 4096 bytes is NOT reasonable:

  /**
   * What is the smallest reasonable seek?
   * @return the minimum number of bytes
   */
  default int minSeekForVectorReads() {
    return 4 * 1024;
  }
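For illustration, a hypothetical helper (not existing parquet-java code) could ask the open stream for that threshold when deciding whether two ranges are worth merging; note that if neither FSDataInputStream nor the wrapped stream overrides it, this simply returns the 4 KiB default above.

import org.apache.hadoop.fs.FSDataInputStream;

// Hypothetical helper, not parquet-java code: decide whether the hole between two
// byte ranges is small enough that reading through it beats seeking over it.
public final class SeekPolicy {
  private SeekPolicy() {}

  public static boolean shouldReadThroughGap(FSDataInputStream in, long previousEnd, long nextStart) {
    long gap = nextStart - previousEnd;
    // a negative gap means overlapping / out-of-order ranges: never merge those here
    return gap >= 0 && gap <= in.minSeekForVectorReads();
  }
}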

The logic that builds this list of consecutive parts for the selected column chunks is in org.apache.parquet.hadoop.ParquetFileReader#internalReadRowGroup:

  private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException {
    ...
    for (ColumnChunkMetaData mc : block.getColumns()) {
        ...
        // first part or not consecutive => new list
        if (currentParts == null || currentParts.endPos() != startingPos) {  // <===== SHOULD honor minSeekForVectorReads()
          currentParts = new ConsecutivePartList(startingPos);
          allParts.add(currentParts);
        }
        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
      }
    }
    // actually read all the chunks
    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
    readAllPartsVectoredOrNormal(allParts, builder);
    rowGroup.setReleaser(builder.releaser);
    for (Chunk chunk : builder.build()) {
      readChunkPages(chunk, block, rowGroup);
    }

    return rowGroup;
  }
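As a sketch of the kind of change suggested at the marked line (standalone and simplified, so every name here is hypothetical rather than the actual ParquetFileReader internals): ranges whose gap stays under a threshold such as minSeekForVectorReads() could be merged before the reads are issued, with the gap bytes read and discarded.

import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch, not parquet-java code: merge sorted, non-overlapping
// byte ranges whose gap is below a threshold, so small holes are read and discarded
// instead of causing a separate seek / HTTP request.
public final class RangeCoalescer {

  public static final class Range {
    final long offset;
    final long length;

    Range(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }

    long end() {
      return offset + length;
    }
  }

  public static List<Range> coalesce(List<Range> sortedRanges, long minSeekBytes) {
    List<Range> merged = new ArrayList<>();
    for (Range r : sortedRanges) {
      if (!merged.isEmpty()) {
        Range last = merged.get(merged.size() - 1);
        long gap = r.offset - last.end();
        if (gap >= 0 && gap <= minSeekBytes) {
          // small hole: extend the previous range to cover the gap and this range;
          // the extra bytes will be read and thrown away by the caller
          merged.set(merged.size() - 1, new Range(last.offset, r.end() - last.offset));
          continue;
        }
      }
      merged.add(r);
    }
    return merged;
  }
}

A caller would still have to track where each real chunk starts inside a merged range so the filler bytes can be dropped, which is essentially the "fictive part" idea described below.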

Maybe a possible implementation could be to add fictive "ConsecutivePartList" entries whose data is read but then ignored; that would avoid leaving holes in the ranges to read.

Component(s)

No response

Arnaud-Nauwynck commented 1 day ago

See also the related issue https://github.com/apache/parquet-java/issues/3077: AzureBlobFileSystem.open() should return an FSDataInputStream subclass that overrides readVectored() much more efficiently for small reads.

steveloughran commented 1 day ago

Anyway, I agree with you that read-and-discard is better for cloud stores; what I don't know is what a good value is here.

What do you think, at least in your deployments?

The Velox paper actually sets the value at 500K:

IO reads for nearby columns are typically coalesced (merged) if the gap between them is small enough (currently about 20K for SSD and 500K for disaggregated storage)

Maybe in Hadoop we should:

  1. change the default size to 20K
  2. have s3a and abfs set it to 500K (see the sketch after this list)
  3. review the gcs settings too, though that's in Google's code.
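To make point 2 concrete (purely a hypothetical illustration, not abfs or s3a code): a store-specific stream only has to override the PositionedReadable default to advertise a larger value; a minimal in-memory implementation is enough to show the shape.

import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.fs.PositionedReadable;

// Hypothetical illustration, not abfs/s3a code: a store-specific stream can advertise
// a larger "smallest reasonable seek" just by overriding the PositionedReadable
// default; an in-memory byte[] stands in for the remote object here.
public class BigSeekPositionedReadable implements PositionedReadable {
  private final byte[] data;

  public BigSeekPositionedReadable(byte[] data) {
    this.data = data;
  }

  @Override
  public int minSeekForVectorReads() {
    return 500 * 1024; // e.g. the 500K the Velox paper uses for disaggregated storage
  }

  @Override
  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
    if (position >= data.length) {
      return -1;
    }
    int n = (int) Math.min(length, data.length - position);
    System.arraycopy(data, (int) position, buffer, offset, n);
    return n;
  }

  @Override
  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
    if (position < 0 || position + length > data.length) {
      throw new EOFException("not enough data for readFully");
    }
    System.arraycopy(data, (int) position, buffer, offset, length);
  }

  @Override
  public void readFully(long position, byte[] buffer) throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }
}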