apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[C++][Parquet] Using BMI to implement filter pushdown #37559

Open baibaichen opened 1 year ago

baibaichen commented 1 year ago

Describe the enhancement requested

From Selection Pushdown in Column Stores using Bit Manipulation Instructions, the authors say:

We empirically evaluate the proposed techniques in the context of Apache Parquet using both micro-benchmarks and the TPC-H benchmark, and show that our techniques improve the query performance of Parquet by up to one order of magnitude with representative scan queries. Further experimentation using Apache Spark demonstrates speed improvements of up to 5.5X even for end-to-end queries involving complex joins.

And

We implemented Parquet-Select in C++, based on the open-source C++ version of the Arrow/Parquet library (version 8.0.0). For all operators that use BMI instructions, we implemented a dynamic dispatching mechanism to detect whether BMI support is available on the host processors at run time and fall back to an alternative implementation if BMI support is missing.

BMI was introduced 10 years ago, so implementing this feature should have big benefits.
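
For illustration, here is a minimal sketch of the kind of run-time dispatch the quote describes. It assumes GCC or Clang on x86-64 for the BMI2 path and falls back to portable code everywhere else. `PextPortable`, `PextBmi2` and `SelectPext` are hypothetical names, not Arrow or Parquet-Select APIs.

```cpp
#include <cassert>
#include <cstdint>

#if defined(__x86_64__) && (defined(__GNUC__) || defined(__clang__))
#include <immintrin.h>
#define SKETCH_HAS_BMI2_DISPATCH 1
#endif

// Portable fallback: gather the bits of `value` selected by `mask` into the
// low bits of the result (the semantics of the BMI2 PEXT instruction).
static uint64_t PextPortable(uint64_t value, uint64_t mask) {
  uint64_t result = 0;
  for (uint64_t out_bit = 1; mask != 0; out_bit <<= 1) {
    uint64_t lowest = mask & (~mask + 1);  // isolate the lowest set mask bit
    if (value & lowest) result |= out_bit;
    mask &= mask - 1;  // clear that bit
  }
  return result;
}

#ifdef SKETCH_HAS_BMI2_DISPATCH
// BMI2 is enabled for this function only, so the rest of the binary still
// runs on CPUs without BMI2.
__attribute__((target("bmi2"))) static uint64_t PextBmi2(uint64_t value,
                                                         uint64_t mask) {
  return _pext_u64(value, mask);
}
#endif

using PextFn = uint64_t (*)(uint64_t, uint64_t);

// Pick the implementation once, at run time.
static PextFn SelectPext() {
#ifdef SKETCH_HAS_BMI2_DISPATCH
  if (__builtin_cpu_supports("bmi2")) return PextBmi2;
#endif
  return PextPortable;
}
```

A caller would resolve `SelectPext()` once and reuse the returned function pointer in the hot loop, so the CPU check is not repeated per value.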

Component(s)

C++, Parquet

mapleFU commented 1 year ago

When it comes to rep/def levels, I believe we already have some BMI2 implementations:

https://github.com/apache/arrow/blob/a526ba697d4e3f009731bebda6838b41899051e3/cpp/src/parquet/level_conversion.cc#L128

However, the other decoding paths are missing this.

Besides, the Arrow Parquet implementation currently doesn't even implement predicate pushdown in C++. Acero has a SelectionVector, but doesn't use it.

Maybe we can evaluate and implement that step by step, like Velox 🤔?

baibaichen commented 1 year ago

Maybe we can evaluate and implement that step by step, like Velox 🤔?

I am currently working on Gluten with the ClickHouse backend. Our team is trying to implement filter pushdown like Velox, but using ClickHouse's expression evaluation framework, which is not good since we have to build the full Column before the filter operation.

The paper gives the right direction, and I think we should implement filter pushdown like this in Apache Arrow.

Would we have a meeting on this?

mapleFU commented 1 year ago

What kind of interface does ClickHouse use around Parquet? I guess there is a lot of work to do. First the expression should be converted to arrow::Expression, and then we need to support some lazy evaluation / filter reordering by selector or row range based on the arrow::Expression.

Technically, we could follow the approach of arrow-rs. As for BMI itself, I guess it's better as an optional optimization rather than a necessary one (like decoding rep/def levels and unpacking in the current Parquet implementation).

baibaichen commented 1 year ago

As for BMI itself, I guess it's better as an optional optimization rather than a necessary one (like decoding rep/def levels and unpacking in the current Parquet implementation).

Agreed!

What kind of interface does ClickHouse use around Parquet?

See ArrowBlockInputFormat; ClickHouse uses RecordBatchFileReader.

I guess there is a lot of work to do. First the expression should be converted to arrow::Expression, and then we need to support some lazy evaluation / filter reordering by selector or row range based on the arrow::Expression. Technically, we could follow the approach of arrow-rs.

We lack the knowledge of how to build an expression evaluation framework based on Arrow. It would be great if the Arrow community could do this.

By the way, we also find that memory copies hurt performance. Is it possible to decode directly into ClickHouse's data structures? The current code path looks like this:

parquet encoded buffer => arrow => clickhouse.

See ArrowColumnToCHColumn.

mapleFU commented 1 year ago

By the way, we also find that memory copies hurt performance. Is it possible to decode directly into ClickHouse's data structures?

Hmm, if ClickHouse supports non-owned Buffers or something similar, it could be simpler 🤔 Otherwise you need to implement the logic under cpp/src/parquet/arrow... I guess that's a really huge amount of work (it will be easy at first, but eventually you will need to write wrappers for every type: timestamp, decimal, ...). Though memcpy performance for binary types might be poor now, I guess it would benefit once StringView and others are introduced.

baibaichen commented 1 year ago

I guess that's a really huge amount of work (it will be easy at first, but eventually you will need to write wrappers for every type: timestamp, decimal, ...). Though memcpy performance for binary types might be poor now,

Yes! That's why filter pushdown is important: it could reduce memory copies significantly.

mapleFU commented 1 year ago

Another thing: have you done field pruning and RowGroup statistics pruning? This could be done easily.

mapleFU commented 1 year ago

Would we have a meeting on this?

Maybe you can email me and @wgtmac. And when we have an RFC, maybe we can present it at the Arrow weekly meeting?

baibaichen commented 1 year ago

Would we have a meeting on this?

Maybe you can email me and @wgtmac. And when we have an RFC, maybe we can present it at the Arrow weekly meeting?

Cool! @mapleFU @wgtmac let me know your timezone, so I can schedule a meeting.

mapleFU commented 1 year ago

We're at UTC-8 now. We can discuss this by email, since the GitHub issue is public and this might not be directly related to the issue?

emkornfield commented 1 year ago

I'd suggest that we focus on filter pushdown first for the low-level parquet bindings and be able to plumb that through Arrow. For the lower level bindings we might want to consider something generic like substrait expressions?

Also, not everyone goes to the Arrow sync, sharing a google doc on the mailing list would be appreciated in conjunction to any conversation in Arrow.

CC @fatemehp

wgtmac commented 1 year ago

+1 for @emkornfield 's suggestion. It would be good to have a draft design in google doc and share to the public.

I have just read the paper and it simply focuses on row-level filtering optimization. However, without predicate pushdown (which is a prerequisite in this case), it still involves a lot of unnecessary I/O and filter evaluation (even using BMI on encoded values) on pages that can be filtered by page index.

But these features (i.e. predicate pushdown and selection pushdown) are orthogonal, therefore I am not objecting to implement selection pushdown using BMI. We need to choose expression and selection vector which can be used in the low-level parquet-cpp library and then integrate into the arrow layer.

mapleFU commented 1 year ago

I propose that:

  1. Use arrow::Expression to represent the expression.
  2. First, we can prune by column index. It could be done like this:
Status StatisticsAsScalars(const Statistics& statistics,
                           std::shared_ptr<::arrow::Scalar>* min,
                           std::shared_ptr<::arrow::Scalar>* max);

Status ColumnIndexAsScalars(...,
                           std::shared_ptr<::arrow::Scalar>* min,
                           std::shared_ptr<::arrow::Scalar>* max);

The result might be a RowRanges. We can combine it with specific Filter Pushdown.

  3. When it comes to the Arrow reader, maybe we can put them within the parquet/arrow directory and try to evaluate them lazily. And we can drive the underlying leaf reader via RowRange.

emkornfield commented 1 year ago

I would really like to try to decouple Arrow types from the core parquet implementation if possible, but this might be too large of a hurdle. There is already a callback at the page level that Arrow can make use of for page-level statistics. Figuring out a similar mechanism for the index structure makes sense to me.

This only gets to statistics pruning, as noted by the original request we likely need a more advanced API to cover lower level filtering.

mapleFU commented 1 year ago

@emkornfield Do you have any proposed API or reference on this?

zhanglistar commented 1 year ago

Yes, field pruning and row group statistics pruning are already implemented in ClickHouse. I think the first thing is to implement filter pushdown in the C++ Parquet library; this is a very basic optimization. Then we can try to implement the selection pushdown described in the paper, maybe using BMI to accelerate it.

zhanglistar commented 1 year ago

There are several C++ Parquet implementations in the C++ community, like StarRocks, Doris, DuckDB, and Velox, which avoid internal data structure conversions, leading to about a 2x performance boost. But Arrow's Parquet is the most mature implementation; ClickHouse has used it for a long time, including in the Gluten project. It's time to add some new things to Arrow's C++ Parquet.

fatemehp commented 1 year ago

+1 to implementing the most basic building blocks first.

The building blocks here seem to be: (1) The ability to express and push down a filter to the parquet reader. (2) The ability to evaluate a filter in the parquet reader and return matching rows.

We need to also have the option of returning the matching row ranges. Consider this query: SELECT A, B, C FROM t WHERE A = 10. We want to first read A and find the matching rows. Then we want to use that to skip non-matching rows for B, C.
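
A minimal sketch of that two-step read, assuming the columns are already available as in-memory vectors. `RowRange`, `MatchEquals` and `ReadSelected` are hypothetical names for illustration only; a real reader would skip rows or pages before decoding rather than copy from a decoded vector.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Half-open range of row indices [begin, end) within a row group.
struct RowRange {
  int64_t begin;
  int64_t end;
};

// Step 1: evaluate `A = target` on the filter column and coalesce
// consecutive matching rows into ranges.
std::vector<RowRange> MatchEquals(const std::vector<int32_t>& column_a,
                                  int32_t target) {
  std::vector<RowRange> ranges;
  for (int64_t row = 0; row < static_cast<int64_t>(column_a.size()); ++row) {
    if (column_a[row] != target) continue;
    if (!ranges.empty() && ranges.back().end == row) {
      ranges.back().end = row + 1;  // extend the current run
    } else {
      ranges.push_back({row, row + 1});
    }
  }
  return ranges;
}

// Step 2: materialize another column (B or C) only for the selected ranges.
template <typename T>
std::vector<T> ReadSelected(const std::vector<T>& column,
                            const std::vector<RowRange>& ranges) {
  std::vector<T> out;
  for (const RowRange& r : ranges) {
    out.insert(out.end(), column.begin() + r.begin, column.begin() + r.end);
  }
  return out;
}
```

For `SELECT A, B, C FROM t WHERE A = 10`, the ranges returned by `MatchEquals` on A would be passed unchanged to the readers of B and C.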

wgtmac commented 1 year ago

I would really like to try to decouple Arrow types from the core parquet implementation if possible, but this might be too large of a hurdle. There is already a callback at the page level that Arrow can make use of for page-level statistics. Figuring out a similar mechanism for the index structure makes sense to me.

This only gets to statistics pruning, as noted by the original request we likely need a more advanced API to cover lower level filtering.

I have thought the same thing. But it requires a brand-new expression interface that deals with parquet physical and logical types. I have implemented a native expression interface to the Apache ORC C++ library, so I know how much effort it takes.

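In our in-house platform, we have ended up with a solution pretty much similar to the implementation in the parquet-mr:

  • Use our internal expression and data type to evaluate predicates on parquet page index and then return the results in the form of a selected row range list for each row group.

  • Modify the parquet row group reader to accept the selected row range list and return records that fall into the ranges. In other words, we can push down a list of row range to the reader to avoid unnecessary I/O, decompression and decoding of filtered pages.

So my proposal is to use a list of selected row ranges to be the adapter between parquet layer and arrow layer to implement the predicate push down:

  • In the parquet-cpp library: Support pushing down a list of row ranges to the parquet file reader or row group reader. Then the reader should only return records that match the row ranges. Reading values via both arrow and non-arrow columnar layout are supported.

  • In the arrow layer: Use the arrow expression library for predicate evaluation on parquet page index and return the result in the form of a selected row ranges in the file/row group.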

The proposal above aligns with the idea presented by the paper. We can use a selection bitmap to represent the list of selected row ranges. Then the idea to apply BMI to push down selection can be integrated seamlessly.
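
To make the equivalence between a row range list and a selection bitmap concrete, here is a small sketch of converting one into the other. The `RowRange` struct and both function names are assumptions for illustration; the design doc may define these differently.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct RowRange {  // half-open [begin, end)
  int64_t begin;
  int64_t end;
};

// One bit per row, least significant bit first within each 64-bit word.
std::vector<uint64_t> RangesToBitmap(const std::vector<RowRange>& ranges,
                                     int64_t num_rows) {
  std::vector<uint64_t> bitmap((num_rows + 63) / 64, 0);
  for (const RowRange& r : ranges) {
    for (int64_t row = r.begin; row < r.end; ++row) {
      bitmap[row / 64] |= uint64_t{1} << (row % 64);
    }
  }
  return bitmap;
}

// Inverse direction: collapse runs of set bits back into ranges.
std::vector<RowRange> BitmapToRanges(const std::vector<uint64_t>& bitmap,
                                     int64_t num_rows) {
  std::vector<RowRange> ranges;
  int64_t run_start = -1;  // -1 means "not inside a run"
  for (int64_t row = 0; row < num_rows; ++row) {
    bool selected = (bitmap[row / 64] >> (row % 64)) & 1;
    if (selected && run_start < 0) run_start = row;
    if (!selected && run_start >= 0) {
      ranges.push_back({run_start, row});
      run_start = -1;
    }
  }
  if (run_start >= 0) ranges.push_back({run_start, num_rows});
  return ranges;
}
```

The bitmap form is what a BMI-based select operator would consume word by word, while the range form is cheaper when the selection consists of a few long runs.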

mapleFU commented 1 year ago

@wgtmac Another problem is that, for RowSelector, we need to implement a "lazy" logic.

i.e. use different filter in AND(Filter1, Filter2, ..) to produce sub-rowrange. The interface might be around parquet::ColumnReader with a selector?
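
One way to picture this "lazy" evaluation of AND(Filter1, Filter2, ..) is the sketch below, where each conjunct only inspects rows that survived the previous ones. `ColumnFilter` and `EvaluateConjunction` are hypothetical names, and the columns are plain vectors rather than a parquet::ColumnReader.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// One conjunct of AND(Filter1, Filter2, ...): a column plus a predicate on it.
struct ColumnFilter {
  const std::vector<int32_t>* column;
  std::function<bool(int32_t)> predicate;
};

// Evaluate the conjuncts in order; each one only looks at rows that survived
// the previous ones. `evaluations`, if given, counts predicate calls.
std::vector<int64_t> EvaluateConjunction(
    const std::vector<ColumnFilter>& filters, int64_t num_rows,
    int64_t* evaluations = nullptr) {
  std::vector<int64_t> selected(num_rows);
  for (int64_t i = 0; i < num_rows; ++i) selected[i] = i;
  for (const ColumnFilter& f : filters) {
    std::vector<int64_t> survivors;
    for (int64_t row : selected) {
      if (evaluations) ++*evaluations;
      if (f.predicate((*f.column)[row])) survivors.push_back(row);
    }
    selected.swap(survivors);
    if (selected.empty()) break;  // later columns need not be touched at all
  }
  return selected;
}
```

With a selective first filter, the second column is only examined for a fraction of the rows, which is the saving that late materialization is after.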

baibaichen commented 1 year ago

@wgtmac Another problem is that, for RowSelector, we need to implement a "lazy" logic.

i.e. use different filter in AND(Filter1, Filter2, ..) to produce sub-rowrange. The interface might be around parquet::ColumnReader with a selector?

What does lazy mean? Does it mean late materialization? If so, that would be a required feature.

From our experience of implementing filter pushdown in ClickHouse (not in Arrow), we have found that decoding unnecessary filter columns hurts performance for TPC-H Q6.

mapleFU commented 1 year ago

Yeah. I think during read. The original logic is:

When it turns to filter pushdown, we might need some Late materialization techniques. This might change the procedure to:

[1] https://issues.apache.org/jira/browse/SPARK-36527
[2] https://docs.cloudera.com/cdw-runtime/cloud/impala-reference/topics/impala-lazy-materialization.html

The links above use this technique. Note that I guess it does not always improve CPU performance, e.g.:

For filter output like 0 1 0 1..., it's not easy to make use of the filter to save CPU time.

(And I know ClickHouse users would like to remove unselected rows when building the Arrow RecordBatch on top of the decoded columns; I guess this is another optimization.)

baibaichen commented 1 year ago

For filter output like 0 1 0 1..., it's not easy to make use of the filter to save CPU time.

I think this is where BMI comes in. Without BMI, I think such a case may hurt performance.

mapleFU commented 1 year ago

This is not only about the BMI part. The underlying data decoding might originally work like decode(length-K). Changing it to decode(1) ... decode(1) ... might even make things worse...

baibaichen commented 1 year ago

From the paper, IIUC, they copy the selected encoded values out, and then decode them as if all records were being decoded.

The following is quoted from Section 4.1:

The framework is built upon a simple yet crucial observation: when performing a filter or project operation, records failing to meet prior predicates can be bypassed directly. While this observation is undeniably obvious, previous approaches have not leveraged it effectively. Indeed, in the case of filter operations, previous work tends to perform predicate evaluation on all values [29, 34], intentionally ignoring the fact that some values might have been filtered by prior filters. This is primarily because the additional cost associated with the select operator often outweighs the potential savings in predicate evaluation. However, given the fast select operator that operates on encoded values (Section 3), it has become more favorable to select values upfront, even for filter operations.

mapleFU commented 1 year ago

Oops, sorry for misunderstanding the logic here. After taking a glance at the paper, it seems to combine the selector and the decode logic, which makes the following operation fast:

decoder.DecodeSpace(num_rows, selector,  ...);

This also means putting selector or DecodeSpace support inside the Arrow decoder internals, rather than calling a Skip above it. I think this might take some time to stabilize and test, since it would be really bug-prone. But I appreciate the approach.
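
As a rough sketch of selecting on still-encoded values, the code below applies a selection bitmap to one 64-bit word of bit-packed values: PDEP widens each select bit to cover a whole field, then PEXT compacts the selected fields. This is my reading of the general idea, not the paper's code or an Arrow decoder API. `Pext` and `Pdep` are portable stand-ins for the BMI2 instructions so the example runs anywhere, and `SelectPacked` is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>

// Software stand-in for PEXT: gather the bits of `value` chosen by `mask`.
inline uint64_t Pext(uint64_t value, uint64_t mask) {
  uint64_t result = 0;
  for (uint64_t out_bit = 1; mask != 0; out_bit <<= 1, mask &= mask - 1) {
    if (value & mask & (~mask + 1)) result |= out_bit;
  }
  return result;
}

// Software stand-in for PDEP: scatter the low bits of `value` to the
// positions of the set bits in `mask`.
inline uint64_t Pdep(uint64_t value, uint64_t mask) {
  uint64_t result = 0;
  for (uint64_t in_bit = 1; mask != 0; in_bit <<= 1, mask &= mask - 1) {
    if (value & in_bit) result |= mask & (~mask + 1);
  }
  return result;
}

// `packed` holds 64 / bit_width values of `bit_width` bits each, value 0 in
// the lowest bits. Bit i of `select` says whether value i is kept. Returns
// the kept values, still bit-packed and contiguous from bit 0.
inline uint64_t SelectPacked(uint64_t packed, uint64_t select, int bit_width) {
  // Mask with the lowest bit of every field set.
  uint64_t field_low = 0;
  for (int shift = 0; shift + bit_width <= 64; shift += bit_width) {
    field_low |= uint64_t{1} << shift;
  }
  // Same, but for the highest bit of every field.
  uint64_t field_high = field_low << (bit_width - 1);
  // Place each select bit at both ends of its field, then fill in between:
  // (high - low) sets bits [low, high), and OR-ing `high` closes the field.
  uint64_t low = Pdep(select, field_low);
  uint64_t high = Pdep(select, field_high);
  uint64_t extended = (high - low) | high;
  return Pext(packed, extended);
}
```

The output is again bit-packed, so an existing unpack routine could decode it as if only the selected rows had been stored, which matches the "decode them as if all records are decoded" reading earlier in the thread.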

emkornfield commented 1 year ago

I have thought the same thing. But it requires a brand-new expression interface that deals with parquet physical and logical types. I have implemented a native expression interface to the Apache ORC C++ library, so I know how much effort it takes.

In our in-house platform, we have ended up with a solution pretty much similar to the implementation in the parquet-mr:

  • Use our internal expression and data type to evaluate predicates on parquet page index and then return the results in the form of a selected row range list for each row group.

  • Modify the parquet row group reader to accept the selected row range list and return records that fall into the ranges. In other words, we can push down a list of row range to the reader to avoid unnecessary I/O, decompression and decoding of filtered pages.

So my proposal is to use a list of selected row ranges to be the adapter between parquet layer and arrow layer to implement the predicate push down:

  • In the parquet-cpp library: Support pushing down a list of row ranges to the parquet file reader or row group reader. Then the reader should only return records that match the row ranges. Reading values via both arrow and non-arrow columnar layout are supported.

Agreed, I think this should be achievable with the callback that was added, or at least we can maybe extend it?

  • In the arrow layer: Use the arrow expression library for predicate evaluation on parquet page index and return the result in the form of a selected row ranges in the file/row group.

Agreed this is a first good step and I think yields the decoupling I was looking for. As long as we are reading arrow data, it makes sense to use the arrow expression library, but my aim was to not intermingle arrow types more directly with the core parquet reader.

The proposal above aligns with the idea presented by the paper. We can use a selection bitmap to represent the list of selected row ranges. Then the idea to apply BMI to push down selection can be integrated seamlessly.

I think this proposal provides a good first pass. IIUC, we probably want to take this further at some point to have a callback that can be presented with encoded data (or at least an intermediate form of it) where it makes sense to apply the filter (some forms might be more useful than others) to get back the row set.

@mapleFU I haven't given thought to specifics beyond this. I think the net new item is an interface for getting presented with encoded parquet data. As noted above at a pseudo-code level, we'd probably need something that can be integrated into the decoders directly that will continue to emit values, but also record filtered ranges.

mapleFU commented 1 year ago

🤔 So the user callback should directly handle func(c-type of physical data, length, selector*)? I agree this is a good way, but I have no idea how it would work for a LogicalType like Timestamp or Decimal. And for the reader above, if it needs to read a dataset with schema evolution, different kinds of PhysicalType would have to be handled (but I think it's OK to do this).

Parquet itself only decodes data into a compact raw-data array; maybe we need an interface for this.

mapleFU commented 1 year ago

Also cc @pitrou for some advice

pitrou commented 1 year ago

From an architectural POV, I think @westonpace and @lidavidm would be the most apt at giving advice.

I just have a couple questions:

  1. are we talking about BMI or BMI2 here? Those are two different things, BMI is far more widespread than BMI2.
  2. do we have to call intrinsics explicitly? are the corresponding operations exposed by https://github.com/xtensor-stack/xsimd and, if not, can we perhaps contribute them?
  3. perhaps some compiler optimizers can even produce those instructions automatically given the right code patterns?

Also cc @cyb70289 for more expert SIMD advice.

fatemehp commented 1 year ago

Are we considering row-level filter evaluation or just pruning based on page stats?

emkornfield commented 1 year ago

To answer some questions:

are we talking about BMI or BMI2 here? Those are two different things, BMI is far more widespread than BMI2.

BMI2, pext and pdep I believe are what is used in the paper (I'd have to double check if there are other instructions used).

do we have to call intrinsics explicitly? are the corresponding operations exposed by https://github.com/xtensor-stack/xsimd and, if not, can we perhaps contribute them?

Per above, I think we already have the necessary instructions modelled with dynamic dispatch in parquet (we might want to revisit settings for AMD CPUs, as I think later versions now support the instructions efficiently).

perhaps some compiler optimizers can even produce those instructions automatically given the right code patterns?

Maybe, this would be an area to look into.

Are we considering row-level filter evaluation or just pruning based on page stats?

It sounds like both.

fatemehp commented 1 year ago

Could we support a simple approach to filter push down first, leaving the door open for more complicated optimizations later?

wgtmac commented 1 year ago

Could we support a simple approach to filter push down first, leaving the door open for more complicated optimizations later?

Agreed.

We had an offline discussion with @zhanglistar and @baibaichen about possible solutions and collaborations. @mapleFU will sort out things in detail and post it here for review.

mapleFU commented 1 year ago

I'm too busy these days and may only have time this weekend. I have just listed a skeleton and might talk about it in the Arrow meeting tonight: https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/edit

fatemehp commented 1 year ago

Thanks a lot @mapleFU for putting together the doc. It is very helpful.

A few comments:

SZn5489 commented 1 year ago

I'm a graduate student in Harbin Institute of Technology, I'm very interested in this issue and I want to jointly reproduce the operation of pushdown using BMI.

westonpace commented 1 year ago

Thanks for the summary document @mapleFU . This all sounds pretty cool to me.

I think we have decisions to make in multiple dimensions: 1) Filter expression/parsing 2) Filter pushdown 3) Filter evaluation 4) Types of filters to support (equality, range, etc). It would be nice to call these out to make it clear.

I agree, there is a lot to figure out here. However, I do think arrow-cpp's compute module has a lot of the pieces that are needed already, and I think parquet-cpp already depends on the compute module. If you want to keep depending on arrow-cpp for compute then I think page filtering with statistics should be straightforward (can mostly copy what is in datasets).

The rest would be more effort. It sounds like there is some plan to use selection vectors (which makes a lot of sense) but arrow compute doesn't use selection vectors today. However, there are a lot of really good bitmap utilities in arrow-cpp. I think anyone wanting to implement selection vectors should probably review what's available there first.

Also, https://github.com/RoaringBitmap/CRoaring might be an interesting concept to read up on when it comes to selection vectors.

Maybe a good way to get started on this work would be to create some microbenchmarks that do not currently perform well in parquet-cpp?

mapleFU commented 1 year ago

I'm a graduate student in Harbin Institute of Technology, I'm very interested in this issue and I want to jointly reproduce the operation of pushdown using BMI.

@SZn5489 Thanks. The BMI part is the final part of filter pushdown; it only works on the Decoder interface (see src/parquet/encoding.cc and its decoders). There is a lot of preliminary work to do, including the selection vector implementation and the filter interface.

You can first try to take a look at DecodeSpaced for DictDecoderImpl:


  int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset) override {
    num_values = std::min(num_values, num_values_);
    if (num_values != idx_decoder_.GetBatchWithDictSpaced(
                          reinterpret_cast<const T*>(dictionary_->data()),
                          dictionary_length_, buffer, num_values, null_count, valid_bits,
                          valid_bits_offset)) {
      ParquetException::EofException();
    }
    num_values_ -= num_values;
    return num_values;
  }

And try using BMI2 to optimize it. Adding benchmarks and optimizing this is welcome. You can also try to use BMI to implement DecodeSpaced for the PLAIN decoder.

mapleFU commented 1 year ago

However, I do think arrow-cpp's compute module has a lot of the pieces that are needed already, and I think parquet-cpp already depends on the compute module.

🤔I think the most tricky part is that:

  1. arrow-compute works on arrow Array
  2. parquet-cpp core module works on Physical data type, like i32, i64 etc

Maybe I should implement the filter kernel in the parquet/arrow module, and have the parquet-cpp module only handle the selector. Otherwise dispatching the filter on physical types would be a disaster.

SZn5489 commented 1 year ago

@mapleFU Thank you for your guidance. I'm going to try to understand the code and try to implement Selection vector.

wgtmac commented 11 months ago

Finally I have got some time to complete the design doc drafted by @mapleFU: https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/.

The proposal mainly is based on the discussion with @baibaichen @binmahone.

I think it would be good to get a consensus from the community before getting hands dirty on the implementation. Please take a look; any feedback is welcome! @emkornfield @fatemehp @pitrou @westonpace

pitrou commented 11 months ago

Finally I have got some time to complete the design doc drafted by @mapleFU: https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/.

This proposes a number of reader APIs based on row ranges, but never says how row ranges are computed in the first place?

emkornfield commented 11 months ago

Finally I have got some time to complete the design doc drafted by @mapleFU: https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/.

This proposes a number of reader APIs based on row ranges, but never says how row ranges are computed in the first place?

@pitrou I think the intent is that it is specifically abstract. I think there are a few different methods to produce the ranges:

  1. RowGroup level selection
  2. Page level selection via indices.
  3. "deletion vectors" from OSS table formats like Delta Lake and iceberg which specify which rows in the file are logically deleted.
  4. Using something like Arrow compute to select specific rows based off a few columns and construct the ranges.

@wgtmac I think this is partially covered in the design but it would be good to maybe make this more explicit
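
As an illustration of method 2 (page-level selection via indices), the sketch below turns per-page min/max statistics into row ranges for an equality predicate. `PageStats` is a hypothetical flattened view of a column's page index, not the parquet-cpp ColumnIndex/OffsetIndex API, and `PruneByPageIndex` is an assumed name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct RowRange {  // half-open [begin, end)
  int64_t begin;
  int64_t end;
};

// Hypothetical per-page entry: min/max of the page plus the index of its
// first row within the row group.
struct PageStats {
  int64_t first_row;
  int32_t min;
  int32_t max;
};

// Keep only pages whose [min, max] may contain `target`; adjacent surviving
// pages are merged into one range.
std::vector<RowRange> PruneByPageIndex(const std::vector<PageStats>& pages,
                                       int64_t num_rows, int32_t target) {
  std::vector<RowRange> ranges;
  for (std::size_t i = 0; i < pages.size(); ++i) {
    if (target < pages[i].min || target > pages[i].max) continue;
    int64_t begin = pages[i].first_row;
    int64_t end = (i + 1 < pages.size()) ? pages[i + 1].first_row : num_rows;
    if (!ranges.empty() && ranges.back().end == begin) {
      ranges.back().end = end;  // extend the previous range
    } else {
      ranges.push_back({begin, end});
    }
  }
  return ranges;
}
```

The resulting ranges are conservative: every row that could match is included, so a row-level filter may still be needed afterwards.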

wgtmac commented 11 months ago

Finally I have got some time to complete the design doc drafted by @mapleFU: https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/.

This proposes a number of reader APIs based on row ranges, but never says how row ranges are computed in the first place?

I agree with @emkornfield that there are many approaches to produce row ranges.

AFAIK, many downstream projects have different expression APIs and only use the arrow layer of parquet-cpp (not the dataset layer). It is difficult to determine a single approach of producing row ranges for all downstream projects, but it is easy to make the agreement to push down row ranges to the parquet reader to achieve filtering. Therefore I leave the freedom of different engines to design their own logic to produce row ranges.

At the moment, I have marked filtering support of parquet dataset reader as a non-goal in the doc. My idea is to use expressions from Arrow compute to do something similar to what parquet-mr does: https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L65-L104.

fatemehp commented 11 months ago

Why is it necessary to have a RowRanges API? The ::parquet::internal::RecordReader has ReadRecords and SkipRecords APIs, this should be sufficient to read/skip ranges of rows.

This will reduce the burden of converting whatever upstream format of row ranges to the one compatible with what we define here.

P.S. We have considered removing the RecordReader from the internal scope: https://github.com/apache/arrow/pull/37003#discussion_r1287132438

wgtmac commented 11 months ago

Why is it necessary to have a RowRanges API? The ::parquet::internal::RecordReader has ReadRecords and SkipRecords APIs, this should be sufficient to read/skip ranges of rows.

This will reduce the burden of converting whatever upstream format of row ranges to the one compatible with what we define here.

P.S. We have considered removing the RecordReader from the internal scope: #37003 (comment)

IMO, it would be too late to optimize I/O and decoding when SkipRecords is called. Pushing down row ranges has good separation and makes it easy to do the planning before reading.

fatemehp commented 11 months ago

I added a comment to the doc, I think we should think more about the format of the ranges both in terms of memory and performance.

I would consider analyzing the following options before making a decision:

1) A set of runs of 0s and 1s. These could be storage optimized/encoded and cheaply read.

2) Storing num_to_read/num_to_skip instead of from/to. If we limit the size of the runs, we could store them in uint16_t instead of the proposed int64_t for from/to.

3) A storage-optimized bitmap such as a RoaringBitmap.
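
To make option 2 a bit more tangible, here is a sketch that converts from/to row ranges into capped skip/read runs. `Run` and `ToRuns` are hypothetical names, and the splitting policy for long gaps and long ranges is just one possible choice.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

struct RowRange {  // half-open [begin, end), sorted and disjoint
  int64_t begin;
  int64_t end;
};

// "Skip N rows, then read M rows", with each count limited to a uint16_t.
struct Run {
  uint16_t num_to_skip;
  uint16_t num_to_read;
};

std::vector<Run> ToRuns(const std::vector<RowRange>& ranges) {
  constexpr int64_t kMax = std::numeric_limits<uint16_t>::max();
  std::vector<Run> runs;
  int64_t position = 0;  // first row not yet covered by any run
  for (const RowRange& r : ranges) {
    int64_t skip = r.begin - position;
    int64_t read = r.end - r.begin;
    // Gaps longer than kMax become skip-only runs.
    while (skip > kMax) {
      runs.push_back({static_cast<uint16_t>(kMax), 0});
      skip -= kMax;
    }
    // Ranges longer than kMax become several runs; only the first one
    // carries the remaining skip.
    do {
      int64_t chunk = read < kMax ? read : kMax;
      runs.push_back(
          {static_cast<uint16_t>(skip), static_cast<uint16_t>(chunk)});
      skip = 0;
      read -= chunk;
    } while (read > 0);
    position = r.end;
  }
  return runs;
}
```

Each `Run` is smaller than a pair of int64_t bounds, but sparse selections with long gaps need extra skip-only entries, so which layout wins depends on the selection pattern.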

emkornfield commented 11 months ago

P.S. We have considered removing the RecordReader from the internal scope: #37003 (comment)

@fatemehp I think we are all OK with this; it is just waiting on a PR: https://github.com/apache/arrow/pull/37003#issuecomment-1765502208

wordhardqi commented 10 months ago

Do we have a PR for BMI now?