trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Under what rules will Trino's left join be converted into an inner join? #23357

Open chengwei977 opened 1 week ago

chengwei977 commented 1 week ago

My Trino version is 442. I get incorrect data when executing the following SQL.

SQL1:

WITH temp_1 AS (
    SELECT ym, cydymc, kplx, budat, matnr,
           sum(abs(fkimg)) AS fkimg,
           sum(abs(srjh)) AS srjh,
           CASE WHEN sum(abs(fkimg)) = 0 THEN 0
                ELSE sum(abs(srjh)) / sum(abs(fkimg))
           END AS dj
    FROM ice.dwd.dwd_zb_tzzxfx_cpjg
    GROUP BY ym, cydymc, kplx, budat, matnr
),
temp_2 AS (
    SELECT DISTINCT date_sap AS date_sap
    FROM ice.dim.dim_date
    WHERE date_sap >= '20180101'
      AND date_sap <= date_format(current_date, '%Y%m%d')
),
temp_3 AS (
    SELECT DISTINCT ym
    FROM ice.dwd.dwd_zb_tzzxfx_cpjg
),
temp_4 AS (
    SELECT a.date_sap, b.ym
    FROM temp_2 a
    JOIN temp_3 b
      ON date_diff('month', date_parse(substring(date_sap, 1, 6), '%Y%m'), date_parse(ym, '%Y%m')) < 24
     AND date_diff('month', date_parse(substring(date_sap, 1, 6), '%Y%m'), date_parse(ym, '%Y%m')) >= 0
),
temp_5 AS (
    SELECT a.date_sap, b.ym, b.budat, b.cydymc, b.kplx, b.budat, b.matnr
    FROM temp_4 a
    LEFT JOIN temp_1 b
      ON a.ym = b.ym AND a.date_sap = b.budat
)
SELECT ym, date_sap, count(*)
FROM temp_5
WHERE ym = '202408'
GROUP BY ym, date_sap;

I expect 731 result rows, but the query actually returns 462.

=====================================================

When I query temp_4 on its own, the result has 731 rows. The SQL is as follows:

SQL2:

WITH temp_2 AS (
    SELECT DISTINCT date_sap AS date_sap
    FROM ice.dim.dim_date
    WHERE date_sap >= '20180101'
      AND date_sap <= date_format(current_date, '%Y%m%d')
),
temp_3 AS (
    SELECT DISTINCT ym
    FROM ice.dwd.dwd_zb_tzzxfx_cpjg
),
temp_4 AS (
    SELECT a.date_sap, b.ym
    FROM temp_2 a
    JOIN temp_3 b
      ON date_diff('month', date_parse(substring(date_sap, 1, 6), '%Y%m'), date_parse(ym, '%Y%m')) < 24
     AND date_diff('month', date_parse(substring(date_sap, 1, 6), '%Y%m'), date_parse(ym, '%Y%m')) >= 0
)
SELECT * FROM temp_4 WHERE ym = '202408';
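As a sanity check on the 731 figure, the temp_4 join condition can be replayed outside the database. The sketch below is plain Python, not Trino; it assumes (this is not stated in the issue) that dim_date holds exactly one row per calendar day, and it takes the upper bound 2024-09-11 from the BETWEEN predicate shown in the execution plan. The helper names `month_diff` and `matching_days` are made up for illustration.

```python
from datetime import date, timedelta


def month_diff(d: date, ym: str) -> int:
    """Whole months from the month of d to the month 'YYYYMM'.

    Mirrors date_diff('month', ...) applied to two first-of-month values.
    """
    year, month = int(ym[:4]), int(ym[4:])
    return (year - d.year) * 12 + (month - d.month)


def matching_days(ym: str, start: date, end: date) -> int:
    """Count days in [start, end] whose month lies 0..23 months before ym."""
    count = 0
    d = start
    while d <= end:
        if 0 <= month_diff(d, ym) < 24:
            count += 1
        d += timedelta(days=1)
    return count


# Assumed calendar range: 2018-01-01 through 2024-09-11, one row per day.
n = matching_days("202408", date(2018, 1, 1), date(2024, 9, 11))
print(n)  # 731
```

Under that assumption the window covers 2022-09-01 through 2024-08-31, which is 365 + 366 days, so 731 is the number of rows temp_4 produces for ym = '202408'.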

====================================

This is the execution plan of SQL1. In the part of the plan that corresponds to the temp_5 definition, the join has changed from a left join to an inner join.

Trino version: 442
Fragment 0 [HASH]
    Output layout: [ym_2, date_sap, count]
    Output partitioning: SINGLE []
    Output[columnNames = [ym, date_sap, _col2]]
    │   Layout: [ym_2:varchar, date_sap:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    │   ym := ym_2
    │   _col2 := count
    └─ Aggregate[keys = [ym_2, date_sap]]
       │   Layout: [ym_2:varchar, date_sap:varchar, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   count := count(*)
       └─ InnerJoin[criteria = (date_sap = budat_8), distribution = PARTITIONED]
          │   Layout: [date_sap:varchar, ym_2:varchar]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 165.20kB, network: 0B}
          │   Distribution: PARTITIONED
          │   dynamicFilterAssignments = {budat_8 -> #df_1174}
          ├─ FilterProject[filterPredicate = ((date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), date_parse(ym, '%Y%m')) < bigint '24') AND (date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), date_parse(ym, '%Y%m')) >= bigint '0'))]
          │  │   Layout: [date_sap:varchar]
          │  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │  └─ CrossJoin[]
          │     │   Layout: [date_sap:varchar, ym:varchar]
          │     │   Estimates: {rows: ? (?), cpu: ?, memory: 10B, network: 0B}
          │     │   Distribution: REPLICATED
          │     ├─ Aggregate[type = FINAL, keys = [date_sap]]
          │     │  │   Layout: [date_sap:varchar]
          │     │  │   Estimates: {rows: ? (?), cpu: 127.29k, memory: ?, network: 0B}
          │     │  └─ LocalExchange[partitioning = HASH, arguments = [date_sap]]
          │     │     │   Layout: [date_sap:varchar]
          │     │     │   Estimates: {rows: 10851 (127.29kB), cpu: 127.29k, memory: 0B, network: 0B}
          │     │     └─ RemoteSource[sourceFragmentIds = [1]]
          │     │            Layout: [date_sap:varchar]
          │     └─ LocalExchange[partitioning = SINGLE]
          │        │   Layout: [ym:varchar]
          │        │   Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
          │        └─ RemoteSource[sourceFragmentIds = [2]]
          │               Layout: [ym:varchar]
          └─ LocalExchange[partitioning = HASH, arguments = [budat_8]]
             │   Layout: [ym_2:varchar, budat_8:varchar]
             │   Estimates: {rows: 12509 (165.20kB), cpu: 165.20k, memory: 0B, network: 0B}
             └─ RemoteSource[sourceFragmentIds = [4]]
                    Layout: [ym_2:varchar, budat_8:varchar]

Fragment 1 [SOURCE]
    Output layout: [date_sap]
    Output partitioning: HASH [date_sap]
    Aggregate[type = PARTIAL, keys = [date_sap]]
    │   Layout: [date_sap:varchar]
    │   Estimates: {rows: 10851 (127.29kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dim.dim_date$data@6706129778701784257, filterPredicate = ((date_sap BETWEEN varchar '20180101' AND varchar '20240911') AND (date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') < bigint '24') AND (date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') >= bigint '0')), dynamicFilters = {date_sap = #df_1174}]
           Layout: [date_sap:varchar]
           Estimates: {rows: 12057 (141.43kB), cpu: 141.43k, memory: 0B, network: 0B}/{rows: 10851 (127.29kB), cpu: 141.43k, memory: 0B, network: 0B}
           date_sap := 1:date_sap:varchar

Fragment 2 [HASH]
    Output layout: [ym]
    Output partitioning: BROADCAST []
    Aggregate[type = FINAL, keys = [ym]]
    │   Layout: [ym:varchar]
    │   Estimates: {rows: 1 (5B), cpu: 70.11k, memory: 5B, network: 0B}
    └─ LocalExchange[partitioning = HASH, arguments = [ym]]
       │   Layout: [ym:varchar]
       │   Estimates: {rows: 13899 (70.11kB), cpu: 70.11k, memory: 0B, network: 0B}
       └─ RemoteSource[sourceFragmentIds = [3]]
              Layout: [ym:varchar]

Fragment 3 [SOURCE]
    Output layout: [ym]
    Output partitioning: HASH [ym]
    Aggregate[type = PARTIAL, keys = [ym]]
    │   Layout: [ym:varchar]
    │   Estimates: {rows: 13899 (70.11kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dwd.dwd_zb_tzzxfx_cpjg$data@4280907653966917687, filterPredicate = (ym = varchar '202408')]
           Layout: [ym:varchar]
           Estimates: {rows: 1403801 (6.92MB), cpu: 6.92M, memory: 0B, network: 0B}/{rows: 13899 (70.11kB), cpu: 6.92M, memory: 0B, network: 0B}
           ym := 1:ym:varchar

Fragment 4 [HASH]
    Output layout: [ym_2, budat_8]
    Output partitioning: HASH [budat_8]
    Project[]
    │   Layout: [ym_2:varchar, budat_8:varchar]
    │   Estimates: {rows: 12509 (165.20kB), cpu: 165.20k, memory: 0B, network: 0B}
    └─ Aggregate[type = FINAL, keys = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
       │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
       │   Estimates: {rows: 12509 (387.74kB), cpu: 387.74k, memory: 387.74kB, network: 0B}
       └─ LocalExchange[partitioning = HASH, arguments = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
          │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
          │   Estimates: {rows: 12509 (387.74kB), cpu: 387.74k, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [5]]
                 Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]

Fragment 5 [SOURCE]
    Output layout: [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]
    Output partitioning: HASH [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]
    Aggregate[type = PARTIAL, keys = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
    │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
    │   Estimates: {rows: 12509 (387.74kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dwd.dwd_zb_tzzxfx_cpjg$data@4280907653966917687, filterPredicate = ((budat_8 BETWEEN varchar '20180101' AND varchar '20240911') AND (ym_2 = varchar '202408') AND (date_diff('month', date_parse(substring(budat_8, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') < bigint '24') AND (date_diff('month', date_parse(substring(budat_8, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') >= bigint '0'))]
           Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
           Estimates: {rows: 1403801 (42.49MB), cpu: 42.49M, memory: 0B, network: 0B}/{rows: 12509 (387.74kB), cpu: 42.49M, memory: 0B, network: 0B}
           cydymc_3 := 2:cydymc:varchar
           budat_8 := 7:budat:varchar
           kplx_7 := 6:kplx:varchar
           ym_2 := 1:ym:varchar
           matnr_13 := 12:matnr:varchar
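One possible reading of this plan, offered as an assumption rather than a confirmed diagnosis: temp_5 exposes b.ym as ym, so the outer WHERE ym = '202408' filters on a column from the nullable side of the LEFT JOIN. Rows without a match carry NULL in that column and can never satisfy the filter, which makes the LEFT JOIN equivalent to an INNER JOIN under standard SQL semantics. The sketch below checks that equivalence on toy data with Python's built-in sqlite3 module (SQLite, not Trino); the tables a and b and their rows are hypothetical stand-ins for temp_4 and temp_1.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical stand-ins: a plays the role of temp_4, b of temp_1.
    CREATE TABLE a (date_sap TEXT, ym TEXT);
    CREATE TABLE b (ym TEXT, budat TEXT);
    INSERT INTO a VALUES ('20240801', '202408'),
                         ('20240802', '202408'),
                         ('20240803', '202408');
    INSERT INTO b VALUES ('202408', '20240801');
""")

# LEFT JOIN, then a filter on a column from the nullable (right) side.
left_then_filter = conn.execute("""
    SELECT a.date_sap, b.ym
    FROM a LEFT JOIN b ON a.ym = b.ym AND a.date_sap = b.budat
    WHERE b.ym = '202408'
    ORDER BY a.date_sap
""").fetchall()

# The same query written with an INNER JOIN.
inner = conn.execute("""
    SELECT a.date_sap, b.ym
    FROM a JOIN b ON a.ym = b.ym AND a.date_sap = b.budat
    WHERE b.ym = '202408'
    ORDER BY a.date_sap
""").fetchall()

print(left_then_filter)  # [('20240801', '202408')]
print(inner)             # [('20240801', '202408')]
```

Both queries return only the matched row, because NULL = '202408' is never true. If this reading applies to SQL1, 462 would be the number of dates that have at least one matching row in temp_1.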

===================================================================

If I change select a.date_sap, b.ym, xxx in the temp_5 definition to select a.date_sap, a.ym, xxx, the execution plan goes back to a left join and the query returns the expected 731 rows. I can see that this change fixes the result, but I do not understand why. Can anyone help?

Trino version: 442
Fragment 0 [HASH]
    Output layout: [ym, date_sap, count]
    Output partitioning: SINGLE []
    Output[columnNames = [ym, date_sap, _col2]]
    │   Layout: [ym:varchar, date_sap:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    │   _col2 := count
    └─ Aggregate[keys = [ym, date_sap]]
       │   Layout: [ym:varchar, date_sap:varchar, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   count := count(*)
       └─ LeftJoin[criteria = (ym = ym_2) AND (date_sap = budat_8), distribution = PARTITIONED]
          │   Layout: [date_sap:varchar, ym:varchar]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 165.20kB, network: 0B}
          │   Distribution: PARTITIONED
          ├─ Filter[filterPredicate = ((date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), date_parse(ym, '%Y%m')) < bigint '24') AND (date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), date_parse(ym, '%Y%m')) >= bigint '0'))]
          │  │   Layout: [date_sap:varchar, ym:varchar]
          │  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │  └─ CrossJoin[]
          │     │   Layout: [date_sap:varchar, ym:varchar]
          │     │   Estimates: {rows: ? (?), cpu: ?, memory: 10B, network: 0B}
          │     │   Distribution: REPLICATED
          │     ├─ Aggregate[type = FINAL, keys = [date_sap]]
          │     │  │   Layout: [date_sap:varchar]
          │     │  │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
          │     │  └─ LocalExchange[partitioning = HASH, arguments = [date_sap]]
          │     │     │   Layout: [date_sap:varchar]
          │     │     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │     │     └─ RemoteSource[sourceFragmentIds = [1]]
          │     │            Layout: [date_sap:varchar]
          │     └─ LocalExchange[partitioning = SINGLE]
          │        │   Layout: [ym:varchar]
          │        │   Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
          │        └─ RemoteSource[sourceFragmentIds = [2]]
          │               Layout: [ym:varchar]
          └─ LocalExchange[partitioning = HASH, arguments = [ym_2, budat_8]]
             │   Layout: [ym_2:varchar, budat_8:varchar]
             │   Estimates: {rows: 12509 (165.20kB), cpu: 165.20k, memory: 0B, network: 0B}
             └─ RemoteSource[sourceFragmentIds = [4]]
                    Layout: [ym_2:varchar, budat_8:varchar]

Fragment 1 [SOURCE]
    Output layout: [date_sap]
    Output partitioning: HASH [date_sap]
    Aggregate[type = PARTIAL, keys = [date_sap]]
    │   Layout: [date_sap:varchar]
    └─ ScanFilter[table = ice:dim.dim_date$data@6706129778701784257, filterPredicate = (date_sap BETWEEN varchar '20180101' AND varchar '20240911')]
           Layout: [date_sap:varchar]
           Estimates: {rows: 12057 (141.43kB), cpu: 141.43k, memory: 0B, network: 0B}/{rows: ? (?), cpu: 141.43k, memory: 0B, network: 0B}
           date_sap := 1:date_sap:varchar

Fragment 2 [HASH]
    Output layout: [ym]
    Output partitioning: BROADCAST []
    Aggregate[type = FINAL, keys = [ym]]
    │   Layout: [ym:varchar]
    │   Estimates: {rows: 1 (5B), cpu: 70.11k, memory: 5B, network: 0B}
    └─ LocalExchange[partitioning = HASH, arguments = [ym]]
       │   Layout: [ym:varchar]
       │   Estimates: {rows: 13899 (70.11kB), cpu: 70.11k, memory: 0B, network: 0B}
       └─ RemoteSource[sourceFragmentIds = [3]]
              Layout: [ym:varchar]

Fragment 3 [SOURCE]
    Output layout: [ym]
    Output partitioning: HASH [ym]
    Aggregate[type = PARTIAL, keys = [ym]]
    │   Layout: [ym:varchar]
    │   Estimates: {rows: 13899 (70.11kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dwd.dwd_zb_tzzxfx_cpjg$data@4280907653966917687, filterPredicate = (ym = varchar '202408')]
           Layout: [ym:varchar]
           Estimates: {rows: 1403801 (6.92MB), cpu: 6.92M, memory: 0B, network: 0B}/{rows: 13899 (70.11kB), cpu: 6.92M, memory: 0B, network: 0B}
           ym := 1:ym:varchar

Fragment 4 [HASH]
    Output layout: [ym_2, budat_8]
    Output partitioning: HASH [budat_8]
    Project[]
    │   Layout: [ym_2:varchar, budat_8:varchar]
    │   Estimates: {rows: 12509 (165.20kB), cpu: 165.20k, memory: 0B, network: 0B}
    └─ Aggregate[type = FINAL, keys = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
       │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
       │   Estimates: {rows: 12509 (387.74kB), cpu: 387.74k, memory: 387.74kB, network: 0B}
       └─ LocalExchange[partitioning = HASH, arguments = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
          │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
          │   Estimates: {rows: 12509 (387.74kB), cpu: 387.74k, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [5]]
                 Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]

Fragment 5 [SOURCE]
    Output layout: [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]
    Output partitioning: HASH [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]
    Aggregate[type = PARTIAL, keys = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
    │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
    │   Estimates: {rows: 12509 (387.74kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dwd.dwd_zb_tzzxfx_cpjg$data@4280907653966917687, filterPredicate = ((budat_8 BETWEEN varchar '20180101' AND varchar '20240911') AND (ym_2 = varchar '202408') AND (date_diff('month', date_parse(substring(budat_8, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') < bigint '24') AND (date_diff('month', date_parse(substring(budat_8, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') >= bigint '0'))]
           Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
           Estimates: {rows: 1403801 (42.49MB), cpu: 42.49M, memory: 0B, network: 0B}/{rows: 12509 (387.74kB), cpu: 42.49M, memory: 0B, network: 0B}
           cydymc_3 := 2:cydymc:varchar
           budat_8 := 7:budat:varchar
           kplx_7 := 6:kplx:varchar
           ym_2 := 1:ym:varchar
           matnr_13 := 12:matnr:varchar
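The difference between projecting b.ym and a.ym out of temp_5 can be reproduced on a small scale. The following is a minimal sketch using Python's built-in sqlite3 module (SQLite, not Trino), with hypothetical tables a and b standing in for temp_4 and temp_1; the helper `count_rows` is made up for illustration. It only shows the standard SQL semantics involved and makes no claim about Trino's optimizer internals.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical stand-ins: a plays the role of temp_4, b of temp_1.
    CREATE TABLE a (date_sap TEXT, ym TEXT);
    CREATE TABLE b (ym TEXT, budat TEXT);
    INSERT INTO a VALUES ('20240801', '202408'),
                         ('20240802', '202408'),
                         ('20240803', '202408');
    INSERT INTO b VALUES ('202408', '20240801');
""")


def count_rows(ym_source: str) -> int:
    """Count temp_5 rows passing ym = '202408' when ym is a.ym or b.ym."""
    sql = f"""
        WITH temp_5 AS (
            SELECT a.date_sap, {ym_source} AS ym
            FROM a LEFT JOIN b ON a.ym = b.ym AND a.date_sap = b.budat
        )
        SELECT count(*) FROM temp_5 WHERE ym = '202408'
    """
    return conn.execute(sql).fetchone()[0]


rows_with_b_ym = count_rows("b.ym")  # ym is NULL for unmatched rows
rows_with_a_ym = count_rows("a.ym")  # ym is always populated

print(rows_with_b_ym)  # 1
print(rows_with_a_ym)  # 3
```

With b.ym, the two unmatched dates have ym = NULL and are removed by the filter, so only the matched row survives. With a.ym, every left-side row passes the filter, so the outer join has to be preserved.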

Praveen2112 commented 4 days ago

Can you provide a set of reproduction steps, including the data, so that we can check it out?