apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

[multistage][bug] 3-way join results in inconsistent data #9963

Open walterddr opened 1 year ago

walterddr commented 1 year ago

Using SSB as an example, the following query produces inconsistent results (the number of rows differs between runs):

    select
      P_MFGR,
      D_YEAR,
      LO_REVENUE
    from lineorder
      JOIN dates ON LO_ORDERDATE = D_DATEKEY
      JOIN part ON LO_PARTKEY = P_PARTKEY
    where 1=1
      and (D_YEAR = 1997 or D_YEAR = 1998)
      and P_CATEGORY = 'MFGR#14'

whereas the following two single-join queries

    select
      P_MFGR,
      LO_REVENUE
    from lineorder
      JOIN part ON LO_PARTKEY = P_PARTKEY
    where 1=1
      and P_CATEGORY = 'MFGR#14'

    select
      D_YEAR,
      LO_REVENUE
    from lineorder
      JOIN dates ON LO_ORDERDATE = D_DATEKEY
    where 1=1
      and (D_YEAR = 1997 or D_YEAR = 1998)

produce the correct number of rows consistently.
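
One way to quantify "inconsistent" is to run the same query several times and tally the row counts that come back. The sketch below assumes a hypothetical `run_query(sql)` callable that returns a list of rows; it is not part of any Pinot client API, and the stubs only stand in for a real connection.

```python
from collections import Counter
import itertools


def row_count_distribution(run_query, sql, runs=10):
    """Run `sql` `runs` times and tally how often each row count is seen."""
    return Counter(len(run_query(sql)) for _ in range(runs))


def is_consistent(distribution):
    """A query is consistent if every run returned the same row count."""
    return len(distribution) <= 1


# Stubs standing in for a real query client (hypothetical, for illustration).
_flaky_sizes = itertools.cycle([3, 2])


def flaky_stub(sql):
    # Alternates between 3 and 2 rows to mimic an unstable result.
    return [("row",)] * next(_flaky_sizes)


def stable_stub(sql):
    # Always returns 3 rows.
    return [("row",)] * 3


flaky = row_count_distribution(flaky_stub, "SELECT 1", runs=4)
stable = row_count_distribution(stable_stub, "SELECT 1", runs=4)
print(is_consistent(flaky), is_consistent(stable))
```

Swapping the stubs for a real client call would let the 3-way join and the two single-join queries be compared under the same harness.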

walterddr commented 1 year ago

Generated logical plan for the 3-way join:

    LogicalProject(P_MFGR=[$3], D_YEAR=[$2], LO_REVENUE=[$1]): rowcount = 843.75, cumulative cost = 36927.36252915846, id = 5207 
      LogicalJoin(condition=[=($0, $4)], joinType=[inner]): rowcount = 843.75, cumulative cost = 35239.86252915846, id = 5218 

        LogicalExchange(distribution=[hash[0]]): rowcount = 375.0, cumulative cost = 33941.1465050262, id = 5213 
          LogicalProject(LO_PARTKEY=[$0], LO_REVENUE=[$1], D_YEAR=[$4]): rowcount = 375.0, cumulative cost = 7269.9793881593505, id = 5164 
            LogicalJoin(condition=[=($2, $3)], joinType=[inner]): rowcount = 375.0, cumulative cost = 6894.9793881593505, id = 5225

              LogicalExchange(distribution=[hash[2]]): rowcount = 100.0, cumulative cost = 5726.20422318571, id = 5220 
                LogicalProject(LO_PARTKEY=[$9], LO_REVENUE=[$10], LO_ORDERDATE=[$14]): rowcount = 100.0, cumulative cost = 200.0, id = 5156 
                  LogicalTableScan(table=[[lineorder]]): rowcount = 100.0, cumulative cost = 100.0, id = 5109 
              LogicalExchange(distribution=[hash[0]]): rowcount = 25.0, cumulative cost = 793.7751649736401, id = 5221 
                LogicalProject(D_DATEKEY=[$4], D_YEAR=[$17]): rowcount = 25.0, cumulative cost = 150.0, id = 5182 
                  LogicalFilter(condition=[OR(=($17, 1997), =($17, 1998))]): rowcount = 25.0, cumulative cost = 125.0, id = 5211 
                    LogicalTableScan(table=[[dates]]): rowcount = 100.0, cumulative cost = 100.0, id = 5111 

        LogicalExchange(distribution=[hash[1]]): rowcount = 15.0, cumulative cost = 454.9660241322652, id = 5214 
          LogicalProject(P_MFGR=[$2], P_PARTKEY=[$6]): rowcount = 15.0, cumulative cost = 130.0, id = 5189 
            LogicalFilter(condition=[=($9, 'MFGR#14')]): rowcount = 15.0, cumulative cost = 115.0, id = 5184 
              LogicalTableScan(table=[[part]]): rowcount = 100.0, cumulative cost = 100.0, id = 5115
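
The plan can be sanity-checked by hand. A join condition such as `=($0, $4)` indexes into the concatenated left-plus-right row, so subtracting the left input's column count from the right-hand index should give the column used by the right exchange's `hash[...]`. The helper below is a minimal sketch of that arithmetic; `split_join_keys` is a hypothetical name, and the column counts are read off the `LogicalProject` nodes in the plan above.

```python
def split_join_keys(left_width, left_idx, right_idx):
    """Map join-condition indexes over the concatenated row to per-side offsets."""
    if not (0 <= left_idx < left_width <= right_idx):
        raise ValueError("indexes do not straddle the left/right boundary")
    return left_idx, right_idx - left_width


# Outer join =($0, $4): left input is (LO_PARTKEY, LO_REVENUE, D_YEAR), 3 columns.
outer = split_join_keys(3, 0, 4)
# Inner join =($2, $3): left input is (LO_PARTKEY, LO_REVENUE, LO_ORDERDATE), 3 columns.
inner = split_join_keys(3, 2, 3)

# Exchange keys as printed in the plan: (left hash[...], right hash[...]).
outer_exchange = (0, 1)
inner_exchange = (2, 0)

print(outer == outer_exchange, inner == inner_exchange)
```

Both joins resolve to the same columns as their exchanges, which is consistent with the logical plan itself being fine.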

For comparison, the normal single-join plan:

    LogicalProject(P_MFGR=[$2], LO_REVENUE=[$1]): rowcount = 225.0, cumulative cost = 4789.102172922739, id = 6785 
      LogicalJoin(condition=[=($0, $3)], joinType=[inner]): rowcount = 225.0, cumulative cost = 4564.102172922739, id = 6808 
        LogicalExchange(distribution=[hash[0]]): rowcount = 100.0, cumulative cost = 3884.1361487904733, id = 6803 
          LogicalProject(LO_PARTKEY=[$9], LO_REVENUE=[$10]): rowcount = 100.0, cumulative cost = 200.0, id = 6777 
            LogicalTableScan(table=[[lineorder]]): rowcount = 100.0, cumulative cost = 100.0, id = 6757 
        LogicalExchange(distribution=[hash[1]]): rowcount = 15.0, cumulative cost = 454.9660241322652, id = 6804 
          LogicalProject(P_MFGR=[$2], P_PARTKEY=[$6]): rowcount = 15.0, cumulative cost = 130.0, id = 6800 
            LogicalFilter(condition=[=($9, 'MFGR#14')]): rowcount = 15.0, cumulative cost = 115.0, id = 6795 
              LogicalTableScan(table=[[part]]): rowcount = 100.0, cumulative cost = 100.0, id = 6759

This looks correct. I think the issue has to do with the hash join operator's key selection.
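
To illustrate why key selection could matter, here is a toy partitioned hash join. It is only a sketch of the general technique, not Pinot's operator, and the root cause in Pinot is not confirmed here. All names and data are made up. It shows that if rows are exchanged on one column but joined on another, the number of matches depends on how rows happen to land on workers.

```python
from collections import defaultdict


def partition(rows, key_idx, num_workers):
    """Hash exchange: route each row to a worker by hashing one column."""
    parts = [[] for _ in range(num_workers)]
    for row in rows:
        # Small non-negative ints hash to themselves in CPython, so this is deterministic.
        parts[hash(row[key_idx]) % num_workers].append(row)
    return parts


def hash_join(left, right, left_key, right_key):
    """Inner hash join: build a table on the right side, probe with the left."""
    table = defaultdict(list)
    for r in right:
        table[r[right_key]].append(r)
    return [l + r for l in left for r in table.get(l[left_key], [])]


def distributed_join(left, right, exch_left, exch_right, join_left, join_right, num_workers):
    """Partition both sides, then join each pair of co-located partitions."""
    out = []
    for lp, rp in zip(partition(left, exch_left, num_workers),
                      partition(right, exch_right, num_workers)):
        out.extend(hash_join(lp, rp, join_left, join_right))
    return out


# Toy rows shaped like the outer join inputs: (LO_PARTKEY, LO_REVENUE, D_YEAR) and (P_MFGR, P_PARTKEY).
left = [(1, 100, 1997), (2, 201, 1997), (3, 302, 1998), (4, 403, 1998)]
right = [("MFGR#1", 1), ("MFGR#1", 2), ("MFGR#1", 3), ("MFGR#1", 4)]

# Exchange and join agree on the key columns: the row count is stable.
correct_counts = [len(distributed_join(left, right, 0, 1, 0, 1, n)) for n in (1, 2, 3)]
# Left side exchanged on column 1 (LO_REVENUE) but joined on column 0: the count drifts.
skewed_counts = [len(distributed_join(left, right, 1, 1, 0, 1, n)) for n in (1, 2, 3)]

print(correct_counts)  # [4, 4, 4]
print(skewed_counts)   # [4, 0, 2]
```

With matching keys the result is the same for any worker count; with mismatched keys it varies with the partition layout, which is one way a correct logical plan could still yield an unstable row count.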