
Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

use StringViewArray when reading String columns from Parquet #10921

Closed: alamb closed this issue 1 month ago

alamb commented 2 months ago

Is your feature request related to a problem or challenge?

Part of https://github.com/apache/datafusion/issues/10918

In order to take advantage of the parquet reader generating StringViewArrays ( https://github.com/apache/arrow-rs/issues/5530 from @ariesdevil (❤️ ) ) we need to make sure DataFusion doesn't immediately cast those arrays back to StringArray, which would undo the benefits

                ▲                       
┌ ─ ─ ─ ─ ─ ─ ┐ │   After filtering,    
  StringArray   │   any unfiltered rows 
└ ─ ─ ─ ─ ─ ─ ┘ │   are gathered via    
      ...       │   the `take` kernel   
                │                       
 ┌────────────────────────────┐         
 │                            │         
 │         FilterExec         │         
 │                            │         
 └────────────────────────────┘         
                ▲                       
┌ ─ ─ ─ ─ ─ ─ ┐ │                       
  StringArray   │                       
└ ─ ─ ─ ─ ─ ─ ┘ │   Reading String data 
                │   from a Parquet file 
      ...       │   results in          
                │   StringArrays passed 
┌ ─ ─ ─ ─ ─ ─ ┐ │                       
  StringArray   │                       
└ ─ ─ ─ ─ ─ ─ ┘ │                       
                │                       
 ┌────────────────────────────┐         
 │                            │         
 │        ParquetExec         │         
 │                            │         
 └────────────────────────────┘         

      Current situation                 

Describe the solution you'd like

To support a phased rollout of this feature, I recommend we focus first on just the initial filtering operation.

Specifically, get to the point where the parquet reader reads data out as StringViewArray, like this:

                ▲              
┌ ─ ─ ─ ─ ─ ─ ┐ │              
  StringArray   │              
└ ─ ─ ─ ─ ─ ─ ┘ │              
      ...       │              
                │              
 ┌────────────────────────────┐
 │                            │
 │         FilterExec         │
 │                            │
 └────────────────────────────┘
┌ ─ ─ ─ ─ ─ ─ ┐ ▲              
 StringViewArr  │              
│     ay      │ │              
 ─ ─ ─ ─ ─ ─ ─  │              
      ...       │              
                │              
┌ ─ ─ ─ ─ ─ ─ ┐ │              
 StringViewArr  │              
│     ay      │ │              
 ─ ─ ─ ─ ─ ─ ─  │              
                │              
 ┌────────────────────────────┐
 │                            │
 │        ParquetExec         │
 │                            │
 └────────────────────────────┘

      Intermediate
      Situation 1: pass
      StringViewArray
      between ParquetExec
      and FilterExec

Describe alternatives you've considered

I suggest we:

  1. Make a configuration setting like "force StringViewArray" when reading parquet so we can test this. When this setting is enabled, DataFusion should configure the ParquetExec to produce StringViewArray regardless of the type stored in the parquet file (a sketch of what such a setting could do follows this list)
  2. Then work on incrementally rolling out support / testing for various filter expressions (especially string functions like substring and https://github.com/apache/datafusion/issues/10919)
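
For illustration, here is a minimal sketch of what such a setting could toggle in the reader setup: rewriting Utf8/LargeUtf8 fields in the parquet file schema to Utf8View so the decoder produces StringViewArray. The helper name and placement are hypothetical, not the eventual DataFusion implementation.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

/// Hypothetical helper: when the "force StringViewArray" setting is enabled,
/// map Utf8/LargeUtf8 columns in the parquet file schema to Utf8View so that
/// the reader yields StringViewArray instead of StringArray.
fn force_view_types(file_schema: &Schema) -> Schema {
    let fields: Vec<_> = file_schema
        .fields()
        .iter()
        .map(|f| match f.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
                Field::new(f.name(), DataType::Utf8View, f.is_nullable())
                    .with_metadata(f.metadata().clone()),
            ),
            _ => Arc::clone(f),
        })
        .collect();
    Schema::new(fields)
}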

Additional context

No response

XiangpengHao commented 2 months ago

I'll take this one, can you assign me @alamb ?

alamb commented 2 months ago

I'll take this one, can you assign me @alamb ?

BTW you can assign yourself (single word comment take): https://datafusion.apache.org/contributor-guide/index.html#finding-and-creating-issues-to-work-on

XiangpengHao commented 2 months ago

It seems that the current FilterExec and ParquetExec play nicely together, and the following code filters directly on the StringViewArray (instead of converting it to StringArray), which is quite nice.

(the test case requires the latest arrow-rs to run)


use std::fs;
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, StringViewArray};
use arrow::record_batch::RecordBatch;
use datafusion::prelude::SessionContext;
use parquet::arrow::ArrowWriter;
use tempfile::TempDir;

#[tokio::test]
async fn parquet_read_filter_string_view() {
    let tmp_dir = TempDir::new().unwrap();

    let values = vec![Some("small"), None, Some("Larger than 12 bytes array")];
    let c1: ArrayRef = Arc::new(StringViewArray::from_iter(values.iter()));
    let c2: ArrayRef = Arc::new(StringArray::from_iter(values.iter()));

    let batch =
        RecordBatch::try_from_iter(vec![("c1", c1.clone()), ("c2", c2.clone())]).unwrap();

    let file_name = {
        let table_dir = tmp_dir.path().join("parquet_test");
        std::fs::create_dir(&table_dir).unwrap();
        let file_name = table_dir.join("part-0.parquet");
        let mut writer = ArrowWriter::try_new(
            fs::File::create(&file_name).unwrap(),
            batch.schema(),
            None,
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file_name
    };

    let ctx = SessionContext::new();
    ctx.register_parquet("t", file_name.to_str().unwrap(), Default::default())
        .await
        .unwrap();

    async fn display_result(sql: &str, ctx: &SessionContext) {
        let result = ctx.sql(sql).await.unwrap().collect().await.unwrap();

        arrow::util::pretty::print_batches(&result).unwrap();

        for b in result {
            println!("schema: {:?}", b.schema());
        }
    }

    display_result("SELECT * from t", &ctx).await;
    display_result("SELECT * from t where c1 <> 'small'", &ctx).await;
    display_result("SELECT * from t where c2 <> 'small'", &ctx).await;
}

I'll take a closer look at the generated logical/physical plans to verify that the StringViewArray is never converted to a StringArray. If that is the case, the remaining work for this issue is probably to (1) add an option to force reading String columns as StringViewArray, and (2) add more tests, potentially checking that the generated plan uses StringViewArray consistently.

XiangpengHao commented 2 months ago

I have manually checked that there isn't any unexpected conversion.

The logical plan is

Projection: t.c1, t.c2 [c1:Utf8View;N, c2:Utf8;N]
  Filter: t.c2 != Utf8("small") [c1:Utf8View;N, c2:Utf8;N]
    TableScan: t [c1:Utf8View;N, c2:Utf8;N]

The physical plan is

CoalesceBatchesExec: target_batch_size=8192
  FilterExec: c2@1 != small
    ParquetExec: file_groups={1 group: [[tmp/.tmp4ADQEP/parquet_test/part-0.parquet]]}, projection=[c1, c2], predicate=c2@1 != small, pruning_predicate=CASE WHEN c2_null_count@2 = c2_row_count@3 THEN false ELSE c2_min@0 != small OR small != c2_max@1 END, required_guarantees=[c2 not in (small)]

I looked at the FilterExec; it eventually calls into: https://github.com/XiangpengHao/datafusion/blob/string-view/datafusion/physical-expr/src/expressions/binary.rs#L260

which calls compare_op from arrow.
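
As a standalone illustration (not the exact DataFusion code path above), arrow's comparison kernels accept StringViewArray directly, which is why no conversion back to StringArray is needed to evaluate the predicate:

use arrow::array::{BooleanArray, Scalar, StringViewArray};
use arrow::compute::kernels::cmp::neq;

/// Evaluate the equivalent of `c1 <> 'small'` directly against a
/// StringViewArray using arrow's comparison kernel.
fn not_small(values: &StringViewArray) -> BooleanArray {
    let small = Scalar::new(StringViewArray::from_iter_values(["small"]));
    neq(values, &small).unwrap()
}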

I'll add more tests to demonstrate that the filter works out of the box.

alamb commented 2 months ago

@jayzhan211 and I are talking about something similar in https://github.com/apache/datafusion/issues/9403#issuecomment-2178347730

What do you think about potentially adding a new OptimizerRule, something like

/// Optimizer rule that rewrites portions of the plan to use `StringViewArray` instead of
/// `StringArray` where the operators support the new type
///
/// (some background on StringViewArray and why it is better for some operators)
///
/// This rule currently supports:
/// 1. Reading strings from ParquetExec (which can save a copy of the string)
/// 2. GroupBy operation
/// ...
struct UseStringView {}

impl OptimizerRule for UseStringView {
    ...
}
jayzhan211 commented 2 months ago

@jayzhan211 and I are talking about something similar in #9403 (comment)

What do you think about potentially adding a new OptimizerRule, something like

/// Optimizer rule that rewrites portions of the plan to use `StringViewArray` instead of
/// `StringArray` where the operators support the new type
///
/// (some background on StringViewArray and why it is better for some operators)
///
/// This rule currently supports:
/// 1. Reading strings from ParquetExec (which can save a copy of the string)
/// 2. GroupBy operation
/// ...
struct UseStringView {}

impl OptimizerRule for UseStringView {
    ...
}

We usually ensure the schema remains unchanged after applying an optimizer rule; should this be a special case?

https://github.com/apache/datafusion/blob/4109f581ce9bca956e01f13ff16b30d59720e96b/datafusion/optimizer/src/optimizer.rs#L389-L393

alamb commented 2 months ago

We usually ensure the schema remains unchanged after applying an optimizer rule; should this be a special case?

I think we need to keep the schema the same (that is a pretty far-reaching invariant)

But maybe we could do something like add a projection to convert it back

So like if the input plan was

FilterExec [Utf8]
  ParquetExec[Utf8]

We could implement an optimizer rule that made a plan like

ProjectionExec(cast(col) as Utf8) [utf8]
  Filter[Utf8View]
    ParquetExec[Utf8View]

🤔
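
For the cast-back step, arrow's cast kernel can convert Utf8View back to Utf8, so the projection only needs to wrap it; a minimal sketch (not the actual ProjectionExec wiring):

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

/// What the added ProjectionExec would effectively do for each rewritten
/// string column: cast the Utf8View output back to Utf8 so the plan's
/// output schema is unchanged.
fn cast_back_to_utf8(col: &ArrayRef) -> Result<ArrayRef, ArrowError> {
    cast(col, &DataType::Utf8)
}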

XiangpengHao commented 2 months ago

I like the idea of using an optimizer rule to optimistically/optionally use StringView!

alamb commented 2 months ago

BTW the more I think about this the more I think it should probably be a PhysicalOptimizer rule (not a logical optimizer rule) as the change is related to the particular properties of the ExecutionPlans (rather than the logical type)

https://docs.rs/datafusion/latest/datafusion/physical_optimizer/optimizer/trait.PhysicalOptimizerRule.html
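
A skeleton of what such a rule could look like against that trait, with illustrative names and a no-op body (not the eventual implementation):

use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;

/// Illustrative skeleton only: rewrite eligible parts of the physical plan
/// (e.g. a ParquetExec feeding a FilterExec) to use Utf8View, inserting casts
/// where the output schema must stay the same.
struct UseStringView {}

impl PhysicalOptimizerRule for UseStringView {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The real rule would walk `plan` and rewrite supported operators;
        // returning the plan unchanged keeps this sketch compiling.
        Ok(plan)
    }

    fn name(&self) -> &str {
        "use_string_view"
    }

    fn schema_check(&self) -> bool {
        // Keep the schema invariant discussed above: the rewritten plan must
        // still produce the original output schema.
        true
    }
}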

XiangpengHao commented 2 months ago

BTW the more I think about this the more I think it should probably be a PhysicalOptimizer rule (not a logical optimizer rule) as the change is related to the particular properties of the ExecutionPlans (rather than the logical type)

https://docs.rs/datafusion/latest/datafusion/physical_optimizer/optimizer/trait.PhysicalOptimizerRule.html

I'll try to prototype a physical optimizer

jayzhan211 commented 2 months ago

Without changing the schema, we need to convert StringArray to StringViewArray -> compute -> convert back to StringArray. It would be nice if the conversion cost were negligible.

In my case #9403 , we need to

  1. Convert to StringViewArray and introduce CastExpr to keep the schema unchanged in physical optimization
  2. Run GroupStream with StringViewArray
  3. Apply CastExpr to get the StringArray back.

And the combined cost of these 3 steps should always beat processing with StringArray directly, regardless of what the second computation is.

alamb commented 2 months ago

  • Convert to StringViewArray and introduce CastExpr to keep the schema unchanged in physical optimization

I was actually thinking the schema would be different between some ExecutionPlans (e.g. to pass a StringView between ParquetExec and a Filter)

alamb commented 2 months ago

An update here: I think @XiangpengHao has this working on a branch, but found that performance is actually slower with StringViewArray than with StringArray. Thus he is working on https://github.com/apache/arrow-rs/issues/5904 now

XiangpengHao commented 1 month ago

Want to share some thoughts here on when to use StringViewArray and when not.

We only consider the cost of loading data from parquet to narrow the scope.

To load a StringArray, we need to copy the data to a new buffer and build the offset array. The extra memory we need to set up is array len * (string len + offset size). Specifically, StringArray is array len * (string len + 4) and LargeStringArray is array len * (string len + 8).

To load a StringViewArray, we only need to build the view array and can reuse the buffer from the parquet decoder. The extra memory to set up is array len * view size, i.e., array len * 16. Note that the extra memory per value of StringViewArray is constant with respect to string length, i.e., each view takes 16 bytes no matter how long the underlying string is.

For a sufficiently large array, the time to build the array should be proportional to the extra memory we set up.

This means that if each individual string is small, i.e., smaller than 12 bytes, StringArray is actually faster to build than StringViewArray. In other words, we should use StringViewArray only when strings are larger than 12 bytes.
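
Plugging in concrete numbers makes the crossover point clear. A tiny illustration of the estimate above (ignoring nulls, validity buffers, and allocator overhead):

/// Extra bytes needed per value to materialize each array type, per the
/// estimate above: StringArray copies the string bytes plus a 4-byte offset,
/// while StringViewArray only builds a fixed 16-byte view and reuses the
/// decoder's buffer.
fn extra_bytes_per_value(string_len: usize) -> (usize, usize) {
    let string_array = string_len + 4; // copied string bytes + i32 offset
    let string_view_array = 16;        // one 16-byte view, regardless of length
    (string_array, string_view_array)
}

fn main() {
    for len in [4, 8, 12, 16, 32] {
        let (s, v) = extra_bytes_per_value(len);
        // e.g. len = 8  -> StringArray 12 B vs StringViewArray 16 B (StringArray wins)
        //      len = 16 -> StringArray 20 B vs StringViewArray 16 B (StringViewArray wins)
        println!("len {len:>2}: StringArray {s:>2} B, StringViewArray {v:>2} B");
    }
}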

alamb commented 1 month ago

Update -- @XiangpengHao found the root cause of the "small string is slower" -- read about it in this great writeup: https://github.com/apache/arrow-rs/pull/6031

TLDR is that with (even) more work we can make arrow/parquet reading with StringView always faster!

alamb commented 1 month ago

This is done on the string-view2 branch. Once we merge https://github.com/apache/datafusion/pull/11667 we can close this ticket, I think

alamb commented 1 month ago

https://github.com/apache/datafusion/pull/11667 is ready for review 🥳