apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0

ColumnIndex should provide number of records skipped #2536

Open asfimport opened 4 years ago

asfimport commented 4 years ago

When integrating Parquet ColumnIndex, I found we need to know from Parquet how many records were skipped due to ColumnIndex filtering. When rowCount is 0, readNextFilteredRowGroup() just advances to the next row group without telling the caller. See the code here: https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L969


Iceberg reads Parquet records with an iterator. Its hasNext() contains the following check:

valuesRead + skippedValues < totalValues

See (https://github.com/apache/iceberg/pull/1566/commits/cd70cac279d3f14ba61f0143f9988d4cc9413651#diff-d80c15b3e5376265436aeab8b79d5a92fb629c6b81f58ad10a11b9b9d3bfcffcR115). 

So without knowing the number of skipped values, it is hard to determine the result of hasNext().
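
For illustration, a minimal sketch of the iterator bookkeeping behind that check; the class and field names here are hypothetical, not Iceberg's actual code:

```java
// Illustrative sketch only; names follow the expressions quoted above.
class FilteredRecordIterator {
  private final long totalValues;   // total rows in the file
  private long valuesRead = 0;      // rows actually returned to the caller
  private long skippedValues = 0;   // rows dropped by ColumnIndex filtering

  FilteredRecordIterator(long totalValues) {
    this.totalValues = totalValues;
  }

  boolean hasNext() {
    // If skippedValues misses the rows of silently skipped row groups,
    // this check never becomes false and the iterator reads past the end.
    return valuesRead + skippedValues < totalValues;
  }
}
```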


Currently, we can work around this by using a flag. When readNextFilteredRowGroup() returns null, we consider the whole file done, and hasNext() just returns false.
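
A minimal sketch of that flag-based workaround, assuming an already-open ParquetFileReader; the class and field names are illustrative:

```java
import java.io.IOException;

import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;

// Once readNextFilteredRowGroup() returns null, treat the whole file as done.
class FlagBasedPageSource {
  private final ParquetFileReader reader;
  private boolean finished = false;

  FlagBasedPageSource(ParquetFileReader reader) {
    this.reader = reader;
  }

  // Returns the next filtered row group, or null once the file is exhausted;
  // hasNext() can then simply return !finished.
  PageReadStore nextRowGroup() throws IOException {
    if (finished) {
      return null;
    }
    PageReadStore pages = reader.readNextFilteredRowGroup();
    if (pages == null) {
      finished = true; // no more row groups survive the filtering
    }
    return pages;
  }
}
```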


Reporter: Xinli Shang / @shangxinli Assignee: Xinli Shang / @shangxinli

Note: This issue was originally created as PARQUET-1927. Please see the migration documentation for further details.

asfimport commented 4 years ago

Gabor Szadovszky / @gszadovszky: @shangxinli, I am not sure I get the problem. If rowCount is 0 after column-index filtering, we just skip the whole row group, similarly to the row-group-level filters (dictionary/statistics or bloom). You don't know the number of rows skipped in the case of row-group-level filters either.

asfimport commented 4 years ago

Xinli Shang / @shangxinli: @gszadovszky, the Iceberg Parquet reader iterator relies on the check 'valuesRead < totalValues'. When integrating ColumnIndex, we replace readNextRowGroup() with readNextFilteredRowGroup(). Because readNextFilteredRowGroup() will skip some records, we change the check to 'valuesRead + skippedValues < totalValues'. The skippedValues is calculated as 'blockRowCount - count_returned_from_readNextFilteredRowGroup'. This works great. But when a whole row group is skipped, readNextFilteredRowGroup() advances to the next row group internally without Iceberg's knowledge. Hence Iceberg doesn't know how to calculate the skippedValues.

So if readNextFilteredRowGroup() could return how many records it skipped, or tell the index of the row group from which it returned the pages, Iceberg could calculate the skippedValues.
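
For illustration, a sketch of the skippedValues calculation described above and of where it breaks; names are hypothetical, and blockRowCount is assumed to come from the current row group's BlockMetaData.getRowCount():

```java
import java.io.IOException;

import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;

// Illustrative sketch of the bookkeeping described in this comment.
class SkippedValuesTracker {
  private long skippedValues = 0;

  PageReadStore readRowGroup(ParquetFileReader reader, long blockRowCount) throws IOException {
    PageReadStore pages = reader.readNextFilteredRowGroup();
    if (pages != null) {
      // Rows dropped from this row group by page-level (ColumnIndex) filtering.
      // This breaks when the reader silently skipped entire row groups first:
      // blockRowCount then belongs to a different row group than `pages`.
      skippedValues += blockRowCount - pages.getRowCount();
    }
    return pages;
  }
}
```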

asfimport commented 4 years ago

Gabor Szadovszky / @gszadovszky: I get it now. Thanks for explaining.

I guess you already know about ParquetFileReader.getRecordCount() and ParquetFileReader.getFilteredRecordCount(). These values are for the whole file and not for the actual row group, so they might not be good for Iceberg, but they are the ones parquet-mr uses at the higher levels.
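
For reference, a minimal sketch of reading these two file-level counters from an already-open reader:

```java
import org.apache.parquet.hadoop.ParquetFileReader;

// Prints the file-level counters mentioned above.
class FileCounts {
  static void print(ParquetFileReader reader) {
    long total = reader.getRecordCount();                 // rows in the whole file
    long totalFiltered = reader.getFilteredRecordCount(); // rows left after column-index filtering
    System.out.println(total + " rows total, " + totalFiltered + " after filtering");
  }
}
```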

asfimport commented 4 years ago

Xinli Shang / @shangxinli: That is correct @gszadovszky! We need a finer-grained filtered count at the row group level for the iterator to use. Do you think it makes sense to add an API for that?

If yes, do you think we can release a 1.11.2 version? I see there is usually no further release after 1.xx.1.


asfimport commented 4 years ago

Xinli Shang / @shangxinli: Adding @rdblue, [~shardulm] FYI.

asfimport commented 4 years ago

Gabor Szadovszky / @gszadovszky: I think it is fine to extend the current API if it is required by one of our clients. Also, Iceberg might be the first client that already uses 1.11. This extension might be useful for others as well (e.g. Hive, Spark).

Releasing 1.11.2 depends on the issues we would like to fix in it. If they are regressions introduced in 1.11 and they are severe, we clearly would like to release the fix in a maintenance release. So the question is whether this issue is severe enough and lacks a proper workaround.

asfimport commented 4 years ago

Xinli Shang / @shangxinli: The workaround I can think of is to apply the ColumnIndex to row groups, something like (columnIndex, rowGroup) => recordCount, before calling readNextFilteredRowGroup() in Iceberg. If recordCount is 0, we skip calling readNextFilteredRowGroup() for that row group. This ensures that readNextFilteredRowGroup() never advances to the next row group without Iceberg's knowledge. But this workaround has several issues. 1) It is not a trivial implementation, because we would need to implement all types of filters against the columnIndex, which pretty much duplicates the implementation in Parquet. 2) The two implementations (in Parquet and in Iceberg) have to be consistent; if one has issues, it will leave Iceberg in an unknown state. 3) It requires other adopters like Hive and Spark to reimplement their own too.

This is not a regression because ColumnIndex is a new feature in 1.11.x. But I think releasing 1.11.2 would still be better, because it helps the adoption of 1.11.x, and ColumnIndex is one of the major features in 1.11.x.


asfimport commented 4 years ago

Gabor Szadovszky / @gszadovszky: I rechecked the code and found that PageReadStore.getRowCount() should return the required value, meaning the number of rows after column-index filtering. See the implementation for details. So you do not need to calculate anything or extend the current API; just use this value as the total row count of the row group.
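
A minimal sketch of this suggestion, assuming an already-open ParquetFileReader; the record-reading body is elided:

```java
import java.io.IOException;

import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;

// Use PageReadStore.getRowCount() as the already-filtered total per row group
// instead of computing skipped rows from the unfiltered block size.
class FilteredRowGroupLoop {
  static void readAll(ParquetFileReader reader) throws IOException {
    PageReadStore pages;
    while ((pages = reader.readNextFilteredRowGroup()) != null) {
      long rowsInGroup = pages.getRowCount(); // rows remaining after column-index filtering
      // ... read exactly rowsInGroup records from `pages` ...
    }
  }
}
```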

asfimport commented 4 years ago

Xinli Shang / @shangxinli: @gszadovszky, the problem is that when rowCount is 0 (line 966: https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L966), readNextFilteredRowGroup() will just call advanceToNextBlock() and then recurse to the next row group. In that case, the count returned by PageReadStore.getRowCount() will be the filtered count of the next row group. Iceberg has no way to know which row group these row counts come from; it has to assume they are from the previous group. The result is a wrong count, and the Iceberg iterator will keep returning true from hasNext() even after all the records have been read.


The fix could be simply to add a skipped-record count that also includes the rows of entirely skipped row groups.
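
Purely as a hypothetical sketch of the proposed extension; no such method exists in ParquetFileReader at this point:

```java
// Hypothetical shape of the proposal, for illustration only.
class SkipAwareFileReader /* would extend ParquetFileReader */ {
  private long skippedRecordCount = 0;

  // Rows skipped so far by column-index filtering, including the full row
  // counts of row groups that readNextFilteredRowGroup() skipped entirely.
  long getSkippedRecordCount() {
    return skippedRecordCount;
  }
}
```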


asfimport commented 4 years ago

Gabor Szadovszky / @gszadovszky: [~shangx@uber.com], sorry to keep bothering you with my ideas, but it seems I still don't get the concept.

As far as I understand, Iceberg keeps reading rows until it reaches the total number of rows in the row group or the file (I am not sure which one). Both numbers of (filtered) rows are available: for the row group (PageReadStore.getRowCount()) and for the whole file (ParquetFileReader.getFilteredRecordCount()). I am not sure why you try to align the number of rows already read with the number of filtered rows instead of using the proper number for the total. (Instead of valuesRead + skippedValues < totalValues you may use valuesRead < totalFilteredValues.) Of course, if you use the total number of (filtered) rows in the file, you have to calculate the filtering for all row groups before starting to read any value, but you have to do that anyway, so I don't think it should be a problem.

Meanwhile, if you think the API change is required I am happy to review the related PR.

asfimport commented 4 years ago

Xinli Shang / @shangxinli: ParquetFileReader.getFilteredRecordCount() cannot be used because Iceberg applies the RowGroup stats filter and the Dictionary filter itself.

I think what we can do is to make getRowRanges() public. Iceberg could call getRowRanges() to calculate the filteredRecordCount for each row group that has been determined (by the RowGroup stats and Dictionary filters) to be read.
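
A hypothetical sketch of that usage, assuming getRowRanges(int) were made public (it is private in ParquetFileReader at the time of this discussion):

```java
import org.apache.parquet.hadoop.ParquetFileReader;

// Hypothetical: decide per row group whether readNextFilteredRowGroup()
// would return anything, before calling it.
class RowGroupPrecheck {
  static boolean shouldRead(ParquetFileReader reader, int blockIndex) {
    // Number of records in this row group surviving column-index filtering;
    // assumes getRowRanges(int) has been made public as proposed above.
    long filteredRecordCount = reader.getRowRanges(blockIndex).rowCount();
    return filteredRecordCount > 0;
  }
}
```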

asfimport commented 4 years ago

Gabor Szadovszky / @gszadovszky:

> ParquetFileReader.getFilteredRecordCount() cannot be used because Iceberg applies the RowGroup stats filter and the Dictionary filter also.

I don't see why that is a problem. ParquetFileReader filters the row groups based on stats and dictionary (and bloom filters from 1.12) in the constructor, so getFilteredRecordCount will be executed on the filtered row groups. I am curious why the currently available values are not suitable for Iceberg. The parquet-mr high-level API (the record readers) works based on these values, and if they are not correct for Iceberg, it might highlight some issues inside parquet-mr as well. (I don't think this is the case, though. We have a lot of unit tests at the different API levels.)

> I think what we can do is to make getRowRanges() public.

I would rather not make this public. The RowRanges object is not designed for public use. If it is really necessary, I would provide the required values instead.

asfimport commented 4 years ago

Xinli Shang / @shangxinli: Thanks @gszadovszky for the explanation. I see it now. The confusing part is that Iceberg creates the ParquetFileReader object without passing in the filter. Instead, it reimplements the RowGroup and Dictionary filtering itself.

Hi @rdblue, do you know why Iceberg reimplements the RowGroup and Dictionary filtering? From what Gabor mentioned above, if we pass the filter to the ParquetFileReader constructor, all the row groups that we need to deal with later are already filtered. When we upgrade to 1.12.0, the bloom filter will be applied automatically to those row groups as well.

asfimport commented 4 years ago

Xinli Shang / @shangxinli: @gszadovszky, I just realized the RowGroupFilter only applies the stats from ColumnChunkMetaData, not page-level stats. There is a chance that the ColumnChunkMetaData stats say yes but the page-level stats say no. In that case, readNextFilteredRowGroup() can still skip a whole block.

asfimport commented 4 years ago

Gabor Szadovszky / @gszadovszky: [~shangx@uber.com], yes, it may occur that the row-group-level filters say the searched values might be in the row group, but the page-level filtering skips all the pages. In that case the whole row group is skipped.

asfimport commented 4 years ago

Gabor Szadovszky / @gszadovszky: What is the current status of this one? Is it a blocker for 1.12.0?

asfimport commented 4 years ago

Xinli Shang / @shangxinli: It was still not decided in the last Iceberg meeting. But I think if adding the 'skipped number of records' is a minimal change for us, we can go ahead and add it. Otherwise, we can release without it.

Adding @rdblue FYI.