sundy-li opened this issue 1 year ago
This would also help us in IOx (see https://github.com/influxdata/influxdb_iox/issues/7470, per @crepererum).
It would also be awesome to get a similar treatment for Data pages (the so-called "page index" values).
Maybe we could add something to the `parquet` crate with the `arrow` feature (https://docs.rs/parquet/41.0.0/parquet/arrow/index.html) that could read the parquet statistics as an `ArrayRef`.
@tustvold notes that the translation from the parquet data model to the arrow data model is quite tricky -- and he may have time to do this in a week or two.
Note we have started refactoring the code in DataFusion into a form that could reasonably be ported upstream (see https://github.com/apache/arrow-datafusion/pull/8294).
@sundy-li I wonder if there is code that handles this feature? I didn't see any obvious PRs
https://github.com/apache/arrow-rs/pulls?q=is%3Apr+statistics+is%3Aclosed
(I still harbor goals of getting this feature into arrow-rs)
Just wondering if there's anything left to do to address this issue please? If so, I'm happy to pick this up if that's ok.
That would be amazing -- thank you very much @opensourcegeek
What I think would be ideal is an API in `parquet::arrow` that looks like this:
```rust
use arrow_array::ArrayRef;
use arrow_schema::DataType;
use parquet::errors::Result;
use parquet::file::statistics::Statistics;

/// Statistics extracted from `Statistics` as Arrow `ArrayRef`s
///
/// # Note:
/// If the corresponding `Statistics` is not present, or has no information for
/// a column, a NULL is present in the corresponding array entry
pub struct ArrowStatistics {
    /// min values
    min: ArrayRef,
    /// max values
    max: ArrayRef,
    /// Row counts (UInt64Array)
    row_count: ArrayRef,
    /// Null Counts (UInt64Array)
    null_count: ArrayRef,
}

// (TODO accessors for min/max/row_count/null_count)

/// Extract `ArrowStatistics` from the parquet [`Statistics`]
pub fn parquet_stats_to_arrow<'a>(
    arrow_datatype: &DataType,
    statistics: impl IntoIterator<Item = Option<&'a Statistics>>,
) -> Result<ArrowStatistics> {
    todo!()
}
```
(This is similar to the existing API parquet::arrow::parquet_to_arrow_schema)
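As a rough, std-only sketch of the shape this API implies (not the parquet crate's API): the stand-in `Stats` enum below loosely mirrors two variants of parquet's `Statistics`, and plain `Vec<Option<_>>` columns stand in for Arrow arrays. One entry is produced per row group, and a missing `Statistics` (or a missing min/max) becomes a NULL (`None`). `Stats`, `Int64Stats` and `stats_to_columns` are hypothetical names.

```rust
/// Hypothetical stand-in for parquet's `Statistics` enum: only two physical
/// types are modeled, and the real enum carries more fields.
#[derive(Debug, Clone)]
pub enum Stats {
    Int32 { min: Option<i32>, max: Option<i32>, null_count: Option<u64> },
    Int64 { min: Option<i64>, max: Option<i64>, null_count: Option<u64> },
}

/// Columnar result with one entry per row group; `None` plays the role of an
/// Arrow NULL and `Vec<Option<i64>>` stands in for an `Int64Array`.
#[derive(Debug, PartialEq)]
pub struct Int64Stats {
    pub min: Vec<Option<i64>>,
    pub max: Vec<Option<i64>>,
    pub null_count: Vec<Option<u64>>,
}

/// Convert per-row-group statistics for ONE column into columnar form,
/// widening Int32 values to Int64. Missing statistics produce NULL entries.
pub fn stats_to_columns<'a>(
    statistics: impl IntoIterator<Item = Option<&'a Stats>>,
) -> Int64Stats {
    let mut out = Int64Stats { min: vec![], max: vec![], null_count: vec![] };
    for s in statistics {
        let (min, max, nulls) = match s {
            Some(Stats::Int32 { min, max, null_count }) => (
                (*min).map(i64::from),
                (*max).map(i64::from),
                *null_count,
            ),
            Some(Stats::Int64 { min, max, null_count }) => (*min, *max, *null_count),
            // No statistics for this row group: every output entry is NULL.
            None => (None, None, None),
        };
        out.min.push(min);
        out.max.push(max);
        out.null_count.push(nulls);
    }
    out
}
```

A real implementation would dispatch on `arrow_datatype` and build a typed array such as `Int64Array` from the collected values; the concerns raised later in this thread (sort order, truncation, logical types) are deliberately not handled here.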
Note that this refers to parquet's `Statistics` enum (`parquet::file::statistics::Statistics`).
There is a version of this code in DataFusion that could perhaps be adapted: https://github.com/apache/datafusion/blob/accce9732e26723cab2ffc521edbf5a3fe7460b3/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L179-L186
I suggest you add a new top-level test binary called `statistics.rs` in https://github.com/apache/arrow-rs/tree/master/parquet/tests
The tests should look like:
```rust
let record_batch = make_batch_with_relevant_datatype();
// write batch/batches to file
// open file / extract stats from metadata
// compare stats
```
I can help writing these tests
I personally suggest:
cc @tustvold in case you have other ideas
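The test outline above can be modeled without any file I/O, purely to pin down the expected values such a test would assert. `RowGroupStats` and `stats_for_row_groups` are hypothetical; a real test in `parquet/tests/statistics.rs` would instead write the batch with the parquet `ArrowWriter` (with a small maximum row group size) and read the statistics back from the file metadata.

```rust
/// Hypothetical per-row-group statistics for a nullable Int64 column.
#[derive(Debug, PartialEq)]
pub struct RowGroupStats {
    pub min: Option<i64>,
    pub max: Option<i64>,
    pub null_count: u64,
    pub row_count: u64,
}

/// Split `values` into row groups of at most `rows_per_group` rows and compute
/// the statistics we would expect a writer to record for each row group.
pub fn stats_for_row_groups(values: &[Option<i64>], rows_per_group: usize) -> Vec<RowGroupStats> {
    assert!(rows_per_group > 0, "row groups must hold at least one row");
    values
        .chunks(rows_per_group)
        .map(|chunk| {
            // Nulls are excluded from min/max but counted separately.
            let non_null = chunk.iter().flatten();
            RowGroupStats {
                min: non_null.clone().min().copied(),
                max: non_null.max().copied(),
                null_count: chunk.iter().filter(|v| v.is_none()).count() as u64,
                row_count: chunk.len() as u64,
            }
        })
        .collect()
}
```

Each test case then reduces to: build a batch, compute (or hand-write) the expected per-row-group values like these, and compare them against what the extraction function returns.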
Thanks @alamb - I'll take a stab tomorrow and see how I get on. I'm new to Parquet so please bear with me.
I had a chance to go through the code at a high level; thanks @alamb for the pointers, they helped me get started. What code will call this new function? Just trying to understand the whole flow, if that's ok. Thanks
The major use case I have initially is to implement the `PruningStatistics` API in DataFusion, which supports pruning (skipping) Row Groups based on a range analysis of min/max values, documented here
So for example, given a filter in a query such as `a = 5`, DataFusion would use the min and max values of `a` in each row group to determine if there were any rows in that row group that could match.
Does that make sense?
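The range analysis for `a = 5` can be sketched in a few lines, assuming min/max columns shaped like the proposed `ArrowStatistics` (here plain `Vec<Option<i64>>`, with `None` for unknown). `can_match_eq` and `prune_eq` are hypothetical names, not DataFusion's pruning API. A row group is kept whenever it *might* contain a match, so an unknown bound must never cause it to be skipped.

```rust
/// Returns true if a row group with these bounds MIGHT contain `col = value`.
/// An unknown bound (None) cannot rule the row group out on that side.
pub fn can_match_eq(min: Option<i64>, max: Option<i64>, value: i64) -> bool {
    match (min, max) {
        (Some(lo), Some(hi)) => lo <= value && value <= hi,
        (Some(lo), None) => lo <= value,
        (None, Some(hi)) => value <= hi,
        (None, None) => true,
    }
}

/// Evaluate the predicate for all row groups at once, returning one
/// "keep" flag per row group (false means the row group can be skipped).
pub fn prune_eq(mins: &[Option<i64>], maxes: &[Option<i64>], value: i64) -> Vec<bool> {
    mins.iter()
        .zip(maxes)
        .map(|(&lo, &hi)| can_match_eq(lo, hi, value))
        .collect()
}
```

For example, with `mins = [0, 6, NULL, 5]` and `maxes = [4, 20, 9, 5]`, only the last two row groups would need to be read for `a = 5`.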
That makes sense - thanks @alamb
```rust
pub fn parquet_stats_to_arrow(
    arrow_datatype: &DataType,
    statistics: impl IntoIterator<Item = Option<&Statistics>>
) -> Result<ArrowStatistics> {
    todo!()
}
```
To implement the above function, I'm just trying to suss out the details. Below are my questions (probably very basic, apologies), using your `a = 5` example:

- `arrow_datatype`: will this be `a`'s Arrow data type, e.g. `Int64` or the like?
- `impl IntoIterator<Item = Option<&Statistics>>`: will this be the Parquet Statistics of all columns in the 'current' row group, so that I'd have to fish out `a`'s? I'm not sure I've interpreted this correctly: to fish out the statistics for `a`, I'd need to know I'm looking for `a`. So I'm wondering if it is already the Parquet Statistics for `a` only; if that's the case, why is it `impl IntoIterator` and not just `Option<&Statistics>`?
- `Result<ArrowStatistics>`: once I get a handle on `a`'s Parquet statistics, I think I'd need to convert each `ValueStatistics` to an `ArrayRef` based on `a`'s type? I couldn't find `row_count()` in `ValueStatistics`, though.
Sorry, just trying to get an understanding of all the moving parts.
> `arrow_datatype`, this will be `a`'s arrow data type => Int64 or the likes?

I was thinking of Arrow's `DataType`: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html

> `impl IntoIterator<Item = Option<&Statistics>>`, will this be Parquet Statistics of all columns in 'current' row group

I think it would be the `Statistics` for a single column, where each `Statistics` represents that column's values in a single row group.

> if that's the case why it's `impl IntoIterator` and not just `Option<&Statistics>`?
The idea is to be able to create (efficiently) statistics for multiple row groups at a time -- since each arrow Array has significant overhead, they only make sense when they store multiple values
> Sorry, just trying to get an understanding of all the moving parts.
Yeah, I agree this is a complex issue....
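To make the "one column, many row groups" shape concrete, here is a std-only sketch of how the iterator passed to such a function could be gathered from file metadata. The metadata types are heavily simplified hypothetical stand-ins; in the parquet crate the equivalent loop would presumably go through `ParquetMetaData::row_groups()`, `RowGroupMetaData::column(idx)` and `ColumnChunkMetaData::statistics()`.

```rust
/// Hypothetical, heavily simplified stand-ins for parquet's metadata types.
pub struct Stats { pub min: Option<i64>, pub max: Option<i64> }
pub struct ColumnChunkMeta { pub statistics: Option<Stats> }
pub struct RowGroupMeta { pub columns: Vec<ColumnChunkMeta> }

/// Collect ONE column's statistics across ALL row groups: one item per row
/// group, which is what a vectorized conversion function would consume.
/// Panics if `column_idx` is out of range for a row group.
pub fn column_statistics<'a>(
    row_groups: &'a [RowGroupMeta],
    column_idx: usize,
) -> impl Iterator<Item = Option<&'a Stats>> + 'a {
    row_groups
        .iter()
        .map(move |rg| rg.columns[column_idx].statistics.as_ref())
}

/// Example consumer: build a single "min" column with one entry per row
/// group, using None where statistics or the min value are missing.
pub fn min_column<'a>(stats: impl IntoIterator<Item = Option<&'a Stats>>) -> Vec<Option<i64>> {
    stats.into_iter().map(|s| s.and_then(|s| s.min)).collect()
}
```

Building one array for many row groups amortizes the per-array overhead, which is the reason for taking an iterator rather than a single `Option<&Statistics>`.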
Thanks @alamb - I'll take a closer look tonight
I think I get the context now, after going through the `RowGroupPruningStatistics` code in DataFusion (relevant code below for future reference). Thanks for that pointer @alamb.
```rust
fn column(&self, name: &str) -> Option<(&ColumnChunkMetaData, &FieldRef)> {
    let (idx, field) = parquet_column(self.parquet_schema, self.arrow_schema, name)?;
    Some((self.row_group_metadata.column(idx), field))
}

fn min_values(&self, column: &Column) -> Option<ArrayRef> {
    let (column, field) = self.column(&column.name)?;
    // here we already have mapped the statistics relevant just for the column
    min_statistics(field.data_type(), std::iter::once(column.statistics())).ok()
}
```
I had thought that the `ParquetStatistics` iterator passed into the function held the statistics for all columns in a row group, when in fact it holds a single column's statistics (one entry per row group). That's where I got a bit confused. I'll continue with it; I'm documenting this because it was a light-bulb moment for me and may help someone looking into this in the future.
Sorry I am a bit late to the party here. Correctly interpreting the statistics requires more than just `Statistics`, as there is additional information that specifies things like sort order, truncation, logical types, etc. It is very likely the existing logic in DF is incorrect, which is fine, but we shouldn't commit to an API here that prevents us from doing this correctly.
Additionally, the API also needs to handle the Page Index, which exposes slightly different information from what is encoded in the file metadata.
I don't mean to discourage you, but this is one of the most arcane and subtle areas of parquet and I wonder if it might be worth starting out with something a little simpler as a first contribution? I'd recommend any of the issues marked "good first issue". As it stands this ticket needs extensive research and design work from someone with a good deal of knowledge about parquet, before even getting started on what will likely be pretty complex code. There are still ongoing discussions on parquet-format about correctly interpreting statistics, the standard under-specified a number of key things :sweat_smile:.
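One concrete case of the extra information mentioned above: an unsigned 32-bit column (parquet's `UINT_32` logical type) is stored with the `INT32` physical type, so its min/max must be interpreted with unsigned order. The std-only sketch below (hypothetical helper names) shows how reading the same raw bits with signed order picks the wrong extremes.

```rust
/// Min/max of the raw INT32 values, compared as signed integers.
pub fn signed_min_max(raw: &[i32]) -> Option<(i32, i32)> {
    let min = *raw.iter().min()?;
    let max = *raw.iter().max()?;
    Some((min, max))
}

/// Min/max of the same bits reinterpreted as u32, as UINT_32 requires.
pub fn unsigned_min_max(raw: &[i32]) -> Option<(u32, u32)> {
    let min = raw.iter().map(|&v| v as u32).min()?;
    let max = raw.iter().map(|&v| v as u32).max()?;
    Some((min, max))
}
```

For the logical values `[1, 3_000_000_000]`, the stored INT32 bits are `[1, -1_294_967_296]`. Signed order reports a max of `1`, so a reader trusting it could wrongly skip the row group for a filter like `a > 2`; unsigned order gives the correct `(1, 3_000_000_000)`.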
In any event I think the discussion on this ticket so far has been helpful.
I was purposely trying to avoid a general purpose "correctly interpret all the possible statistics in parquet" type ticket for precisely the reasons @tustvold mentions about the scope being unclear
If we wait for a complete, well-understood API, I fear it is such a big barrier that we will never start.
I personally believe the functionality spelled out on this ticket (for which we have an existing example of use) would be beneficial to create.
However, that being said, maybe it would be better to start by implementing it in DataFusion, where the use case is clear and we don't have to commit to a long-term API, and once it looks good we can decide if we should port it upstream.
I will endeavour to pick this ticket up as a matter of priority once I have finished the current non arrow related work that is eating all my available time
Understood @tustvold - I was looking for something in the parquet sort of area to work on after doing a "good first issue" and landed on this one.
@tustvold / @alamb - I'm happy to leave this alone or if you think it's worth doing what's been proposed in this ticket in DataFusion, I can do that too. I'm easy, thanks both for your feedback.
Given @tustvold 's concerns I think we should not do it in this repo
If you wanted to work in the DataFusion repo to start vectorizing this code (and writing tests for it), I think it is valuable, but it may be tricky and require non-trivial research.
Filed https://github.com/apache/datafusion/issues/10453 for the work in DataFusion and I think I may have recruited @NGA-TRAN to help with that
Here is a proposed API for DataFusion: https://github.com/apache/datafusion/issues/10453#issuecomment-2117733259
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Describe the solution you'd like
Add a util function to convert from `ParquetStatistics` to `ArrayRef`.
Describe alternatives you've considered
arrow-datafusion has a util trait `PruningStatistics`, through which `RowGroupPruningStatistics` converts row group statistics into the `ArrayRef`s used to prune the blocks.
~https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs#L229~
https://github.com/apache/arrow-datafusion/blob/b8f90fe9366a7406afbf5bb3f3afe5854adcf26a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs#L103-L228

But util functions like `get_min_max_values` convert the `statistics` into DataFusion's `ScalarValue` and then back into an `ArrayRef`, which seems redundant because it could be done without DataFusion. So I suggest that arrow-rs support this, as arrow2 did.
Additional context