trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Predicates aren't pushed down for constants outside the scope of subquery #10938

Open takezoe opened 2 years ago

takezoe commented 2 years ago

Since 320, predicate pushdown doesn't work for constants that come from outside the scope of a subquery, and this slows down certain types of queries significantly.

For example, some queries can reduce the number of records in early stages by pushing down constants, which greatly helps join performance in later stages. Queries of this type became very slow, or even failed with memory-limit errors, on 320 or later.

A simple query to reproduce:

WITH params AS(
SELECT
  '1992-01-01 00:00:00' start_date,
  '1992-02-01 00:00:00' end_date
)
SELECT * FROM tpch.sf1.customer, params
WHERE custkey in (
  SELECT custkey FROM tpch.sf1.orders 
  WHERE orderdate >= date_parse(start_date, '%Y-%m-%d %H:%i:%s') AND orderdate < date_parse(end_date, '%Y-%m-%d %H:%i:%s')
);

Explain on 319:

 Output[custkey, name, address, nationkey, phone, acctbal, mktsegment, comment, start_date, end_date]
 │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19)]
 │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
 │   start_date := expr
 │   end_date := expr_0
 └─ RemoteExchange[GATHER]
    │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19)]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    └─ FilterProject[filterPredicate = (CASE WHEN ("countmatches" > BIGINT '0') THEN true WHEN ("countnullmatches" > BIGINT '0') THEN CAST(null AS boolean) ELSE false END)]
       │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19)]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 339.98kB}/{rows: ? (?), cpu: ?, memory: ?, network: 339.98kB}
       └─ Aggregate(STREAMING)[custkey, name, address, nationkey, phone, acctbal, mktsegment, comment, expr, expr_0, unique]
          │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19), unique:
          │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 339.98kB}
          │   countmatches := count(*) (mask = matchconditionsymbol_59)
          │   countnullmatches := count(*) (mask = nullmatchconditionsymbol_60)
          └─ Project[]
             │   Layout: [address:varchar(40), acctbal:double, expr_0:varchar(19), nullmatchconditionsymbol_60:boolean, nationkey:bigint, mktsegment:varchar(10), phone:varchar(15), matchconditionsymbol_59:boolean, custkey:
             │   Estimates: {rows: ? (?), cpu: ?, memory: 339.98kB, network: 339.98kB}
             │   nullmatchconditionsymbol_60 := ((NOT ("buildsideknownnonnull" IS NULL)) AND (("custkey" IS NULL) OR ("custkey_10" IS NULL)))
             │   matchconditionsymbol_59 := ((NOT ("custkey" IS NULL)) AND (NOT ("custkey_10" IS NULL)))
             └─ LeftJoin[((("custkey" IS NULL) OR ("custkey" = "custkey_10")) OR ("custkey_10" IS NULL))]
                │   Layout: [nationkey:bigint, mktsegment:varchar(10), address:varchar(40), phone:varchar(15), custkey:bigint, unique:bigint, name:varchar(25), comment:varchar(117), expr:varchar(19), acctbal:double, expr_0
                │   Estimates: {rows: ? (?), cpu: ?, memory: 339.98kB, network: 339.98kB}
                │   Distribution: REPLICATED
                ├─ AssignUniqueId
                │  │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19)
                │  │   Estimates: {rows: 150000 (43.14MB), cpu: 95.36M, memory: 110B, network: 110B}
                │  └─ CrossJoin
                │     │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(
                │     │   Estimates: {rows: 150000 (41.85MB), cpu: 94.08M, memory: 110B, network: 110B}
                │     │   Distribution: REPLICATED
                │     ├─ TableScan[tpch:customer:sf1.0]
                │     │      Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117)]
                │     │      Estimates: {rows: 150000 (26.11MB), cpu: 26.11M, memory: 0B, network: 0B}
                │     │      nationkey := tpch:nationkey
                │     │      mktsegment := tpch:mktsegment
                │     │      address := tpch:address
                │     │      phone := tpch:phone
                │     │      custkey := tpch:custkey
                │     │      name := tpch:name
                │     │      comment := tpch:comment
                │     │      acctbal := tpch:acctbal
                │     └─ LocalExchange[SINGLE] ()
                │        │   Layout: [expr:varchar(19), expr_0:varchar(19)]
                │        │   Estimates: {rows: 1 (110B), cpu: 110, memory: 0B, network: 110B}
                │        └─ RemoteExchange[REPLICATE]
                │           │   Layout: [expr:varchar(19), expr_0:varchar(19)]
                │           │   Estimates: {rows: 1 (110B), cpu: 110, memory: 0B, network: 110B}
                │           └─ Project[]
                │              │   Layout: [expr:varchar(19), expr_0:varchar(19)]
                │              │   Estimates: {rows: 1 (110B), cpu: 110, memory: 0B, network: 0B}
                │              │   expr := '1992-01-01 00:00:00'
                │              │   expr_0 := '1992-02-01 00:00:00'
                │              └─ LocalExchange[ROUND_ROBIN] ()
                │                 │   Layout: []
                │                 │   Estimates: {rows: 1 (0B), cpu: 0, memory: 0B, network: 0B}
                │                 └─ Values
                │                        Layout: []
                │                        Estimates: {rows: 1 (0B), cpu: 0, memory: 0B, network: 0B}
                │                        ()
                └─ LocalExchange[SINGLE] ()
                   │   Layout: [custkey_10:bigint, buildsideknownnonnull:bigint]
                   │   Estimates: {rows: 19335 (339.87kB), cpu: 40.39M, memory: 0B, network: 339.87kB}
                   └─ RemoteExchange[REPLICATE]
                      │   Layout: [custkey_10:bigint, buildsideknownnonnull:bigint]
                      │   Estimates: {rows: 19335 (339.87kB), cpu: 40.39M, memory: 0B, network: 339.87kB}
                      └─ ScanFilterProject[table = tpch:orders:sf1.0, filterPredicate = (("orderdate" >= DATE '1992-01-01') AND ("orderdate" < DATE '1992-02-01'))] <---- This filter disappears on 320 or later
                             Layout: [custkey_10:bigint, buildsideknownnonnull:bigint]
                             Estimates: {rows: 1500000 (25.75MB), cpu: 20.03M, memory: 0B, network: 0B}/{rows: 19335 (339.87kB), cpu: 40.05M, memory: 0B, network: 0B}/{rows: 19335 (339.87kB), cpu: 40.39M, memory: 0B, netwo
                             buildsideknownnonnull := BIGINT '0'
                             custkey_10 := tpch:custkey
                             orderdate := tpch:orderdate
                             tpch:orderstatus
                                 :: [[F], [O], [P]]

Explain on the latest master:

 Output[custkey, name, address, nationkey, phone, acctbal, mktsegment, comment, start_date, end_date]
 │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19)]
 │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
 │   start_date := expr
 │   end_date := expr_0
 └─ RemoteExchange[GATHER]
    │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19)]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    └─ FilterProject[filterPredicate = (CASE WHEN ("countmatches" > BIGINT '0') THEN true WHEN ("countnullmatches" > BIGINT '0') THEN CAST(null AS boolean) ELSE false END)]
       │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19)]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 32.90MB}/{rows: ? (?), cpu: ?, memory: ?, network: 32.90MB}
       └─ Aggregate(STREAMING)[custkey, name, address, nationkey, phone, acctbal, mktsegment, comment, expr, expr_0, unique]
          │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19), unique:
          │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 32.90MB}
          │   countmatches := count(*) (mask = matchconditionsymbol_59)
          │   countnullmatches := count(*) (mask = nullmatchconditionsymbol_60)
          └─ Project[]
             │   Layout: [address:varchar(40), acctbal:double, expr_0:varchar(19), nullmatchconditionsymbol_60:boolean, nationkey:bigint, mktsegment:varchar(10), phone:varchar(15), matchconditionsymbol_59:boolean, custkey:
             │   Estimates: {rows: ? (?), cpu: ?, memory: 32.90MB, network: 32.90MB}
             │   nullmatchconditionsymbol_60 := ((NOT ("buildsideknownnonnull" IS NULL)) AND (("custkey" IS NULL) OR ("custkey_10" IS NULL)))
             │   matchconditionsymbol_59 := ((NOT ("custkey" IS NULL)) AND (NOT ("custkey_10" IS NULL)))
             └─ LeftJoin[((((("custkey" IS NULL) OR ("custkey" = "custkey_10")) OR ("custkey_10" IS NULL)) AND (CAST("orderdate" AS timestamp) >= "date_parse"("expr", '%Y-%m-%d %H:%i:%s'))) AND (CAST("orderdate" AS timesta
                │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19), u
                │   Estimates: {rows: ? (?), cpu: ?, memory: 32.90MB, network: 32.90MB}
                │   Distribution: REPLICATED
                ├─ AssignUniqueId
                │  │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(19)
                │  │   Estimates: {rows: 150000 (43.14MB), cpu: 95.36M, memory: 110B, network: 110B}
                │  └─ CrossJoin
                │     │   Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117), expr:varchar(19), expr_0:varchar(
                │     │   Estimates: {rows: 150000 (41.85MB), cpu: 94.08M, memory: 110B, network: 110B}
                │     │   Distribution: REPLICATED
                │     ├─ TableScan[tpch:customer:sf1.0]
                │     │      Layout: [custkey:bigint, name:varchar(25), address:varchar(40), nationkey:bigint, phone:varchar(15), acctbal:double, mktsegment:varchar(10), comment:varchar(117)]
                │     │      Estimates: {rows: 150000 (26.11MB), cpu: 26.11M, memory: 0B, network: 0B}
                │     │      nationkey := tpch:nationkey
                │     │      mktsegment := tpch:mktsegment
                │     │      address := tpch:address
                │     │      phone := tpch:phone
                │     │      custkey := tpch:custkey
                │     │      name := tpch:name
                │     │      comment := tpch:comment
                │     │      acctbal := tpch:acctbal
                │     └─ LocalExchange[SINGLE] ()
                │        │   Layout: [expr:varchar(19), expr_0:varchar(19)]
                │        │   Estimates: {rows: 1 (110B), cpu: 110, memory: 0B, network: 110B}
                │        └─ RemoteExchange[REPLICATE]
                │           │   Layout: [expr:varchar(19), expr_0:varchar(19)]
                │           │   Estimates: {rows: 1 (110B), cpu: 110, memory: 0B, network: 110B}
                │           └─ Project[]
                │              │   Layout: [expr:varchar(19), expr_0:varchar(19)]
                │              │   Estimates: {rows: 1 (110B), cpu: 110, memory: 0B, network: 0B}
                │              │   expr := '1992-01-01 00:00:00'
                │              │   expr_0 := '1992-02-01 00:00:00'
                │              └─ LocalExchange[ROUND_ROBIN] ()
                │                 │   Layout: []
                │                 │   Estimates: {rows: 1 (0B), cpu: 0, memory: 0B, network: 0B}
                │                 └─ Values
                │                        Layout: []
                │                        Estimates: {rows: 1 (0B), cpu: 0, memory: 0B, network: 0B}
                │                        ()
                └─ LocalExchange[SINGLE] ()
                   │   Layout: [custkey_10:bigint, orderdate:date, buildsideknownnonnull:bigint]
                   │   Estimates: {rows: 1500000 (32.90MB), cpu: 52.93M, memory: 0B, network: 32.90MB}
                   └─ RemoteExchange[REPLICATE]
                      │   Layout: [custkey_10:bigint, orderdate:date, buildsideknownnonnull:bigint]
                      │   Estimates: {rows: 1500000 (32.90MB), cpu: 52.93M, memory: 0B, network: 32.90MB}
                      └─ ScanProject[table = tpch:orders:sf1.0]
                             Layout: [custkey_10:bigint, orderdate:date, buildsideknownnonnull:bigint]
                             Estimates: {rows: 1500000 (32.90MB), cpu: 20.03M, memory: 0B, network: 0B}/{rows: 1500000 (32.90MB), cpu: 52.93M, memory: 0B, network: 0B}
                             buildsideknownnonnull := BIGINT '0'
                             custkey_10 := tpch:custkey
                             orderdate := tpch:orderdate
                             tpch:orderstatus
                                 :: [[F], [O], [P]]

I confirmed that the following scope check, introduced in 320, prevents the pushdown, but I'm not sure whether simply removing it is safe. https://github.com/trinodb/trino/pull/1550/files#diff-d5f6d85a278c69d6969856f07362e37d18eb2fe6fd7c79526f035c3e9b4cfca0R323-R333
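
To make the difference between the two plans concrete, here is a rough Python toy model (not Trino code). The row data, variable names, and function names are all hypothetical stand-ins; only the date range comes from the query above. It sketches why both plans return the same customers while the build side of the join differs in size.

```python
from datetime import date

# Hypothetical stand-ins for tpch.sf1.orders rows: (custkey, orderdate).
orders = [
    (1, date(1992, 1, 15)),
    (2, date(1992, 3, 1)),
    (3, date(1992, 1, 31)),
    (4, date(1993, 6, 10)),
]
# Hypothetical stand-ins for tpch.sf1.customer custkeys.
customers = [1, 2, 3, 4, 5]
# The constants from the params CTE, already parsed.
start, end = date(1992, 1, 1), date(1992, 2, 1)


def plan_filter_at_scan():
    """Roughly the 319 shape: filter orders at the scan, then semi-join."""
    build = [ck for ck, od in orders if start <= od < end]
    build_keys = set(build)
    matched = [c for c in customers if c in build_keys]
    return matched, len(build)


def plan_filter_in_join():
    """Roughly the 320+ shape: ship every order row, filter in the join."""
    build = list(orders)
    matched = [
        c for c in customers
        if any(ck == c and start <= od < end for ck, od in build)
    ]
    return matched, len(build)


early_result, early_build_rows = plan_filter_at_scan()
late_result, late_build_rows = plan_filter_in_join()
print(early_result == late_result)        # same answer from both plans
print(early_build_rows, late_build_rows)  # build-side row counts differ
```

In this toy the first plan builds from 2 rows and the second from all 4. In the plans above, the corresponding estimates are 19335 rows (339.87kB) versus 1500000 rows (32.90MB) on the replicated build side.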

takezoe commented 2 years ago

@martint Hi, could you take a look at this since you are the author of https://github.com/trinodb/trino/pull/1550?

takezoe commented 2 years ago

I tried removing the scope check that was added in #1550; however, the following test case then failed with "Division by zero", which is the same error as on 319. https://github.com/trinodb/trino/blob/ae713b7699e807eea3fccaa56abaf4cc8e393ccc/core/trino-main/src/test/java/io/trino/sql/query/TestPredicatePushdown.java#L45-L62 Preventing constant pushdown seems to be beneficial in this case. However, I feel that its downsides outweigh the benefits...
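
As a rough illustration of the general hazard (this is not the linked test, and whether it matches what that test exercises is an assumption), the Python toy below shows how moving an expression ahead of the filter that guards it can surface a division by zero. The rows and function names are hypothetical.

```python
# Hypothetical rows: "v" is the column a filter checks, "a" is a divisor.
rows = [
    {"v": "x", "a": 1},
    {"v": "y", "a": 0},  # 1 / a would fail for this row
]


def guarded_plan(rows):
    """Filter first, then evaluate the risky expression on surviving rows."""
    survivors = [r for r in rows if r["v"] == "x"]
    return [(r["v"], 1 / r["a"]) for r in survivors]


def pushed_down_plan(rows):
    """Evaluate the risky expression on every row, then filter."""
    evaluated = [(r["v"], 1 / r["a"]) for r in rows]
    return [t for t in evaluated if t[0] == "x"]


print(guarded_plan(rows))
try:
    pushed_down_plan(rows)
except ZeroDivisionError as exc:
    print("pushed-down plan failed:", exc)
```

Both functions describe the same logical query, but only the first respects the evaluation order that keeps the failing row away from the division.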

takezoe commented 2 years ago

Asking on Slack: https://trinodb.slack.com/archives/CGB0QHWSW/p1645535658457569