pingcap / tiflash

The analytical engine for TiDB and TiDB Cloud. Try free: https://tidbcloud.com/free-trial
https://docs.pingcap.com/tidb/stable/tiflash-overview
Apache License 2.0

Expand the functionality of Local Runtime Filter #7891

Open yibin87 opened 1 year ago

yibin87 commented 1 year ago

Enhancement

The current Local Runtime Filter design can be found here. However, it brings no noticeable performance improvement on the TPCH 100 benchmark. After some quick experiments, we confirmed that we can expand the current local runtime filter to achieve 10%+ performance improvements:

  1. Expand the scope of the current local runtime filter to include broadcast hash joins that span multiple tasks, which can reduce exchange cost. For example, in A Join (Agg_on_B), A is chosen as the build table and broadcast to all TiFlash probe nodes. B first goes through a two-phase agg operator, and the agg's output is used as the probe side. This case doesn't match the current local runtime filter pattern, so no runtime filter is generated. By expanding the scope, we can push the filter down to the TableScan of B, reducing the exchange cost introduced by the two-phase agg.

  2. In TiFlash, push the runtime filter down to the storage layer using late materialization techniques. Currently, runtime filters take effect as RS operators in the storage layer. An RS operator uses the min-max index to implement the RS-IN filter operation, and the RS-IN filter filters poorly in TPCH cases even when the Real-IN filter is very selective. For example, suppose a "pack" of one column contains 1024 integer values, from 1 to 1024, with no duplicates, and the pushed-down IN filter contains {3, 10, 1023}. With the RS-IN filter, the whole pack passes and no data is filtered out; with the Real-IN filter, only 3 values pass.
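The pack example above can be reproduced with a small sketch. The function names below are hypothetical and are not TiFlash APIs; the code only models a min-max check on one pack versus a row-level IN check.

```python
def rs_in_pack_may_match(pack_min, pack_max, in_values):
    """Min-max (rough set) check: keep the pack if any IN value lies in [min, max]."""
    return any(pack_min <= v <= pack_max for v in in_values)


def real_in_filter(pack, in_values):
    """Row-level IN check: keep only the rows whose value is in the set."""
    wanted = set(in_values)
    return [v for v in pack if v in wanted]


# One pack holding 1..1024 with no duplicates, as in the example above.
pack = list(range(1, 1025))
in_values = [3, 10, 1023]

# RS-IN works at pack granularity: the pack is either kept whole or skipped.
pack_kept = rs_in_pack_may_match(min(pack), max(pack), in_values)
rows_after_rs = pack if pack_kept else []

# Real-IN works at row granularity.
rows_after_real = real_in_filter(pack, in_values)

print(len(rows_after_rs), len(rows_after_real))
```

Because every IN value lies inside the pack's min-max range, the rough check cannot skip the pack, while the row-level check keeps only the three matching values.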

TiDB & TiPB

TiFlash

Lloyd-Pottiger commented 1 year ago

> For example, suppose a "pack" of one column contains 1024 integer values, from 1 to 1024, with no duplicates, and the pushed-down IN filter contains {3, 10, 1023}. With the RS-IN filter, the whole pack passes and no data is filtered out; with the Real-IN filter, only 3 values pass.

But even with late materialization, we still need to read this pack, which means it cannot help reduce IO.

yibin87 commented 1 year ago

> But even with late materialization, we still need to read this pack, which means it cannot help reduce IO.

It might help reduce the IO of other columns if none of the rows in the filter column's pack pass the filter condition. Besides, the major improvement will be the reduction of network communication for exchange.
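A minimal sketch of the IO argument, assuming a toy model in which the filter column is always read for every pack and the remaining columns are read only for packs where at least one row passes the filter. The function name and the pack layout are illustrative assumptions, not TiFlash internals.

```python
def pack_reads(filter_col_packs, in_values, num_other_cols):
    """Return (filter column pack reads, other column pack reads)."""
    wanted = set(in_values)
    # The filter column itself must be read for every pack.
    filter_reads = len(filter_col_packs)
    # Other columns are read only for packs with at least one passing row.
    surviving = sum(1 for p in filter_col_packs if any(v in wanted for v in p))
    return filter_reads, surviving * num_other_cols


# Four packs of 1024 consecutive integers: 1..1024, 1025..2048, and so on.
packs = [list(range(i * 1024 + 1, (i + 1) * 1024 + 1)) for i in range(4)]

filter_reads, other_reads = pack_reads(packs, [3, 10, 1023], num_other_cols=5)
other_reads_without_filter = len(packs) * 5

print(filter_reads, other_reads, other_reads_without_filter)
```

In this toy setup only the first pack contains matching values, so the other five columns are read for one pack instead of four.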

Lloyd-Pottiger commented 1 year ago

> It might help reduce the IO of other columns if none of the rows in the filter column's pack pass the filter condition. Besides, the major improvement will be the reduction of network communication for exchange.

Makes sense. Then why do we need apply_late_materialization? Why not just always apply it?

yibin87 commented 1 year ago

> Makes sense. Then why do we need apply_late_materialization? Why not just always apply it?

For TPCH100, the major improvement comes from reducing network communication for exchange, so we identify that case with 'apply_late_materialization'. If there is no exchange on the probe side, there seems to be little performance improvement.

Lloyd-Pottiger commented 1 year ago

> For TPCH100, the major improvement comes from reducing network communication for exchange, so we identify that case with 'apply_late_materialization'. If there is no exchange on the probe side, there seems to be little performance improvement.

If there is a filter like regexp_like on the table scan, can it help reduce the cost of computing regexp_like?

yibin87 commented 1 year ago

I don't quite get your point. Late materialization improves performance in two ways:

  1. Reduce disk IO
  2. Reduce exchange overhead, since the filter is pushed from the join build node to the table scan node, across exchange nodes. Its effect is equivalent to adding a normal filter in the compute layer before the exchange node.

For TPCH100, the major improvement comes from the second one, and late materialization might cause a performance degradation when no exchange cost is reduced. Thus we introduce the "apply_late_materialization" flag to indicate whether exchange cost can be reduced.
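The second point can be illustrated with a rough sketch that counts how many probe-side rows would cross an exchange with and without a runtime filter applied at the table scan. The function name and the toy join keys are assumptions made for illustration only.

```python
def rows_crossing_exchange(probe_keys, runtime_filter_keys=None):
    """Count probe-side rows that would be sent through the exchange.

    If a runtime filter built from the join build side is applied at the
    table scan, only rows whose key is in the filter reach the exchange.
    """
    if runtime_filter_keys is None:
        return len(probe_keys)
    allowed = set(runtime_filter_keys)
    return sum(1 for key in probe_keys if key in allowed)


probe_keys = list(range(1, 1001))  # keys scanned on the probe side
build_keys = [3, 10, 999]          # keys present on the join build side

without_filter = rows_crossing_exchange(probe_keys)
with_filter = rows_crossing_exchange(probe_keys, build_keys)

print(without_filter, with_filter)
```

When there is no exchange between the scan and the join, this saving disappears, which is the situation the flag is meant to distinguish.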

Lloyd-Pottiger commented 1 year ago
mysql> explain analyze select * from a join b on a.id = b.id where regexp_like(a.p, '.*') and  regexp_like(b.p, '.*');
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
| id                                   | estRows  | actRows | task         | access object | execution info                                                                                                                                                                                                                                                                                                 | operator info                                            | memory    | disk |
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
| TableReader_37                       | 9990.00  | 0       | root         |               | time:15.1ms, loops:1, RU:0.000000, cop_task: {num: 1, max: 0s, proc_keys: 0, copr_cache_hit_ratio: 0.00}                                                                                                                                                                                                       | MppVersion: 2, data:ExchangeSender_36                    | 595 Bytes | N/A  |
| └─ExchangeSender_36                  | 9990.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:13ms, loops:0, threads:36}                                                                                                                                                                                                                                                                  | ExchangeType: PassThrough                                | N/A       | N/A  |
|   └─HashJoin_35                      | 9990.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:12ms, loops:0, threads:36}                                                                                                                                                                                                                                                                  | inner join, equal:[eq(test.a.id, test.b.id)]             | N/A       | N/A  |
|     ├─ExchangeReceiver_16(Build)     | 7992.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:10ms, loops:0, threads:36}                                                                                                                                                                                                                                                                  |                                                          | N/A       | N/A  |
|     │ └─ExchangeSender_15            | 7992.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:7.56ms, loops:0, threads:36}                                                                                                                                                                                                                                                                | ExchangeType: Broadcast, Compression: FAST               | N/A       | N/A  |
|     │   └─Selection_14               | 7992.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:6.56ms, loops:0, threads:36}                                                                                                                                                                                                                                                                | not(isnull(test.a.id)), regexp_like(test.a.p, ".*")      | N/A       | N/A  |
|     │     └─TableFullScan_13         | 10000.00 | 0       | mpp[tiflash] | table:a       | tiflash_task:{time:6.56ms, loops:0, threads:36}, tiflash_scan:{dtfile:{total_scanned_packs:0, total_skipped_packs:0, total_scanned_rows:0, total_skipped_rows:0, total_rs_index_load_time: 0ms, total_read_time: 0ms}, total_create_snapshot_time: 0ms, total_local_region_num: 1, total_remote_region_num: 0} | pushed down filter:empty, keep order:false, stats:pseudo | N/A       | N/A  |
|     └─Selection_18(Probe)            | 7992.00  | 0       | mpp[tiflash] |               | tiflash_task:{time:10ms, loops:0, threads:36}                                                                                                                                                                                                                                                                  | not(isnull(test.b.id)), regexp_like(test.b.p, ".*")      | N/A       | N/A  |
|       └─TableFullScan_17             | 10000.00 | 0       | mpp[tiflash] | table:b       | tiflash_task:{time:10ms, loops:0, threads:36}, tiflash_scan:{dtfile:{total_scanned_packs:0, total_skipped_packs:0, total_scanned_rows:0, total_skipped_rows:0, total_rs_index_load_time: 0ms, total_read_time: 0ms}, total_create_snapshot_time: 0ms, total_local_region_num: 1, total_remote_region_num: 0}   | pushed down filter:empty, keep order:false, stats:pseudo | N/A       | N/A  |
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
9 rows in set (0.02 sec)

Late materialization may help reduce the cost of Selection_18, since fewer rows need to be evaluated by regexp_like.
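A small sketch of this effect, assuming rows are (id, p) pairs and the runtime filter is a set of build-side ids applied before the selection. The helper below is hypothetical; it only counts how many times the regular expression is evaluated.

```python
import re


def count_regex_evaluations(rows, pattern, build_keys=None):
    """Return (number of regex evaluations, ids of rows that matched)."""
    regex = re.compile(pattern)
    allowed = set(build_keys) if build_keys is not None else None
    evaluated = 0
    matched = []
    for row_id, text in rows:
        # Rows dropped by the runtime filter never reach the regex.
        if allowed is not None and row_id not in allowed:
            continue
        evaluated += 1
        if regex.fullmatch(text):
            matched.append(row_id)
    return evaluated, matched


rows = [(i, f"p{i}") for i in range(1, 1001)]

evaluated_without, _ = count_regex_evaluations(rows, ".*")
evaluated_with, matched_with = count_regex_evaluations(rows, ".*", build_keys=[3, 10, 999])

print(evaluated_without, evaluated_with, matched_with)
```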

yibin87 commented 1 year ago

I see, you're right. It acts more like adding a filter above the TableScan operator.

Lloyd-Pottiger commented 1 year ago

Yes, and since a runtime filter can usually filter out many rows and is lightweight, maybe always applying late materialization is acceptable.

yibin87 commented 1 year ago

> Yes, and since a runtime filter can usually filter out many rows and is lightweight, maybe always applying late materialization is acceptable.

If there are no heavy operators between the table scan and the join, late materialization seems to degrade performance a little. For now we can simply treat every exchange node as a heavy operator, and take "heavy filter functions" into consideration later if we have bandwidth :).
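The heuristic described here could look roughly like the following sketch. The plan is modeled as a plain list of operator names between the table scan and the join, and both the function name and the HEAVY_OPERATORS set are assumptions, not the actual planner code.

```python
# Assumption: only exchange nodes count as heavy for now; heavy filter
# functions such as regexp_like could be added to this set later.
HEAVY_OPERATORS = {"ExchangeSender", "ExchangeReceiver"}


def should_apply_late_materialization(operators_between_scan_and_join):
    """Apply late materialization only if a heavy operator sits on the path."""
    return any(op in HEAVY_OPERATORS for op in operators_between_scan_and_join)


# Probe side with only a selection above the scan: no exchange cost to save.
local_probe = ["Selection"]

# Probe side with a two-phase agg, which exchanges data between the phases.
agg_probe = ["PartialAgg", "ExchangeSender", "ExchangeReceiver", "FinalAgg"]

print(should_apply_late_materialization(local_probe))
print(should_apply_late_materialization(agg_probe))
```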