MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com

Projection pushdown through window functions #17522

Open ggevay opened 1 year ago

ggevay commented 1 year ago

A window function (OVER clause) is currently translated to a complicated pattern in the HIR-to-MIR lowering. This prevents ProjectionPushdown from recognizing when a window function and the operations after it use only a subset of the input columns.

This might be an argument for creating a MirRelationExpr enum variant for window functions. Alternatively, ProjectionPushdown could just recognize the big window function pattern created by the lowering, and work with that.

Edit: Or maybe complicated pattern matching is not needed for this. We could just make ProjectionPushdown track requests at a finer granularity: not just the requested columns but, when a column is a record, also which of its individual fields are requested.
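To make the finer-granularity idea concrete, here is a minimal Python sketch. It is a toy model, not Materialize code, and every name in it is hypothetical. It assumes the record layout that the example plan in this issue suggests: each unnested element is a record whose field 0 is the window function result and whose field 1 is the original input row, so a `record_get` path `(1, i)` reads input column `i`.

```python
# Toy model only: hypothetical names, not Materialize APIs.

def demanded_input_columns(accessed_paths, group_key, arg_cols, order_cols, input_arity):
    """Return the input columns a lowered window function actually needs.

    accessed_paths: record_get paths applied to each unnested element,
        outermost field first, e.g. (1, 2) for record_get[2](record_get[1](..)).
    group_key, arg_cols, order_cols: input columns used by PARTITION BY,
        the function argument, and ORDER BY.
    """
    demanded = set(group_key) | set(arg_cols) | set(order_cols)
    for path in accessed_paths:
        if path[:1] != (1,):
            continue  # field 0 (the result) needs no extra input columns
        if len(path) == 1:
            demanded.update(range(input_arity))  # whole original row requested
        else:
            demanded.add(path[1])  # only one field of the original row
    return sorted(demanded)

# A first_value(pop) OVER (PARTITION BY state ORDER BY pop DESC) call on
# cities(name, state, pop), where later operators read only the result
# and the pop field of the original row.
needed = demanded_input_columns(
    accessed_paths=[(1, 2), (0,)],
    group_key=[1], arg_cols=[2], order_cols=[2], input_arity=3,
)
print(needed)  # [1, 2]
```

With field-level requests, the transform can see that only columns 1 and 2 (`state` and `pop`) are needed, without matching the whole lowered pattern.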

An example:

CREATE TABLE cities (
    name text NOT NULL,
    state text NOT NULL,
    pop int NOT NULL
);

INSERT INTO cities VALUES
    ('Los_Angeles', 'CA', 3979576),
    ('Phoenix', 'AZ', 1680992),
    ('Houston', 'TX', 2320268),
    ('San_Diego', 'CA', 1423851),
    ('San_Francisco', 'CA', 881549),
    ('New_York', 'NY', 8336817),
    ('Dallas', 'TX', 1343573),
    ('San_Antonio', 'TX', 1547253),
    ('San_Jose', 'CA', 1021795),
    ('Chicago', 'IL', 2695598),
    ('Austin', 'TX', 978908);

materialize=> explain SELECT pop, CAST(FIRST_VALUE(pop) OVER (PARTITION BY state ORDER BY pop DESC) AS float)/pop
FROM cities;
                                               Optimized Plan                                               
------------------------------------------------------------------------------------------------------------
 Explained Query:                                                                                          +
   Project (#2, #3)                                                                                        +
     Map (record_get[2](record_get[1](#1)), (integer_to_double(record_get[0](#1)) / integer_to_double(#2)))+
       FlatMap unnest_list(#0)                                                                             +
         Project (#1)                                                                                      +
           Reduce group_by=[#1] aggregates=[first_value(row(row(row(#0, #1, #2), #2), #2))]                +
             Get materialize.public.cities                                                                 +

The query uses only state and pop, so we should Project those two columns before the Reduce.
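As a sanity check that this projection is safe, the following Python sketch evaluates the query above by hand, once on full rows and once on rows projected to `(state, pop)`. It only simulates the SQL semantics; `ratio_query` is a hypothetical helper, not anything from Materialize.

```python
from collections import defaultdict

cities = [
    ("Los_Angeles", "CA", 3979576), ("Phoenix", "AZ", 1680992),
    ("Houston", "TX", 2320268), ("San_Diego", "CA", 1423851),
    ("San_Francisco", "CA", 881549), ("New_York", "NY", 8336817),
    ("Dallas", "TX", 1343573), ("San_Antonio", "TX", 1547253),
    ("San_Jose", "CA", 1021795), ("Chicago", "IL", 2695598),
    ("Austin", "TX", 978908),
]

def ratio_query(rows, state_idx, pop_idx):
    """Simulate: SELECT pop, CAST(FIRST_VALUE(pop) OVER
    (PARTITION BY state ORDER BY pop DESC) AS float) / pop."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[state_idx]].append(row)
    out = []
    for part in partitions.values():
        # Under ORDER BY pop DESC, the first value is the partition maximum.
        first = max(r[pop_idx] for r in part)
        out.extend((r[pop_idx], first / r[pop_idx]) for r in part)
    return sorted(out)

full = ratio_query(cities, state_idx=1, pop_idx=2)
# Manual projection pushdown: drop `name` before the window function.
projected = ratio_query([(s, p) for _, s, p in cities], state_idx=0, pop_idx=1)
assert full == projected
```

The results match because `name` never participates in the partitioning, the ordering, the function argument, or the final output.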

aalexandrov commented 1 year ago

Does the same apply for PredicatePushdown?

ggevay commented 1 year ago

This is occurring for a customer here: https://materializeinc.slack.com/archives/C03648YB34N/p1692825460915989?thread_ts=1692823994.245369&cid=C03648YB34N

ggevay commented 1 year ago

> Does the same apply for PredicatePushdown?

~Amazingly, PredicatePushdown seems to work:~

materialize=> 
create table t(g int, x int, y int, z int);
CREATE TABLE
materialize=> 
explain
select g, x, lag(x) over (partition by g order by x)
from t
where x = 5;
                                                        Optimized Plan                                                        
------------------------------------------------------------------------------------------------------------------------------
 Explained Query:                                                                                                            +
   Project (#3..=#5)                                                                                                         +
     Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[0](#1))                                        +
       FlatMap unnest_list(#0)                                                                                               +
         Project (#1)                                                                                                        +
           Reduce group_by=[#0] aggregates=[lag[order_by=[#0 asc nulls_last]](row(row(row(#0, 5, #1, #2), [5, 1, null]), 5))]+
             Project (#0, #2, #3)                                                                                            +
               Filter (#1 = 5)                                                                                               +
                 Get materialize.public.t                                                                                    +
                                                                                                                             +
 Source materialize.public.t                                                                                                 +
   filter=((#1 = 5))                                                                                                         +

(1 row)

materialize=> 
explain
select g, x, lag(x) over (partition by g order by x)
from t
where y = 5;
                                                          Optimized Plan                                                           
-----------------------------------------------------------------------------------------------------------------------------------
 Explained Query:                                                                                                                 +
   Project (#3..=#5)                                                                                                              +
     Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[0](#1))                                             +
       FlatMap unnest_list(#0)                                                                                                    +
         Project (#1)                                                                                                             +
           Reduce group_by=[#0] aggregates=[lag[order_by=[#0 asc nulls_last]](row(row(row(#0, #1, 5, #2), row(#1, 1, null)), #1))]+
             Project (#0, #1, #3)                                                                                                 +
               Filter (#2 = 5)                                                                                                    +
                 Get materialize.public.t                                                                                         +
                                                                                                                                  +
 Source materialize.public.t                                                                                                      +
   filter=((#2 = 5))                                                                                                              +

(1 row)

materialize=> 
explain
select g, x, lag(x) over (partition by g order by x)
from t
where g = 5;
                                                  Optimized Plan                                                   
-------------------------------------------------------------------------------------------------------------------
 Explained Query:                                                                                                 +
   Project (#3..=#5)                                                                                              +
     Map (record_get[1](#1), record_get[0](#2), record_get[1](#2), record_get[0](#1))                             +
       FlatMap unnest_list(#0)                                                                                    +
         Reduce aggregates=[lag[order_by=[#0 asc nulls_last]](row(row(row(5, #0, #1, #2), row(#0, 1, null)), #0))]+
           Project (#1..=#3)                                                                                      +
             Filter (#0 = 5)                                                                                      +
               Get materialize.public.t                                                                           +
                                                                                                                  +
 Source materialize.public.t                                                                                      +
   filter=((#0 = 5))                                                                                              +

(1 row)

Edit: No. In the examples above, the predicate was already below the window function to begin with. Pushing a predicate down through a window function actually doesn't work for PredicatePushdown either:

materialize=> 
create table t(g int, x int, y int, z int);

explain with (arity)
select g, x, l
from (
  select g, x, lag(x) over (partition by g order by x) as l
  from t
)
where g = 6;

CREATE TABLE
                                                                    Optimized Plan                                                                    
------------------------------------------------------------------------------------------------------------------------------------------------------
 Explained Query:                                                                                                                                    +
   Project (#5, #3, #4) // { arity: 3 }                                                                                                              +
     Filter (6 = record_get[0](#2)) // { arity: 6 }                                                                                                  +
       Map (record_get[1](#1), record_get[1](#2), record_get[0](#1), 6) // { arity: 6 }                                                              +
         FlatMap unnest_list(#0) // { arity: 2 }                                                                                                     +
           Project (#1) // { arity: 1 }                                                                                                              +
             Reduce group_by=[#0] aggregates=[lag[order_by=[#0 asc nulls_last]](row(row(row(#0, #1, #2, #3), row(#1, 1, null)), #1))] // { arity: 2 }+
               ReadStorage materialize.public.t // { arity: 4 }                                                                                      +
                                                                                                                                                     +
 Source materialize.public.t                                                                                                                         +
                                                                                                                                                     +
 Target cluster: quickstart                                                                                                                          +

(1 row)

Note that PredicatePushdown would be valid only for predicates on columns in the PARTITION BY clause; otherwise the results of the window function would change.
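A small Python simulation illustrates why. The data and the `lag_x` helper are hypothetical and only mimic `lag(x) OVER (PARTITION BY g ORDER BY x)`: filtering on the partition column gives the same rows whether it runs before or after the window function, while filtering on `x` first changes which row each `lag` sees.

```python
from collections import defaultdict

def lag_x(rows):
    """Rows are (g, x); returns (g, x, lag) tuples ordered by (g, x)."""
    partitions = defaultdict(list)
    for g, x in rows:
        partitions[g].append(x)
    out = []
    for g in sorted(partitions):
        prev = None  # lag of the first row in a partition is NULL
        for x in sorted(partitions[g]):
            out.append((g, x, prev))
            prev = x
    return out

rows = [(6, 1), (6, 3), (6, 5), (7, 2), (7, 4)]  # made-up sample data

# Predicate on the PARTITION BY column: pushdown preserves the result.
after_g = [r for r in lag_x(rows) if r[0] == 6]
before_g = lag_x([r for r in rows if r[0] == 6])
assert after_g == before_g

# Predicate on a non-partition column: pushdown changes the lag values.
after_x = [r for r in lag_x(rows) if r[1] >= 3]
before_x = lag_x([r for r in rows if r[1] >= 3])
assert after_x != before_x
print(after_x)   # [(6, 3, 1), (6, 5, 3), (7, 4, 2)]
print(before_x)  # [(6, 3, None), (6, 5, 3), (7, 4, None)]
```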

Created a separate issue: https://github.com/MaterializeInc/materialize/issues/27531

ggevay commented 5 months ago

In a big plan at a customer, there is a situation where arity goes from 124 to 33 after a window function call. And the same plan also has several other occurrences with smaller arity reductions: 22 -> 13, 12 -> 2, 23 -> 9.

So we should do this soon.

Edit: There is also a 78 -> 33 first_value at the same customer.

bosconi commented 3 months ago

@ggevay assigned to you and made P0 as a placeholder so we know to consider this in the first wave of triage.

ggevay commented 1 month ago

Occurring at https://github.com/MaterializeInc/accounts/issues/77. They pushed the projection down manually for now.

ggevay commented 1 month ago

Moved this to a paused status, because I haven't been able to write a single line of code for this in the past ~2 weeks due to lots of other things coming up that seem more urgent. But this is also important, so I hope I can get back to it in the not-too-distant future.

ggevay commented 3 weeks ago

There is also a bad case of this at https://github.com/MaterializeInc/accounts/issues/12, where arity goes from 86 to 45. Importantly, this is at the very top of a complicated dataflow whose total memory usage is 4.6 GB (their biggest dataflow on this cluster by far), which would probably be cut almost in half by this optimization.