apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Bad performance on wide tables (1000+ columns) #7698

Open karlovnv opened 8 months ago

karlovnv commented 8 months ago

Describe the bug

I'm testing DataFusion for use in a system that has several thousand columns and billions of rows. I'm excited about the flexibility and possibilities this technology provides.

The problems we faced:

1) Optimization of the logical plan is slow because some rules have to copy the whole schema. We worked around it with prepared queries (we cache the parametrized logical plan).
2) Creation of the physical plan consumes up to 35% of CPU, which is more than its execution (we use several hundred aggregation functions, and DF shows pretty good execution time).

Some investigation showed that there are a lot of string comparisons (take a look at the flamegraph):

  29 %      datafusion_physical_expr::planner::create_physical_expr 
  28.5 %    --> datafusion_common::dfschema::DFSchema::index_of_column
  28.5 %    -- --> datafusion_common::dfschema::DFSchema::index_of_column_by_name
   7.4 %    -- -- --> __memcmp_sse4_1
  14.6 %    -- -- --> datafusion_common::table_reference::TableReference::resolved_eq
   6.8 %    -- -- -- --> __memcmp_sse4_1

[image: flamegraph, photo_2023-09-29_14-29-16]

The algorithm currently has O(N^2) complexity (N for iterating over all the columns in datafusion_common::dfschema::DFSchema::index_of_column_by_name, and N in datafusion_common::table_reference::TableReference::resolved_eq).

https://github.com/apache/arrow-datafusion/blob/22d03c127e7c5e56cf97ae33eb4446d5b7022eaa/datafusion/common/src/dfschema.rs#L211
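
To illustrate the complexity argument, here is a minimal sketch that contrasts a linear scan with a precomputed hash index. The `Field` and `FieldIndex` types and both function names are hypothetical simplifications, not DataFusion's actual types, and the sketch uses exact qualifier matching, which is simpler than DataFusion's real name resolution rules.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a schema field (not DataFusion's DFField).
#[derive(Debug, Clone)]
pub struct Field {
    pub qualifier: Option<String>,
    pub name: String,
}

/// Linear scan: O(N) string comparisons per lookup,
/// so resolving N columns costs O(N^2) comparisons overall.
pub fn index_of_linear(fields: &[Field], qualifier: Option<&str>, name: &str) -> Option<usize> {
    fields
        .iter()
        .position(|f| f.qualifier.as_deref() == qualifier && f.name == name)
}

/// Index built once in O(N); each later lookup is O(1) on average.
pub struct FieldIndex {
    by_name: HashMap<(Option<String>, String), usize>,
}

impl FieldIndex {
    pub fn new(fields: &[Field]) -> Self {
        let mut by_name = HashMap::with_capacity(fields.len());
        for (i, f) in fields.iter().enumerate() {
            // Keep the first occurrence, matching what `position` returns above.
            by_name
                .entry((f.qualifier.clone(), f.name.clone()))
                .or_insert(i);
        }
        Self { by_name }
    }

    pub fn index_of(&self, qualifier: Option<&str>, name: &str) -> Option<usize> {
        // Allocates a key per lookup for simplicity; a tuned version would avoid this.
        let key = (qualifier.map(str::to_owned), name.to_owned());
        self.by_name.get(&key).copied()
    }
}
```

Building the index has its own cost (it copies every name), so it only pays off when a schema is looked up many times.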

Some ideas to resolve:

Thank you for developing such a great tool!

To Reproduce

It's hard to extract code from the project, but I will try to build a simple repro

Expected behavior

Creation of the physical plan takes much less CPU time than its execution

Additional context

No response

alamb commented 8 months ago

Thank you for the report @karlovnv -- I agree with your analysis and we are indeed tracking various ways to make DataFusion's planning faster in https://github.com/apache/arrow-datafusion/issues/5637

Another performance issue, which I think is related to the ones you have already identified, concerns the representation of Schemas and name resolution (for example, error strings are often created and then ignored)

If you (or anyone else) have any time to help with this project, it would be most appreciated

karlovnv commented 8 months ago

@alamb, thank you for the reply! I will continue posting about bottlenecks in DF (for instance, I've noticed degraded DF performance due to aggressive concurrency in the tokio scheduler and worked around it by using multiple tokio runtimes; I've also tested DF pinned to NUMA, and so on)

karlovnv commented 8 months ago

various ways to make DataFusion's planning faster

It would also be good to consider implementing prepared physical plans (with parametrization), which would make it possible to cache them

maruschin commented 7 months ago

Good issue, I'll take it. We can't use a plain HashMap because we need to preserve the insertion order. Something like IndexMap would fit here: https://docs.rs/indexmap/latest/indexmap/ Or we could store the list index in a HashMap.
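
The second option (storing the list index in a HashMap) could look roughly like the sketch below. `OrderedNames` is a hypothetical type made up for illustration; it uses only the standard library, with a Vec keeping insertion order and a HashMap pointing into it.

```rust
use std::collections::HashMap;

/// Hypothetical ordered name set: a Vec keeps insertion order,
/// and a HashMap maps each name to its position in the Vec.
#[derive(Default)]
pub struct OrderedNames {
    names: Vec<String>,
    positions: HashMap<String, usize>,
}

impl OrderedNames {
    /// Appends `name` and returns its index; a duplicate returns the existing index.
    pub fn insert(&mut self, name: &str) -> usize {
        if let Some(&i) = self.positions.get(name) {
            return i;
        }
        let i = self.names.len();
        self.names.push(name.to_owned());
        self.positions.insert(name.to_owned(), i);
        i
    }

    /// O(1) average lookup of a name's position.
    pub fn index_of(&self, name: &str) -> Option<usize> {
        self.positions.get(name).copied()
    }

    /// Names in insertion order.
    pub fn names(&self) -> &[String] {
        &self.names
    }
}
```

The trade-off is that every name is stored twice, which is the kind of construction cost that matters for very wide schemas.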

alamb commented 7 months ago

Thanks @maruschin

I wonder if this is a good time to take a step back and see if we could make DF Schema easier to use in general -- I think it is already pretty complicated and optimizing certain methods will likely make it more so.

For example, I wonder if making the index map will make less complex queries more so, or if we need to take more care to reuse DFSchema

Thus I suggest maybe sketching out the type of change you have in mind in a draft PR that we can discuss prior to spending all the time to get the PR polished up.

maruschin commented 7 months ago

@alamb I added some thoughts (#7895) that came up while working on #7878. I'll comment on the code in the PR later.

The main points: distinguish between qualified and unqualified columns, and don't allow a qualified name as the name in a Column.

alamb commented 7 months ago

@maruschin -- it would help in general to have some idea of the why (rationale) behind https://github.com/apache/arrow-datafusion/pull/7895 -- presumably it is because it makes something easier / less error prone, but I am sorry I don't immediately understand

maruschin commented 7 months ago

@alamb, my intent was to guarantee that the name field holds an unqualified name, like "col1" or "`col.1`", not "table.col1", and to prevent initializing a Column with both a qualifier and a qualified name. After a few days, I think #7895 is too verbose.

karlovnv commented 7 months ago

@alamb please take a look at the PR https://github.com/apache/arrow-datafusion/pull/7870, where @oleggator has implemented a BTree instead of a list. It made physical plan construction 2x faster

maruschin commented 7 months ago

@karlovnv could you please check the performance of #7878?

I'm curious how different the performance is between my searching for candidates by field name followed by filtering and searching in a BTree.

alamb commented 7 months ago

I have reviewed https://github.com/apache/arrow-datafusion/pull/7870 and https://github.com/apache/arrow-datafusion/pull/7878. Thank you for your work @maruschin and @karlovnv

Here are my thoughts:

  1. I think we need some sort of performance benchmark results to know how much it helps / hurts in other areas (like how much longer it takes to create one). Can someone please create some benchmarks, similar to scalar.rs, for index_of_column_by_name and schema creation?
  2. I think it is likely to be too expensive to build a HashMap with each DFSchema (as it is creating / copying owned strings) if that never is read -- I think it should be built on demand, as suggested by @crepererum at https://github.com/apache/arrow-datafusion/pull/7870/files#r1372786446
  3. I have long been bothered by how expensive it is to create a DFSchema. I have some ideas on how to make it faster to construct -- while that might not help this use case directly, I think it might help planning in general. I will take a crack at working on this idea
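
A minimal sketch of the "build on demand" idea from point 2, assuming a hypothetical `LazySchema` type with unique column names: the lookup map is created only when the first lookup happens, so schemas that are never searched pay nothing for it. It relies on `std::sync::OnceLock` from the standard library.

```rust
use std::collections::HashMap;
use std::sync::OnceLock;

/// Hypothetical schema that builds its name index only on first lookup.
pub struct LazySchema {
    names: Vec<String>,
    index: OnceLock<HashMap<String, usize>>,
}

impl LazySchema {
    /// Construction stays cheap: no map is built and no names are copied here.
    pub fn new(names: Vec<String>) -> Self {
        Self { names, index: OnceLock::new() }
    }

    pub fn index_of(&self, name: &str) -> Option<usize> {
        // The closure runs at most once, on the first call.
        let index = self.index.get_or_init(|| {
            self.names
                .iter()
                .enumerate()
                .map(|(i, n)| (n.clone(), i))
                .collect()
        });
        index.get(name).copied()
    }

    /// True once a lookup has forced the index to be built.
    pub fn index_is_built(&self) -> bool {
        self.index.get().is_some()
    }
}
```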

oleggator commented 7 months ago

Made the benchmark. Here are the results.

zeodtr commented 6 months ago

Hi, I think DFField::qualified_name() is another performance bottleneck. For a simple SELECT many-columns query for a table that has 3617 columns, the function took 73% of the total planning time. The problem is that the function calls format! (which is expensive) every time it is called when it has a qualifier. When I changed the DFField to have a member variable that has precomputed qualified_name and to return its clone() when qualified_name() is called, the planning became 2~3 times faster. But still, clone()ing the precomputed qualified_name takes a significant percentage (~30%? I didn't measure it with the same condition) of the total planning time. (I've tested it with DataFusion 31.0.0)
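
As a rough illustration of the change described above, the sketch below computes the qualified name once at construction time. `CachedField` is a hypothetical, simplified type and not the real DFField; unlike the change described in the comment, it returns a borrowed `&str` so that callers clone only when they need ownership.

```rust
/// Hypothetical simplified field; not the real DFField definition.
pub struct CachedField {
    qualifier: Option<String>,
    name: String,
    /// Computed once in `new`, so `qualified_name` never calls `format!`.
    qualified_name: String,
}

impl CachedField {
    pub fn new(qualifier: Option<&str>, name: &str) -> Self {
        let qualified_name = match qualifier {
            Some(q) => format!("{q}.{name}"),
            None => name.to_owned(),
        };
        Self {
            qualifier: qualifier.map(str::to_owned),
            name: name.to_owned(),
            qualified_name,
        }
    }

    /// Returns a borrowed view of the precomputed string.
    pub fn qualified_name(&self) -> &str {
        &self.qualified_name
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn qualifier(&self) -> Option<&str> {
        self.qualifier.as_deref()
    }
}
```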

zeodtr commented 6 months ago

I've applied the precomputed qualified_name I've mentioned above and the btree draft by @oleggator to DataFusion 31.0.0, then ran valgrind with a simple SELECT many-columns query for a table that has 3617 columns.

The following attachment is the resulting call graph in SVG format. (xdb_main is my project's executable name which uses DataFusion as a library.) Still clone()ing precomputed qualified_name takes about 47% of the total planning time (logical plan building + optimizing) while field_with_qualified_name now takes only a negligible percentage.

out 675915

Dandandan commented 6 months ago

You can try to put it behind an Arc to make the cloning faster?

zeodtr commented 6 months ago

You can try to put it behind an Arc to make the cloning faster?

I've additionally changed the type of the precomputed qualified_name from String to Arc<String> after the above tests. Total planning time reduced to 75% of the previous iteration. But I think it is still far from optimal.
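
The effect of the Arc change can be sketched as follows. `SharedNameField` is a hypothetical type, and it uses `Arc<str>` rather than the `Arc<String>` mentioned above; both make a clone a reference-count increment instead of a copy of the string bytes.

```rust
use std::sync::Arc;

/// Hypothetical field whose precomputed qualified name is shared via `Arc<str>`.
pub struct SharedNameField {
    qualified_name: Arc<str>,
}

impl SharedNameField {
    pub fn new(qualifier: &str, name: &str) -> Self {
        Self {
            qualified_name: Arc::from(format!("{qualifier}.{name}")),
        }
    }

    /// Cloning an `Arc` bumps a reference count; the string bytes are not copied.
    pub fn qualified_name(&self) -> Arc<str> {
        Arc::clone(&self.qualified_name)
    }
}
```

Every handle returned by `qualified_name()` points at the same allocation, which is why the per-call cost no longer grows with the length of the name.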

karlovnv commented 6 months ago

it is still far from optimal

I think it's a good idea to cache instances of DFSchema (and Arrow Schema as well). The most flexible way is to implement a user-defined SchemaCacheProvider (let users of DataFusion decide how to cache schemas). For instance, in our case the schema changes far more rarely than the data, so we can cache it for a long period of time
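
One possible shape for such a provider is sketched below. Everything here is hypothetical: DataFusion does not define a `SchemaCacheProvider` trait, and `Schema` is a simplified stand-in. The sketch only shows the idea of letting the user own the caching policy behind a small trait.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an expensive-to-build schema.
#[derive(Debug, PartialEq)]
pub struct Schema {
    pub columns: Vec<String>,
}

/// Hypothetical trait; DataFusion does not define `SchemaCacheProvider`.
pub trait SchemaCacheProvider {
    /// Returns the cached schema for `table`, building it with `build` on a miss.
    fn get_or_build(&self, table: &str, build: &dyn Fn() -> Schema) -> Arc<Schema>;
}

/// Simple in-memory provider that never evicts.
#[derive(Default)]
pub struct InMemorySchemaCache {
    entries: Mutex<HashMap<String, Arc<Schema>>>,
}

impl SchemaCacheProvider for InMemorySchemaCache {
    fn get_or_build(&self, table: &str, build: &dyn Fn() -> Schema) -> Arc<Schema> {
        let mut entries = self.entries.lock().unwrap();
        if let Some(schema) = entries.get(table) {
            // Cache hit: hand out another reference to the same schema.
            return Arc::clone(schema);
        }
        let schema = Arc::new(build());
        entries.insert(table.to_owned(), Arc::clone(&schema));
        schema
    }
}
```

A provider for a system where schemas do change would also need an invalidation method, which is left out here.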

karlovnv commented 6 months ago

Another thought is to cache the physical plan (I tested using an optimized physical plan serialized to protobuf as a cache, and it improved performance dramatically)

alamb commented 6 months ago

For instance, in our case the schema changes far more rarely than the data, so we can cache it for a long period of time

We do something similar to this in IOx (cache schemas that we know don't change rather than recomputing them)

It is my opinion that in order to make DFSchema behave well and not be a bottleneck we will need to more fundamentally restructure how it works.

Right now the amount of copying required is substantial as has been pointed out several times on this thread. I think with sufficient diligence we could avoid almost all copies when manipulating DFSchema and then the extra complexity of adding a cache or other techniques would become unnecessary.

I've additionally changed the type of the precomputed qualified_name from String to Arc<String> after the above tests. Total planning time reduced to 75% of the previous iteration. But I think it is still far from optimal.

I think this is a great idea. I think optimizing for the case of the same, reused qualifier, is a very good idea.

What do people think about the approach described on https://github.com/apache/arrow-datafusion/pull/7944? I (admittedly biasedly) think that approach would eliminate almost all allocations (instead it would be ref count updates). We can extend it to incorporate ideas like pre-caching qualified names and hash sets for column checks, and I think it could be pretty fast

zeodtr commented 6 months ago

I've tried to optimize the logical planning and optimization routines. As a result, for a wide aggregation query, the logical planning + optimization time was reduced from 49 seconds to 0.8 seconds. The details are as follows:

Each optimization step below is applied on top of the previous ones, and elapsed times drop accordingly. None of the steps requires deep knowledge of plan building. All time values are in milliseconds. The elapsed time after optimization includes the elapsed time after creating a logical plan, so it equals logical planning time + optimization time.

Note that the following optimization steps were not heavily tested.

No code change: original timing

Optimization 1: In DFField, precompute qualified_name in new...() functions, set it to a member variable, and use it in qualified_name()

Optimization 2: Apply https://github.com/apache/arrow-datafusion/pull/7870 (Use btree to search fields in DFSchema)

Optimization 3: Change DFField's qualified_name() to return Arc<String> instead of String

Other code is changed accordingly to avoid cloning strings.

Optimization 4: precompute using_columns in logical_plan::builder::project()

Like this:

    let using_columns = plan.using_columns()?;
    for e in expr {
        let e = e.into();
        match e {
            Expr::Wildcard => {
                projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
            }
            Expr::QualifiedWildcard { ref qualifier } => projected_expr
                .extend(expand_qualified_wildcard(qualifier, input_schema, None)?),
            _ => projected_expr.push(columnize_expr(
                normalize_col_with_using_columns(e, &plan, &using_columns)?,
                input_schema,
            )),
        }
    }

And implement expr_rewriter::normalize_col_with_using_columns() and logical_plan::builder::normalize_with_using_columns(), which receive using_columns as an argument.

Optimization 5: In DFSchema::merge() check duplicated_field with bool-based functions instead of Error-based functions

Like this:

            let duplicated_field = match field.qualifier() {
                Some(q) => self.has_field_with_qualified_name(q, field.name()),
                // for unqualified columns, check as unqualified name
                None => self.has_field_with_unqualified_name(field.name()),
            };

And implement has_field_with_unqualified_name() and has_field_with_qualified_name(), which return bool without involving Error. Since this is not an error condition, returning bool is more appropriate anyway. Additionally, implement get_index_of_column_by_name(), which returns Option<usize> instead of Result<Option<usize>>, for the above functions. field_not_found() and unqualified_field_not_found() are heavy when used for a wide table since they return all valid field names in the Error, so they should be avoided when not necessary.
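
The cost difference between the two styles can be sketched with a simplified, hypothetical `Schema` type (unqualified names only, and method names chosen for illustration). The error-based lookup formats every valid name into its message on a miss, while the Option- and bool-based lookups do no such work.

```rust
/// Hypothetical simplified schema holding only unqualified column names.
pub struct Schema {
    names: Vec<String>,
}

impl Schema {
    pub fn new(names: Vec<String>) -> Self {
        Self { names }
    }

    /// Error-based lookup: on a miss it formats every valid name into the message,
    /// which is O(N) allocation work even if the caller discards the error.
    pub fn index_of(&self, name: &str) -> Result<usize, String> {
        self.names.iter().position(|n| n == name).ok_or_else(|| {
            format!(
                "No field named {name}. Valid fields are {}.",
                self.names.join(", ")
            )
        })
    }

    /// Option-based lookup: a miss is just `None`, with no message to build.
    pub fn get_index_of(&self, name: &str) -> Option<usize> {
        self.names.iter().position(|n| n == name)
    }

    /// Bool-based existence check for callers that treat a miss as a normal case.
    pub fn has_field(&self, name: &str) -> bool {
        self.get_index_of(name).is_some()
    }
}
```

With thousands of columns and a duplicate check per field, the discarded error messages alone add up to quadratic string work.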

Optimization 6: In expr::utils::columnize_expr() use bool-based functions instead of Error-based functions

Similar to optimization 5. Like this:

        _ => match e.display_name() {
            Ok(name) => match input_schema.get_field_with_unqualified_name(&name) {
                Some(field) => Expr::Column(field.qualified_column()),
                // expression not provided as input, do not convert to a column reference
                None => e,
            },
            Err(_) => e,
        },

And implement get_field_with_unqualified_name() in DFSchema, which returns Option<&DFField> instead of Result<&DFField>. Since this is not an error condition, returning Option is more appropriate anyway.

Optimization 7: Use IndexSet in expr::utils::find_exprs_in_exprs()

Like this:

fn find_exprs_in_exprs<F>(exprs: &[Expr], test_fn: &F) -> Vec<Expr>
where
    F: Fn(&Expr) -> bool,
{
    exprs
        .iter()
        .flat_map(|expr| find_exprs_in_expr(expr, test_fn))
        .fold(IndexSet::new(), |mut acc, expr| {
            acc.insert(expr);
            acc
        })
        .into_iter()
        .collect()
}

IndexSet comes from the indexmap crate: https://docs.rs/indexmap/latest/indexmap/
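
For readers who want to see what the IndexSet-based fold achieves without the external crate, here is a standard-library approximation. `dedup_preserving_order` is a hypothetical helper name; it pairs a HashSet (for membership checks) with a Vec (for order), which is roughly what IndexSet provides in a single structure.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Order-preserving de-duplication in O(N) expected time.
/// Standard-library approximation of collecting into an `IndexSet`.
pub fn dedup_preserving_order<T>(items: impl IntoIterator<Item = T>) -> Vec<T>
where
    T: Eq + Hash + Clone,
{
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for item in items {
        // `insert` returns false if the value was already present.
        if seen.insert(item.clone()) {
            out.push(item);
        }
    }
    out
}
```

Compared with checking `Vec::contains` before each push, this avoids an O(N) scan per element.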

Optimization 8: In logical_plan::plan::calc_func_dependencies_for_project() return early when there is no functional dependencies

Like this:

fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_schema = input.schema();
    if !input_schema.has_functional_dependencies() {
        return Ok(FunctionalDependencies::empty());
    }
    // ... the rest of the function is unchanged
}

And implement DFSchema::has_functional_dependencies() and FunctionalDependencies::is_empty() for it. calc_func_dependencies_for_project() does a heavy operation to get proj_indices even before it calls project_functional_dependencies(), which is wasted work when there are no functional dependencies (the common case, since functional dependencies are rare in my opinion). I think the functional dependency-related functions take arguments that are expensive to compute before it is known whether they are needed. So it would be great if these functions received an FnOnce instead of precomputed data, to skip heavy operations when they are not required.
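
The FnOnce suggestion can be sketched as follows. The `FunctionalDependencies` type and `project_dependencies` function here are hypothetical simplifications, not DataFusion's definitions; the point is only that the expensive argument is passed as a closure and evaluated after the cheap emptiness check.

```rust
/// Hypothetical stand-in for a set of functional dependencies,
/// stored as (determinant column, dependent column) index pairs.
#[derive(Debug, Default, PartialEq)]
pub struct FunctionalDependencies {
    deps: Vec<(usize, usize)>,
}

impl FunctionalDependencies {
    pub fn new(deps: Vec<(usize, usize)>) -> Self {
        Self { deps }
    }

    pub fn is_empty(&self) -> bool {
        self.deps.is_empty()
    }
}

/// Projects dependencies onto output columns. `proj_indices` is a closure, so the
/// potentially expensive index computation runs only when there is something to project.
pub fn project_dependencies(
    input: &FunctionalDependencies,
    proj_indices: impl FnOnce() -> Vec<usize>,
) -> FunctionalDependencies {
    if input.is_empty() {
        // Early return: the closure is never called.
        return FunctionalDependencies::default();
    }
    let indices = proj_indices();
    // Keep a dependency only if both columns survive, remapped to output positions.
    let remap = |col: usize| indices.iter().position(|&i| i == col);
    let deps = input
        .deps
        .iter()
        .filter_map(|&(a, b)| Some((remap(a)?, remap(b)?)))
        .collect();
    FunctionalDependencies::new(deps)
}
```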

Now, the resulting 0.8 seconds is acceptable for me.

alamb commented 6 months ago

Thank you for this very detailed report @zeodtr -- this sounds great. Several of the optimizations you describe above seem like they could be pulled out into small PRs for DataFusion.

How do you suggest we proceed to make progress here?

zeodtr commented 6 months ago

@alamb I may not be able to make PRs myself. Maybe other contributors can make PRs based on my report. And I'd like to hear others' opinions and test results about my optimizations. Some optimizations may not be considered appropriate, like the one that adds an external dependency. And since I've investigated only the 'wide aggregation' code path (and 'simple wide selection', although not reported above), there may be other simple performance problems in other code paths like the ones I've found. Anyway, I'm planning to apply all the optimizations I've shown to my version of DataFusion (as long as they are correct).

zeodtr commented 6 months ago

In my (humble, may be wrong) opinion, DataFusion planning code may have the following performance problems.

  1. LogicalPlan (and maybe other modules) does the same operation over and over again without any precomputing or caching in a single planning session. And since LogicalPlan is an enum, it cannot cache anything and has to match each time it is called. In my opinion, it would be better to make it a trait and have each concrete plan node implement the trait.
  2. Uses format! as if it has a negligible cost (it does not when it's called tens of thousands of times).
  3. Uses iterators as if they have negligible cost (they do not when the number of elements is not small and the operations in the iterator are not cheap).
  4. Executes operations that can be heavy before determining that they are necessary (the functional dependency case in my report).
  5. Assumes the column list is short (sometimes it is not).

It's just my humble opinion. (I don't have deep knowledge of plan building)

alamb commented 6 months ago

LogicalPlan (and maybe other modules) does the same operation over and over again without any precomputing or caching in a single planning session. And since LogicalPlan is an enum, it cannot cache anything and has to match each time it is called. In my opinion, it would be better to make it a trait and have each concrete plan node implement the trait.

I think there are tradeoffs of each approach for sure. We have had some discussions about the various pros and cons in the past that might interest you:

It's just my humble opinion. (I don't have deep knowledge of plan building)

In general, I think you have identified the core improvements necessary to support faster planning with complex schemas.

zeodtr commented 6 months ago

@alamb I've read the discussions you shared. Thank you. Since I'm not quite proficient at Rust, I might not be able to add a useful comment there. However, I do have some general feelings:

Thank you.

karlovnv commented 3 months ago

@alamb Hi! Could you please let us know if any work is planned here? We noticed that the performance of DataFusion on wide tables slows down significantly from version to version, which causes us to stay on 31 :(

alamb commented 3 months ago

@alamb Hi! Could you please let us know if any work is planned here? We noticed that the performance of DataFusion on wide tables slows down significantly from version to version, which causes us to stay on 31 :(

Hi @karlovnv -- I would say we are making some progress -- most of the work is tracked in the parent epic, https://github.com/apache/arrow-datafusion/issues/5637 and there have been some improvements recently there.

I think the best help you could give is to ensure that the planning benchmarks we have added (see https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/benches/sql_planner.rs) accurately reflect what you are doing.

If there are other types of queries / schemas you are using that are not reflected there a PR / examples would be most appreciated as we continue to work to improve things

karlovnv commented 2 weeks ago

@alamb we ran the same perf test on 37.1, and it seems that 99% of request time is now spent on planning and optimizing (creating and optimizing the logical plan, and the planner). We have 1000 columns.

It seems that it's a huge degradation since 31 (now we use 31 as it is still much more performant on planning)

alamb commented 2 weeks ago

Thank you for the report @karlovnv. Any benchmarks you are able to share / contribute would be most helpful for improving the code.

With the benchmarks we have in place, I am happy to report we see 10x faster planning in 38.0.0 (just released) compared to 37.0.0 for 1000 columns. Not sure if you have tried that version yet.

Details are here https://github.com/apache/datafusion/issues/5637#issuecomment-2113660595

karlovnv commented 2 weeks ago

Thank you for your reply @alamb! We'll check it on 38 and share results.

This particular example is synthetic, as we implemented it using pure in-memory tables without any external dependencies.

In the real project (we are developing an in-memory columnar database for antifraud and scoring, using DF as the query engine) we faced similar perf issues (31 vs 37.1)