trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Incorrect results for correlated query with aggregation #20830

Open Bhargavi-Sagi opened 7 months ago

Bhargavi-Sagi commented 7 months ago

For the following query pattern we are seeing incorrect results (multiple rows with the same grouping keys).

select max(A.col1), B.col1, B.col2, C.col1, C.col2, C.col3, C.col4, D.col1, D.col2, D.col3, D.col4 from A,B,C,D 
where A.col2 = C.col1 
    AND C.col1 = coalesce( 
        (select e.col1 from C e where e.col4 = constant1 AND e.col5 = constant2 AND e.col6 = C.col6 AND e.col1 = C.col1),
        (select DISTINCT e.col1 from C e where e.col4 = constant1 AND e.col5 = constant3 AND e.col6 = C.col6 AND e.col1 = C.col1))
    AND B.col2 = C.col6
    AND D.col2 = B.col1
    AND C.col4 = constant1
    AND coalesce(D.col5, 'N') = 'Y'
    AND D.col6 IS NULL
group by B.col1, B.col2, C.col1, C.col2, C.col3, C.col4, D.col1, D.col2, D.col3, D.col4;

Before trino#19932 the distinct-aggregation correlated subquery was decorrelated before TransformCorrelatedScalarSubquery, and hash aggregation was used in the final stage. After trino#19932 the distinct aggregation is also decorrelated as part of the TransformCorrelatedScalarSubquery rule, causing streaming aggregation to be used in the final stage. This is causing multiple rows with the same grouping keys in the final query result. plan_after_19932.txt plan_before_19932.txt query.txt
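To illustrate why the choice of aggregation mode matters here, the following is a minimal sketch in plain Python (not Trino code; `hash_aggregate` and `streaming_aggregate` are hypothetical helpers). A streaming aggregation emits a group every time the grouping key changes, so it is only correct when its input is already grouped on the keys; on unsorted input it produces exactly the "multiple rows with the same grouping keys" symptom:

```python
from collections import defaultdict
from itertools import groupby

def hash_aggregate(rows, key_fn, val_fn):
    # Hash aggregation buffers every group, so input order is irrelevant.
    groups = defaultdict(list)
    for row in rows:
        groups[key_fn(row)].append(val_fn(row))
    return [(key, max(vals)) for key, vals in groups.items()]

def streaming_aggregate(rows, key_fn, val_fn):
    # Streaming aggregation emits a group whenever the key changes,
    # which is correct ONLY if the input is already grouped on the key.
    return [(key, max(val_fn(r) for r in run))
            for key, run in groupby(rows, key=key_fn)]

# Input NOT grouped on the key: key 1 arrives in two separate runs.
rows = [(1, 10), (2, 20), (1, 30)]
print(hash_aggregate(rows, lambda r: r[0], lambda r: r[1]))       # one row per key
print(streaming_aggregate(rows, lambda r: r[0], lambda r: r[1]))  # key 1 appears twice
```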

raunaqmorarka commented 6 months ago

@martint @sopel39 @findepi

sopel39 commented 6 months ago

@Bhargavi-Sagi could you reproduce with either tpch/tpcds query?

sopel39 commented 6 months ago

@Bhargavi-Sagi from your attachments, the "after" plan doesn't contain a STREAMING aggregation, and

       └─ Aggregate[keys = [account_owner_org_id, period_name, set_of_books_id, bank_account_id, currency_code_77, asset_code_combination_id, period_num, period_year, end_date], hash = [$hashvalue_238]]

so the "after" query cannot produce duplicate rows.

Are you sure "after" is after the change and "before" is before?

Bhargavi-Sagi commented 6 months ago

Apologies for the confusion. The file names got interchanged; I have corrected them in the description now. Before #19932 it was hash aggregation, and after #19932 the plan has streaming aggregation. We are not able to reproduce the incorrect results with the TPC-H dataset. Even though we are able to reproduce a similar plan with streaming aggregation, it always returns correct results with the following TPC-H query. tpch_query.txt

sopel39 commented 6 months ago

I think I know where the issue is. It's external to https://github.com/trinodb/trino/pull/19932

sopel39 commented 6 months ago

I cannot reproduce it with a similar plan:

trino> explain SELECT
    ->   max(x_l_partkey + y_l_partkey)
    -> FROM
    ->   (SELECT
    ->      i.l_suppkey,
    ->      l_partkey x_l_partkey,
    ->     (SELECT j.l_partkey FROM tpch.sf1.lineitem j WHERE i.l_suppkey = j.l_suppkey) y_l_partkey FROM tpch.sf1.lineitem i)
    -> GROUP BY l_suppkey;
 Trino version: dev
 Fragment 0 [HASH]
     Output layout: [max]
     Output partitioning: SINGLE []
     Output[columnNames = [_col0]]
     │   Layout: [max:bigint]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   _col0 := max
     └─ Project[]
        │   Layout: [max:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        └─ Aggregate[keys = [l_suppkey]]
           │   Layout: [l_suppkey:bigint, max:bigint]
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
           │   max := max("l_partkey_2")
           └─ FilterProject[filterPredicate = (CASE "is_distinct" WHEN true THEN true ELSE CAST(fail(28, VARCHAR 'Scalar sub-query has returned multiple rows') AS boolean) END)]
              │   Layout: [l_suppkey:bigint, l_partkey_2:bigint]
              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
              └─ Project[]
                 │   Layout: [l_suppkey:bigint, l_partkey_2:bigint, is_distinct:boolean]
                 │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                 └─ MarkDistinct[distinct = [l_suppkey:bigint, unique:bigint], marker = is_distinct]
                    │   Layout: [l_suppkey:bigint, unique:bigint, l_partkey_2:bigint, is_distinct:boolean]
                    └─ LocalExchange[partitioning = HASH, arguments = ["l_suppkey"]]
                       │   Layout: [l_suppkey:bigint, unique:bigint, l_partkey_2:bigint]
                       │   Estimates: {rows: 3604458755 (90.61GB), cpu: 90.61G, memory: 0B, network: 0B}
                       └─ LeftJoin[criteria = ("l_suppkey" = "l_suppkey_3"), distribution = PARTITIONED]
                          │   Layout: [l_suppkey:bigint, unique:bigint, l_partkey_2:bigint]
                          │   Estimates: {rows: 3604458755 (90.61GB), cpu: 90.82G, memory: 103.02MB, network: 0B}
                          │   Distribution: PARTITIONED
                          ├─ AssignUniqueId[]
                          │  │   Layout: [l_suppkey:bigint, unique:bigint]
                          │  │   Estimates: {rows: 6001215 (103.02MB), cpu: 51.51M, memory: 0B, network: 0B}
                          │  └─ RemoteSource[sourceFragmentIds = [1]]
                          │         Layout: [l_suppkey:bigint]
                          └─ LocalExchange[partitioning = HASH, arguments = ["l_suppkey_3"]]
                             │   Layout: [l_partkey_2:bigint, l_suppkey_3:bigint]
                             │   Estimates: {rows: 6001215 (103.02MB), cpu: 103.02M, memory: 0B, network: 0B}
                             └─ RemoteSource[sourceFragmentIds = [2]]
                                    Layout: [l_partkey_2:bigint, l_suppkey_3:bigint]

@Bhargavi-Sagi what kind of Trino are you using? I'm not familiar with

       └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]

in explain plans.

Bhargavi-Sagi commented 6 months ago

The explain plan was from Athena, but we are seeing a similar plan with the OSS fork as well.

findepi commented 6 months ago

@Bhargavi-Sagi can you please provide steps (create table, insert, select) that reproduce the problem on Trino without any Athena modifications?

sopel39 commented 6 months ago

> The explain plan was from athena. But with oss fork as well we are seeing similar plan.

I tried even more sophisticated query:

trino> explain SELECT
    ->      max(y_l_partkey + z_l_partkey)
    ->    FROM
    ->      (SELECT
    ->         i.l_suppkey,
    ->        (SELECT  j.l_partkey FROM tpch.sf1.lineitem j WHERE i.l_suppkey = j.l_suppkey) y_l_partkey,
    ->        (SELECT  k.l_partkey FROM tpch.sf1.lineitem k WHERE i.l_suppkey = k.l_suppkey) z_l_partkey
    ->       FROM (SELECT l_suppkey FROM tpch.sf1.lineitem) i)
    ->   GROUP BY l_suppkey;

that produces:

 Trino version: dev
 Fragment 0 [HASH]
     Output layout: [max]
     Output partitioning: SINGLE []
     Output[columnNames = [_col0]]
     │   Layout: [max:bigint]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   _col0 := max
     └─ Project[]
        │   Layout: [max:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        └─ Aggregate[keys = [l_suppkey]]
           │   Layout: [l_suppkey:bigint, max:bigint]
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
           │   max := max("expr")
           └─ Project[]
              │   Layout: [expr:bigint, l_suppkey:bigint]
              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
              │   expr := ("l_partkey_2" + "l_partkey_19")
              └─ FilterProject[filterPredicate = (CASE "is_distinct" WHEN true THEN true ELSE CAST(fail(28, VARCHAR 'Scalar sub-query has returned multiple rows') AS boolean) END)]
                 │   Layout: [l_suppkey:bigint, l_partkey_2:bigint, l_partkey_19:bigint]
                 │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                 └─ Project[]
                    │   Layout: [l_suppkey:bigint, l_partkey_2:bigint, l_partkey_19:bigint, is_distinct:boolean]
                    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                    └─ MarkDistinct[distinct = [unique:bigint], marker = is_distinct]
                       │   Layout: [l_suppkey:bigint, l_partkey_2:bigint, unique:bigint, l_partkey_19:bigint, is_distinct:boolean]
                       └─ LeftJoin[criteria = ("l_suppkey" = "l_suppkey_20"), distribution = PARTITIONED]
                          │   Layout: [l_suppkey:bigint, l_partkey_2:bigint, unique:bigint, l_partkey_19:bigint]
                          │   Estimates: {rows: ? (?), cpu: ?, memory: 103.02MB, network: 0B}
                          │   Distribution: PARTITIONED
                          ├─ AssignUniqueId[]
                          │  │   Layout: [l_suppkey:bigint, l_partkey_2:bigint, unique:bigint]
                          │  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                          │  └─ FilterProject[filterPredicate = (CASE "is_distinct_35" WHEN true THEN true ELSE CAST(fail(28, VARCHAR 'Scalar sub-query has returned multiple rows') AS boolean) END)]
                          │     │   Layout: [l_suppkey:bigint, l_partkey_2:bigint]
                          │     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                          │     └─ Project[]
                          │        │   Layout: [l_suppkey:bigint, l_partkey_2:bigint, is_distinct_35:boolean]
                          │        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                          │        └─ MarkDistinct[distinct = [l_suppkey:bigint, unique_34:bigint], marker = is_distinct_35]
                          │           │   Layout: [l_suppkey:bigint, unique_34:bigint, l_partkey_2:bigint, is_distinct_35:boolean]
                          │           └─ LocalExchange[partitioning = HASH, arguments = ["l_suppkey"]]
                          │              │   Layout: [l_suppkey:bigint, unique_34:bigint, l_partkey_2:bigint]
                          │              │   Estimates: {rows: 3604458755 (90.61GB), cpu: 90.61G, memory: 0B, network: 0B}
                          │              └─ LeftJoin[criteria = ("l_suppkey" = "l_suppkey_3"), distribution = PARTITIONED]
                          │                 │   Layout: [l_suppkey:bigint, unique_34:bigint, l_partkey_2:bigint]
                          │                 │   Estimates: {rows: 3604458755 (90.61GB), cpu: 90.82G, memory: 103.02MB, network: 0B}
                          │                 │   Distribution: PARTITIONED
                          │                 ├─ AssignUniqueId[]
                          │                 │  │   Layout: [l_suppkey:bigint, unique_34:bigint]
                          │                 │  │   Estimates: {rows: 6001215 (103.02MB), cpu: 51.51M, memory: 0B, network: 0B}
                          │                 │  └─ RemoteSource[sourceFragmentIds = [1]]
                          │                 │         Layout: [l_suppkey:bigint]

Still no success in making Aggregate[keys = [l_suppkey]] streaming. It could become streaming only if the input is ordered on l_suppkey.

I also peeked at PropertyDerivations.Visitor#visitProject, and it correctly prunes the GroupingProperty if the projection itself prunes columns (see io.trino.spi.connector.GroupingProperty#translate).
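For readers unfamiliar with that code, the pruning behavior can be sketched roughly like this (a simplified Python illustration; `translate_grouping` and its inputs are hypothetical stand-ins, not the actual GroupingProperty API): a grouping property only survives a projection if every grouped column is still visible in the projection's output.

```python
from typing import Dict, Optional, Set

def translate_grouping(grouped_columns: Set[str],
                       projection: Dict[str, str]) -> Optional[Set[str]]:
    # projection maps output symbol -> input symbol (identity assignments).
    # The grouping property survives only if EVERY grouped input column
    # is still visible in the output; otherwise it must be dropped, and
    # downstream operators may not assume the rows are grouped.
    inverse = {inp: out for out, inp in projection.items()}
    if all(col in inverse for col in grouped_columns):
        return {inverse[col] for col in grouped_columns}
    return None  # property pruned

# Projection keeps only column "a": grouping on {a, b} cannot be preserved.
print(translate_grouping({"a", "b"}, {"a_out": "a"}))  # None
print(translate_grouping({"a"}, {"a_out": "a"}))       # {'a_out'}
```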

In your case:

             └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
                │   Layout: [bank_account_id:decimal(15,0), statement_date:bigint, set_of_books_id:decimal(15,0), account_owner_org_id:decimal(15,0), period_name:varchar, end_date:bigint, period_year:decimal(15,0), period_num:decimal(15,0), currency_code_77:varchar, asset_code_combination_id:decimal(15,0), period_name_116:varchar, period_name_164:varchar, is_distinct:boolean]
                │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}

should strip any grouping properties so that:

    └─ Aggregate[type =  (STREAMING), keys = [account_owner_org_id, period_name, set_of_books_id, bank_account_id, currency_code_77, asset_code_combination_id, period_num, period_year, end_date]]
       │   Layout: [account_owner_org_id:decimal(15,0), period_name:varchar, set_of_books_id:decimal(15,0), bank_account_id:decimal(15,0), currency_code_77:varchar, asset_code_combination_id:decimal(15,0), period_num:decimal(15,0), period_year:decimal(15,0), end_date:bigint, max:date]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   max := max("expr")

is not derived.

Bhargavi-Sagi commented 6 months ago

I am able to reproduce the plan with streaming aggregation with this query (also spill-enabled is set to true):

explain SELECT max (L.l_linenumber), L.l_orderkey, O.o_orderstatus FROM lineitem L, orders O
                        WHERE L.l_orderkey = O.o_orderkey
                              AND O.o_orderstatus = coalesce(
                              (SELECT o_orderstatus FROM orders O1 WHERE O1.o_orderkey = O.o_orderkey
                                           AND O1.o_orderstatus = O.o_orderstatus AND o_orderkey = 1),
                              (SELECT DISTINCT o_orderstatus FROM orders O1 WHERE  O1.o_orderkey = O.o_orderkey
                                           AND O1.o_orderstatus = O.o_orderstatus
                                    ))
                     GROUP BY L.l_orderkey, O.o_orderstatus;

sopel39 commented 6 months ago

@Bhargavi-Sagi yup. Maybe it could happen for a spill-enabled query (although it's not clear to me how). Do you think you could fix the issue? I would suggest looking at PropertyDerivations.Visitor#visitJoin and property derivation in general. Try to debug which LocalProperty values are propagated in AddLocalExchanges.Rewriter#visitAggregation, because that's the place:

            AggregationNode result = AggregationNode.builderFrom(node)
                    .setSource(child.getNode())
                    .setPreGroupedSymbols(preGroupedSymbols)
                    .build();

where aggregation becomes streaming
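The decision being made there can be sketched roughly as follows (a simplified Python illustration; `pre_grouped_symbols` and `derived_grouped` are hypothetical stand-ins for the derived local properties, not Trino's actual API): if property derivation reports that the child's output is already grouped on the aggregation keys, those keys are marked pre-grouped and the aggregation runs in streaming mode, so an over-optimistic derivation directly yields duplicate groups.

```python
from typing import List, Optional, Set

def pre_grouped_symbols(grouping_keys: List[str],
                        derived_grouped: Optional[Set[str]]) -> List[str]:
    # derived_grouped models what property derivation claims about the
    # child's output: a set of columns the rows are already grouped on,
    # or None when no grouping property was derived.
    if derived_grouped is not None and set(grouping_keys) <= derived_grouped:
        return grouping_keys   # fully pre-grouped -> STREAMING aggregation
    return []                  # nothing pre-grouped -> hash aggregation

# No grouping property derived: hash aggregation is chosen (always safe).
print(pre_grouped_symbols(["l_orderkey", "o_orderstatus"], None))
# Derivation (perhaps wrongly) claims grouping: streaming aggregation is chosen.
print(pre_grouped_symbols(["l_orderkey"], {"l_orderkey"}))
```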