apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0
1.22k stars 439 forks source link

[GLUTEN-7868][CH] Nested column pruning for Project(Filter(Generate)) #7869

Closed taiyang-li closed 2 weeks ago

taiyang-li commented 3 weeks ago

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

(Fixes: #7868)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

CREATE TEMPORARY VIEW aj
USING parquet
OPTIONS (
path  "/data1/liyang/cppproject/spark/spark-3.3.2-bin-hadoop3/aj"
); 

select * from (
select 
game_name,  event.event_info['tab_type']  as entrance
from aj
lateral view explode(split(nvl(event.event_info['game_name'],'0'),',')) game_name as game_name
where  event.event_info['action'] > 100
)
where game_name = '-1' ;

set spark.gluten.sql.extendedGeneratorNestedColumnAliasing = true;
No rows selected (0.532 seconds)
No rows selected (0.445 seconds)
No rows selected (0.447 seconds)

set spark.gluten.sql.extendedGeneratorNestedColumnAliasing = false;
No rows selected (1.737 seconds)
No rows selected (1.795 seconds)
No rows selected (1.809 seconds)
github-actions[bot] commented 3 weeks ago

https://github.com/apache/incubator-gluten/issues/7868

github-actions[bot] commented 3 weeks ago

Run Gluten Clickhouse CI on x86

github-actions[bot] commented 3 weeks ago

Run Gluten Clickhouse CI on x86

github-actions[bot] commented 3 weeks ago

Run Gluten Clickhouse CI on x86

github-actions[bot] commented 3 weeks ago

Run Gluten Clickhouse CI on x86

taiyang-li commented 3 weeks ago
0: jdbc:hive2://localhost:10000/> set spark.sql.planChangeLog.level = error; 
+--------------------------------+--------+
|              key               | value  |
+--------------------------------+--------+
| spark.sql.planChangeLog.level  | error  |
+--------------------------------+--------+
1 row selected (0.884 seconds)
0: jdbc:hive2://localhost:10000/> set spark.gluten.enabled = true;
+-----------------------+--------+
|          key          | value  |
+-----------------------+--------+
| spark.gluten.enabled  | true   |
+-----------------------+--------+
1 row selected (0.091 seconds)
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> CREATE TABLE aj (
. . . . . . . . . . . . . . . . > country STRING,
. . . . . . . . . . . . . . . . > event STRUCT<time:BIGINT, lng:BIGINT, lat:BIGINT, net:STRING,
. . . . . . . . . . . . . . . . > log_extra:MAP<STRING, STRING>, event_id:STRING, event_info:MAP<STRING, STRING>>
. . . . . . . . . . . . . . . . > )
. . . . . . . . . . . . . . . . > USING orc;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.981 seconds)
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> INSERT INTO aj VALUES
. . . . . . . . . . . . . . . . > ('USA', named_struct('time', 1622547800, 'lng', -122, 'lat', 37, 'net',
. . . . . . . . . . . . . . . . > 'wifi', 'log_extra', map('key1', 'value1'), 'event_id', 'event1',
. . . . . . . . . . . . . . . . > 'event_info', map('tab_type', '5', 'action', '13'))),
. . . . . . . . . . . . . . . . > ('Canada', named_struct('time', 1622547801, 'lng', -79, 'lat', 43, 'net',
. . . . . . . . . . . . . . . . > '4g', 'log_extra', map('key2', 'value2'), 'event_id', 'event2',
. . . . . . . . . . . . . . . . > 'event_info', map('tab_type', '4', 'action', '12')));
+---------+
| Result  |
+---------+
+---------+
No rows selected (2.959 seconds)
0: jdbc:hive2://localhost:10000/> ;
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> explain extended 
. . . . . . . . . . . . . . . . > SELECT * FROM (
. . . . . . . . . . . . . . . . > SELECT
. . . . . . . . . . . . . . . . > game_name,
. . . . . . . . . . . . . . . . > CASE WHEN
. . . . . . . . . . . . . . . . > event.event_info['tab_type'] IN (5) THEN '1' ELSE '0' END AS entrance
. . . . . . . . . . . . . . . . > FROM aj
. . . . . . . . . . . . . . . . > LATERAL VIEW explode(split(country, ', ')) game_name AS game_name
. . . . . . . . . . . . . . . . > WHERE event.event_info['action'] IN (13)
. . . . . . . . . . . . . . . . > ) WHERE game_name = 'xxx';
+----------------------------------------------------+
|                        plan                        |
+----------------------------------------------------+
| == Parsed Logical Plan ==
'Project [*]
+- 'Filter ('game_name = xxx)
   +- 'SubqueryAlias __auto_generated_subquery_name
      +- 'Project ['game_name, CASE WHEN 'event.event_info[tab_type] IN (5) THEN 1 ELSE 0 END AS entrance#34]
         +- 'Filter 'event.event_info[action] IN (13)
            +- 'Generate 'explode('split('country, , )), false, game_name, ['game_name]
               +- 'UnresolvedRelation [aj], [], false

== Analyzed Logical Plan ==
game_name: string, entrance: string
Project [game_name#42, entrance#34]
+- Filter (game_name#42 = xxx)
   +- SubqueryAlias __auto_generated_subquery_name
      +- Project [game_name#42, CASE WHEN cast(event#41.event_info[tab_type] as string) IN (cast(5 as string)) THEN 1 ELSE 0 END AS entrance#34]
         +- Filter cast(event#41.event_info[action] as string) IN (cast(13 as string))
            +- Generate explode(split(country#40, , , -1)), false, game_name, [game_name#42]
               +- SubqueryAlias spark_catalog.default.aj
                  +- Relation default.aj[country#40,event#41] orc

== Optimized Logical Plan ==
Project [game_name#42, CASE WHEN (_extract_event_info#46[tab_type] = 5) THEN 1 ELSE 0 END AS entrance#34]
+- Filter (game_name#42 = xxx)
   +- Generate explode(split(country#40, , , -1)), [0], false, game_name, [game_name#42]
      +- Project [country#40, event#41.event_info AS _extract_event_info#46]
         +- Filter (isnotnull(event#41.event_info) AND (event#41.event_info[action] = 13))
            +- Relation default.aj[country#40,event#41] orc

== Physical Plan ==
CHNativeColumnarToRow
+- ^(1) ProjectExecTransformer [game_name#42, CASE WHEN (_extract_event_info#46[tab_type] = 5) THEN 1 ELSE 0 END AS entrance#34]
   +- ^(1) FilterExecTransformer (game_name#42 = xxx)
      +- ^(1) CHGenerateExecTransformer explode(split(country#40, , , -1)), [_extract_event_info#46], false, [game_name#42]
         +- ^(1) ProjectExecTransformer [country#40, event#41.event_info AS _extract_event_info#46]
            +- ^(1) FilterExecTransformer (isnotnull(event#41.event_info) AND (event#41.event_info[action] = 13))
               +- ^(1) NativeFileScan orc default.aj[country#40,event#41] Batched: true, DataFilters: [isnotnull(event#41.event_info), (event#41.event_info[action] = 13)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/data1/liyang/cppproject/spark/spark-3.3.2-bin-hadoop3/spark-ware..., PartitionFilters: [], PushedFilters: [IsNotNull(event.event_info)], ReadSchema: struct<country:string,event:struct<event_info:map<string,string>>>
 |
+----------------------------------------------------+
1 row selected (1.059 seconds)
0: jdbc:hive2://localhost:10000/> ;
0: jdbc:hive2://localhost:10000/> desc aj; 
+-----------+----------------------------------------------------+----------+
| col_name  |                     data_type                      | comment  |
+-----------+----------------------------------------------------+----------+
| country   | string                                             | NULL     |
| event     | struct<time:bigint,lng:bigint,lat:bigint,net:string,log_extra:map<string,string>,event_id:string,event_info:map<string,string>> | NULL     |
+-----------+----------------------------------------------------+----------+
2 rows selected (0.253 seconds)
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> 
0: jdbc:hive2://localhost:10000/> explain formatted 
. . . . . . . . . . . . . . . . > SELECT * FROM (
. . . . . . . . . . . . . . . . > SELECT
. . . . . . . . . . . . . . . . > game_name,
. . . . . . . . . . . . . . . . > CASE WHEN
. . . . . . . . . . . . . . . . > event.event_info['tab_type'] IN (5) THEN '1' ELSE '0' END AS entrance
. . . . . . . . . . . . . . . . > FROM aj
. . . . . . . . . . . . . . . . > LATERAL VIEW explode(split(nvl(event.event_info['game_name'],'0'),',')) game_name as game_name
. . . . . . . . . . . . . . . . > WHERE event.event_info['action'] IN (13)
. . . . . . . . . . . . . . . . > ) WHERE game_name = 'xxx';
+----------------------------------------------------+
|                        plan                        |
+----------------------------------------------------+
| == Physical Plan ==
CHNativeColumnarToRow (8)
+- ^ ProjectExecTransformer (6)
   +- ^ FilterExecTransformer (5)
      +- ^ CHGenerateExecTransformer (4)
         +- ^ ProjectExecTransformer (3)
            +- ^ FilterExecTransformer (2)
               +- ^ Scan orc default.aj (1)

(1) Scan orc default.aj
Output [1]: [event#41]
Batched: true
Location: InMemoryFileIndex [file:/data1/liyang/cppproject/spark/spark-3.3.2-bin-hadoop3/spark-warehouse/aj]
PushedFilters: [IsNotNull(event.event_info)]
ReadSchema: struct<event:struct<event_info:map<string,string>>>

(2) FilterExecTransformer
Input [1]: [event#41]
Arguments: (isnotnull(event#41.event_info) AND (event#41.event_info[action] = 13))

(3) ProjectExecTransformer
Output [1]: [event#41.event_info AS _extract_event_info#73]
Input [1]: [event#41]

(4) CHGenerateExecTransformer
Input [1]: [_extract_event_info#73]
Arguments: explode(split(coalesce(_extract_event_info#73[game_name], 0), ,, -1)), [_extract_event_info#73], false, [game_name#70]

(5) FilterExecTransformer
Input [2]: [_extract_event_info#73, game_name#70]
Arguments: (game_name#70 = xxx)

(6) ProjectExecTransformer
Output [2]: [game_name#70, CASE WHEN (_extract_event_info#73[tab_type] = 5) THEN 1 ELSE 0 END AS entrance#63]
Input [2]: [_extract_event_info#73, game_name#70]

(7) WholeStageCodegenTransformer (2)
Input [2]: [game_name#70, entrance#63]
Arguments: false

(8) CHNativeColumnarToRow
Input [2]: [game_name#70, entrance#63]

 |
+----------------------------------------------------+
1 row selected (0.295 seconds)
github-actions[bot] commented 2 weeks ago

Run Gluten Clickhouse CI on x86

github-actions[bot] commented 2 weeks ago

Run Gluten Clickhouse CI on x86

github-actions[bot] commented 2 weeks ago

Run Gluten Clickhouse CI on x86

taiyang-li commented 2 weeks ago

@PHILO-HE I found vanilla spark nested column pruning doesn't work for Project(Filter(Generate)) (refer to https://github.com/apache/incubator-gluten/pull/7869#issuecomment-2464408071). So I added a rule in CH to improve it. I'm curious if Velox needs it?

github-actions[bot] commented 2 weeks ago

Run Gluten Clickhouse CI on x86

github-actions[bot] commented 2 weeks ago

Run Gluten Clickhouse CI on x86

github-actions[bot] commented 2 weeks ago

Run Gluten Clickhouse CI on x86

PHILO-HE commented 2 weeks ago

@taiyang-li, thanks so much for letting me know your this work! I think it should be applicable to Velox backend. Maybe, you can firstly get this pr merged, then try to move the proposed code to common module in another pr to see whether there is any issue reported for velox backend.

BTW, I note you introduced one dedicated config for the proposed optimization rule. Maybe, it's better to have a generic config in Gluten to allow excluding any optimization rule, like Spark's spark.sql.optimizer.excludedRules. If it makes sense to you, it's ok to do this small improvement in a separate pr. Thanks!

taiyang-li commented 2 weeks ago

@PHILO-HE I'm really glad that Velox could also use it! I'll open another pr after this one is merged.