apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.15k stars 1.16k forks source link

Proposal: Change `Accumulator` trait to accept `RecordBatch` / `num_rows` to allow faster `Count` #8067

Open Dandandan opened 11 months ago

Dandandan commented 11 months ago

Is your feature request related to a problem or challenge?

Currently the CountAccumulator implementation requires values: &[ArrayRef] to be passed.

In order to eliminate scanning a (first) column, we need to be able to accept a RecordBatch or num_rows instead of values: &[ArrayRef].

Describe the solution you'd like

Rather than changing every method to accept a RecordBatch (and needing to update the code), I propose adding two new methods:

update_record_batch(&mut self, recordbatch: &RecordBatch) retract_record_batch(&mut self, recordbatch: &RecordBatch)

The default implementation of the methods can use update_batch and update_record_batch (i.e. assume having at least one column).

In the aggregation code, we call update_record_batch/retract_record_batch instead.

Describe alternatives you've considered

No response

Additional context

No response

2010YOUY01 commented 11 months ago

Was that because this counting operation is possible to be done during scanning?

Looks like it's a case of aggregate pushdown. For min()/max()/count() aggregate functions on Parquet, it's possible to get the result on whole column only use metadata, without full scan.

To do that i think update_record_batch() is needed, possibly also allow RecordBatch to carry more flexible payloads

alamb commented 11 months ago

I think using a RecordBatch rather than &[ArrayRef] makes sense to me

If we are going to change the API anyways, I recommend considering changing the signature to ColumnarValue so it can handle either a RecordBatch or a ScalarValue

alamb commented 11 months ago

The other thing maybe we can think about while messing with the Accumulator trait is how we might expose GroupsAccumulator as well 🤔

Dandandan commented 11 months ago

I looked a bit more into this, it looks currently we're getting away mostly by converting 1 scalars as "count expression" (count(Int64(1)) to an array with to_array_of_size. This is a bit wasteful, but also not extremely bad (as long as the size is not enormous).