apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0
2.63k stars 803 forks

Optimize take/filter from multiple input arrays to a single large output array #6692

Open alamb opened 2 weeks ago

alamb commented 2 weeks ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Upstream in DataFusion, there is a common pattern where we have multiple input RecordBatches and want to produce an output RecordBatch with some subset of the rows from the input batches. This happens in

The kernels used here are:

The use of these kernels and patterns has two downsides:

  1. Performance overhead due to a second copy: Calling filter/take immediately copies the data, which is copied again in CoalesceBatches (see illustration below)
  2. Memory Overhead / Performance Overhead for GarbageCollecting StringView: Buffering up several RecordBatches with StringView may consume significant amounts of memory for mostly filtered rows, which requires us to run gc periodically which actually slows some things down (see https://github.com/apache/datafusion/issues/11628)

Here is an ascii art picture (from https://github.com/apache/datafusion/issues/7957) that shows the extra copy in action


┌────────────────────┐        Filter                                                                          
│                    │                    ┌────────────────────┐            Coalesce                          
│                    │    ─ ─ ─ ─ ─ ─ ▶   │    RecordBatch     │             Batches                          
│    RecordBatch     │                    │   num_rows = 234   │─ ─ ─ ─ ─ ┐                                   
│  num_rows = 8000   │                    └────────────────────┘                                              
│                    │                                                    │                                   
│                    │                                                                ┌────────────────────┐  
└────────────────────┘                                                    │           │                    │  
┌────────────────────┐                    ┌────────────────────┐                      │                    │  
│                    │        Filter      │                    │          │           │                    │  
│                    │                    │    RecordBatch     │           ─ ─ ─ ─ ─ ▶│                    │  
│    RecordBatch     │    ─ ─ ─ ─ ─ ─ ▶   │   num_rows = 500   │─ ─ ─ ─ ─ ┐           │                    │  
│  num_rows = 8000   │                    │                    │                      │    RecordBatch     │  
│                    │                    │                    │          └ ─ ─ ─ ─ ─▶│  num_rows = 8000   │  
│                    │                    └────────────────────┘                      │                    │  
└────────────────────┘                                                                │                    │  
                                                    ...                    ─ ─ ─ ─ ─ ▶│                    │  
          ...                   ...                                       │           │                    │  
                                                                                      │                    │  
┌────────────────────┐                                                    │           └────────────────────┘  
│                    │                    ┌────────────────────┐                                              
│                    │       Filter       │                    │          │                                   
│    RecordBatch     │                    │    RecordBatch     │                                              
│  num_rows = 8000   │   ─ ─ ─ ─ ─ ─ ▶    │   num_rows = 333   │─ ─ ─ ─ ─ ┘                                   
│                    │                    │                    │                                              
│                    │                    └────────────────────┘                                              
└────────────────────┘                                                                                        

                      FilterExec                                          RepartitionExec copies the data     
                      creates output batches with copies                  *again* to form final large         
                      of  the matching rows (calls take()                 RecordBatches                       
                      to make a copy)                                                                         

Describe the solution you'd like

I would like to apply filter/take to each incoming RecordBatch as it arrives, copying the selected rows into an in-progress output array, in a way that is as fast as the filter and take operations. This would eliminate the extra copy that is currently required.

Note this is somewhat like the interleave kernel, except that

  1. We only need the output rows to be in the same order as the input batches (so the second usize batch index is not needed)
  2. We don't want to have to buffer all the input

Describe alternatives you've considered

One thing I have thought about is extending the builders so they can append more than one row at a time. For example, to filter a stream of StringViewArrays I might do something like:

```rust
let mut builder = StringViewBuilder::new();
while let Some(input) = stream.next() {
    // compute some subset of input rows that make it to the output
    let filter: BooleanArray = compute_filter(&input, ....);
    // append all rows from input where filter[i] is true
    builder.append_filtered(&input, &filter);
}
```

And also add an equivalent for append_take

I think if we did this right, it wouldn't be a lot of new code; we could just refactor the existing filter/take implementations. For example, I would expect that the filter kernel would then devolve into something like

```rust
fn filter(..) {
  match data_type {
    DataType::Int8 => Int8Builder::with_capacity(...)
      .append_filter(input, filter)
      .build(),
    // ...
  }
}
```

Additional context

alamb commented 2 weeks ago

FYI

devanbenz commented 2 weeks ago

@alamb just to clarify, your idea is to modify the existing take & filter kernels, not create a new one, right?

jayzhan211 commented 2 weeks ago

> @alamb just to clarify, your idea is to modify the existing take & filter kernels, not create a new one, right?

If creating a new one helps, no reason not to do it.

tustvold commented 2 weeks ago

I think the idea is sound in principle, but needs a concrete API proposal.

I'm not sure the proposed builder API makes sense, as the typing for nested types like ListBuilder and DictionaryBuilder is not what we want here, and they can't easily be type-erased. We also ideally want to avoid overly bloating the arrow-array crate with kernel logic. This isn't even touching on the fact that these kernels don't use the builders for performance reasons.

I think we'd need to introduce a new type-erased MutableArray abstraction or something, potentially replacing the rather problematic MutableArrayData.

The only remaining challenge concerns dictionaries, as the output dictionary needs to be computed up front. Simply not supporting dictionaries could potentially be a valid workaround though.

jayzhan211 commented 2 weeks ago

Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion.

devanbenz commented 2 weeks ago

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion.

'append_value' does a copy though. Wouldn't that effectively still be a large number of copies?

jayzhan211 commented 2 weeks ago

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion.

> 'append_value' does a copy though. Wouldn't that effectively still be a large number of copies?

With this approach, we reduce the copying to a single step. Compared to the current approach, where copying happens in multiple stages (filtering, garbage collection, and coalescing), my proposal combines these steps into one. While benchmarks are needed to confirm any performance gains, this method should, at the very least, not perform worse than the existing one.

devanbenz commented 2 weeks ago

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion.

> 'append_value' does a copy though. Wouldn't that effectively still be a large number of copies?

> With this approach, we reduce the copying to a single step. Compared to the current approach, where copying happens in multiple stages (filtering, garbage collection, and coalescing), my proposal combines these steps into one. While benchmarks are needed to confirm any performance gains, this method should, at the very least, not perform worse than the existing one.

That makes sense. This would be a change downstream within DataFusion then, correct?

jayzhan211 commented 2 weeks ago

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion.

> 'append_value' does a copy though. Wouldn't that effectively still be a large number of copies?

> With this approach, we reduce the copying to a single step. Compared to the current approach, where copying happens in multiple stages (filtering, garbage collection, and coalescing), my proposal combines these steps into one. While benchmarks are needed to confirm any performance gains, this method should, at the very least, not perform worse than the existing one.

> That makes sense. This would be a change downstream within DataFusion then, correct?

Yes, you can work on it if you want to.

alamb commented 2 weeks ago

So it sounds like the consensus is to work out how this might look downstream in DataFusion (maybe starting with StringView, as that is what is giving us the most trouble now) and then use some of that knowledge to propose something upstream in Arrow -- sounds like a good idea to me.

alamb commented 2 weeks ago

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion.

@jayzhan211 yes, I think this is effectively what would happen -- however, the actual iteration over filtered values is quite optimized in the filter kernel (check out what the FilterBuilder does) based on how many values are filtered and other aspects.

The fact that filter is so fast in arrow means it is quite hard to match or beat :)

alamb commented 2 weeks ago

@tustvold

> I'm not sure the proposed builder API makes sense, as the typing for nested types like ListBuilder and DictionaryBuilder is not what we want here, and they can't easily be type-erased. We also ideally want to avoid overly bloating the arrow-array crate with kernel logic.

This is reasonable -- though I could imagine adding type-erased builders like DynListBuilder for this use case.

> This isn't even touching on the fact that these kernels don't use the builders for performance reasons.

Is there some fundamental reason the builders can't be made faster? If we could make the builders fast enough to use for filter, that would seem to be valuable in its own right. But I am likely just dreaming here.

> The only remaining challenge concerns dictionaries, as the output dictionary needs to be computed up front. Simply not supporting dictionaries could potentially be a valid workaround though.

A builder-based approach could help (e.g. optimize for the case where the input batches had the same dictionary, and handle the case where they didn't -- either via deferred computation, or on the fly, or something else)

Rachelint commented 2 weeks ago

> Not sure about other types, but for StringView I can only think of iterating over all the filtered rows and calling append_value one by one. If there is no further optimization we can do, I think we can implement the append logic in DataFusion.

Does that approach seem right for filter?

And at the least, we can avoid generating multiple small batches and then concat-ing them (a ton of copies), by only emitting once the output is big enough.

tustvold commented 2 weeks ago

> This is reasonable -- though I could imagine adding type-erased builders like DynListBuilder for this use case.

This sort of partially type-erased API seems like the worst of both worlds: you either want something that is completely type-erased (e.g. MutableArrayData) or fully typed (e.g. ListBuilder).

I could see us adding some sort of MutableArray abstraction to arrow-select that allows appending values from arrays based on a mask and/or selection. This would be useful not just for this use case, but potentially as a MutableBuffer abstraction for databases, etc... However, it would be very complex to implement, especially for dictionaries.

> Is there some fundamental reason the builders can't be made faster?

Not without changing their APIs :sweat_smile:. For the primitive builders one could simply move the current kernel implementations into the builders, but this doesn't really achieve much IMO.

> A builder-based approach could help (e.g. optimize for the case where the input batches had the same dictionary, and handle the case where they didn't -- either via deferred computation, or on the fly, or something else)

Yeah, it gets very complicated and fiddly. A similar challenge likely exists for StringView, although I'm not sure what level of sophistication we've reached w.r.t. automatic GC.

> Does that approach seem right for filter?

That would be a very naive way to implement the filter kernel; I would encourage looking at what the selection kernels actually do.

Rachelint commented 2 weeks ago

> That would be a very naive way to implement the filter kernel; I would encourage looking at what the selection kernels actually do.

I agree, it seems like a naive version of filter.

Is it possible to make something like filter_native public, but returning an iterator? Then we could reduce some copies downstream and reuse the well-optimized filter in arrow:

// current
filter --> intermediate buffer in array --> final buffer

// optimized
filter --> final buffer

jayzhan211 commented 2 weeks ago

https://github.com/apache/arrow-rs/blob/e9bf8aa6bf67ec192fce1a6f3e7ab604c9689fef/arrow-select/src/filter.rs#L740

This line of code extends the buffer (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

alamb commented 2 weeks ago

https://github.com/apache/arrow-rs/blob/e9bf8aa6bf67ec192fce1a6f3e7ab604c9689fef/arrow-select/src/filter.rs#L740

> This line of code extends the buffer (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

To be clear -- I think the copy is of a Vec<Buffer> (which is a Vec of pointers to the data).

jayzhan211 commented 1 week ago

> This line of code extends the buffer (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

I would try to implement a builder like that in DataFusion.

jayzhan211 commented 1 week ago

> This line of code extends the buffer (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

Didn't see improvement with this approach: https://github.com/apache/datafusion/pull/13450

alamb commented 1 week ago

> This line of code extends the buffer (bytes copied) regardless of the filtered result; I think it is the reason why we need gc. If we do append_value here, we have an additional hash lookup and insert, but less buffer copying, especially at low selectivity, and no gc is required later on.

> Didn't see improvement with this approach: apache/datafusion#13450

I didn't have a chance to look fully at https://github.com/apache/datafusion/pull/13450 -- can you summarize what approach it implemented?

jayzhan211 commented 6 days ago

The idea is to append the string view arrays as early as possible, with optimal memory usage, to eliminate the need for garbage collection, and to aggregate the filtered rows into a single large batch instead of many small batches.

Current implementation

We first do Filter and then Coalesce in two different operators. The coalescer's push_batch tries to do gc for the string view type, and then concats those small batches again.

This PR

I try to combine Filter and Coalesce in the Filter operator, and append string view data to the coalescer until the batch size (8192) is reached or there are no more incoming batches. It is then sent to Coalesce, which ideally should do nothing and pass it on to the next operator.

This approach has the additional cost of computing each view and looking up the view's hash again, but it eliminates the need for gc.

The result meets my expectation that there is not much difference, but I currently have no further improvements in mind either.