apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.37k stars 1.2k forks source link

Reduce copying in `CoalesceBatchesExec` for StringViews #11628

Open alamb opened 4 months ago

alamb commented 4 months ago

Is your feature request related to a problem or challenge?

In pictures, what https://github.com/apache/datafusion/pull/11587 does is like this (to ensure lots of unreachable "garbage" does not accumulate in the output batch)

┌────────────────────┐
│    RecordBatch     │                ┌────────────────────┐
│   num_rows = 23    │                │    RecordBatch     │
└────────────────────┘                │   num_rows = 23    │              ┌────────────────────┐
                                      └────────────────────┘              │                    │
┌────────────────────┐                                        Coalesce    │                    │
│                    │ StringView::gc ┌────────────────────┐   Batches    │                    │
│    RecordBatch     │                │    RecordBatch     │              │                    │
│   num_rows = 50    │                │   num_rows = 50    │  ─ ─ ─ ─ ─▶  │                    │
│                    │    ─ ─ ─ ─ ─▶  │                    │              │    RecordBatch     │
│                    │                └────────────────────┘              │   num_rows = 106   │
└────────────────────┘                                                    │                    │
                                                                          │                    │
┌────────────────────┐                ┌────────────────────┐              │                    │
│                    │                │    RecordBatch     │              │                    │
│    RecordBatch     │                │   num_rows = 33    │              │                    │
│   num_rows = 33    │                │                    │              └────────────────────┘
│                    │                └────────────────────┘
└────────────────────┘

However, as @2010YOUY01 pointed out in https://github.com/apache/datafusion/pull/11587/files#r1686678665

So here inside gc string buffer will be copied once, (below) in concat_batches() string buffer will be copied again, it seems possible to copy only once by changing the internal implementation of concat_batches()

This implementation will effectively copy the data twice -- once for the call to gc and once for the call coalsece batches.

Due to the nature of StringView the actual strings vaules are only copied once, but the u128 view value will be copied twice

Describe the solution you'd like

Somehow structure the code to avoid copying the views again. Like this

┌────────────────────┐                                       
│    RecordBatch     │                                       
│   num_rows = 23    │                 ┌────────────────────┐
└────────────────────┘                 │                    │
                       StringView::gc  │                    │
┌────────────────────┐  and Coalesce   │                    │
│                    │ Batches in same │                    │
│    RecordBatch     │    operation    │                    │
│   num_rows = 50    │                 │    RecordBatch     │
│                    │    ─ ─ ─ ─ ─▶   │   num_rows = 106   │
│                    │                 │                    │
└────────────────────┘                 │                    │
                                       │                    │
┌────────────────────┐                 │                    │
│                    │                 │                    │
│    RecordBatch     │                 └────────────────────┘
│   num_rows = 33    │                                       
│                    │                                       
└────────────────────┘                                       

Describe alternatives you've considered

https://github.com/apache/datafusion/pull/11587/files#r1687099239

I think given how concat is implemented for StringView it will only copy the fixed parts (not the actual string data)

Perhaps what we could do is implement a wrapper around arrow::concat_batches that has the datafusion specific GC trigger for sparse arrays, and falls back to concat for other types: https://docs.rs/arrow-select/52.1.0/src/arrow_select/concat.rs.html#150

/// wrapper around [`arrow::compute::concat`] that 
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
 // loop over columns here and handle StringView specially, 
 // or fallback to concat
 }

Additional context

https://github.com/apache/datafusion/issues/7957 is another related idea for avoding copies

XiangpengHao commented 4 months ago

Got some time to think about this and want to share my thoughts here:

Implementation

The goal is to reduce copying string, specifically, only copying string once and only constructing view array once.

I implemented the gc in concat_batches on my local branch, the code looks like this:

    for i in 0..field_num {
        let data_type = schema.field(i).data_type();
        match data_type {
            &arrow_schema::DataType::Utf8View => {
                let mut string_view_builder = StringViewBuilder::with_capacity(row_count)
                    .with_block_size(1024 * 1024 * 2);
                for b in batches.iter() {
                    let array = b.column(i).as_string_view();
                    for v in array.iter() {
                        string_view_builder.append_option(v);
                    }
                }
                let array = string_view_builder.finish();
                arrays.push(Arc::new(array) as ArrayRef);
            }
            _ => {
                let array = arrow::compute::concat(
                    &batches
                        .iter()
                        .map(|batch| batch.column(i).as_ref())
                        .collect::<Vec<_>>(),
                )?;
                arrays.push(array);
            }
        }
    }

Benchmark this implement on ClickBench Q20:

SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';

The performance is slower by about 20%.

Profiling

I checked the flamegraph and found the new implementation takes significantly more time on page fault.

image

Then I ran heaptrack (need to disable mimalloc) and found the peak RSS (Peak Resident Set Size) increased from 1.3GB to 5.2GB.

I believe the performance regression is due to late GC. Previously, we called GC immediately after we ran the filter. Now, we call GC only after we accumulate enough values in the buffer, which can hold the underlying buffer for an excessively long time, leading to high memory consumption because the StringView buffer was not released timely.

Solution?

I think the discussion around reduce copying can be divide into two sub questions:

  1. what is the current overhead of StringView in CoalesceBatchesExec? The current implementation does not copy string data. The overhead (extra steps) comes from that we constructed the view three times (one in the filter step, one in the coalesce gc, one in the concate_batches). The implementation above gets rid of the second one, but it is done in an improper timing.
  2. should we refactor filter-then-coalesce into one operator? In that way, we don't have intermediate small batches, thus reduce copy. This is a bigger project and can potentially solve the first problem along the way.

I think this is another example of getting StringView fast in practice requires a lot of careful analysis and implementation!

cc @alamb @2010YOUY01

alamb commented 4 months ago

I believe the performance regression is due to late GC. Previously, we called GC immediately after we ran the filter. Now, we call GC only after we accumulate enough values in the buffer,

This makes sense to me and I think your analysis is very clear. Thank you

should we refactor filter-then-coalesce into one operator? In that way, we don't have intermediate small batches, thus reduce copy. This is a bigger project and can potentially solve the first problem along the way.

I think this is what we should pursue and I think what is covered by https://github.com/apache/datafusion/issues/7957. As you say it is likely the thing that will perform the best.

Maybe we could explore a solution that builds an the output StringViewArray as data came in, rather than wait for enough data to be accumulated. The code might look like

while let Some(batch) = input.read_batch() {
  // append new rows to inprogress output, producing a complete batch if ready
  if let Some(output_batch) = coalescer.push_batch(batch) {
    output.emit(output_batch)
  }
}

The idea would be that coalescer stores an in-progress StringViewBuilder so that as batches were pushed the data was copied

struct Coalescer {
  in_progress: StringViewBuilder 
  // and similiar things for other types 🤔 
}

impl Coalescer {
  fn push_bach(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
    // copy relevant values to self.in_progress
    // if in_progress.len is greater than threshold emit a batch
  }
}

You might recognize this high level structure from https://github.com/apache/datafusion/pull/11610 :)

I think this is another example of getting StringView fast in practice requires a lot of careful analysis and implementation!

100% agree

2010YOUY01 commented 4 months ago

Benchmark this implement on ClickBench Q20:

SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';

The performance is slower by about 20%.

This benchmark is a great inspiration, I think this query has low selectivity and processed strings are longer, so it's preferred to do early GC and extra copies of views's influence are not dominant. And the goal is to find a strategy to work for all possible cases: low/high selectivity filter + short/long string.