Open rterror opened 1 year ago
在生产中,有很多先连接(Join)后聚合(Aggregate)的查询。这类查询中的Join可能会让数据膨胀的非常大,而Aggregate会明显地减少数据。数据膨胀导致如下两个问题
由于此类SQL的Aggregate会明显地减少数据,考虑将Aggregate提前,利用提前聚合来减少中间产生的数据,从而解决前两个问题。
解决思路主要就是拆分aggregate算子, 如下
假设有如下的执行计划
aggregate | exchange | SMJ / \ exchange exchange | | sort sort | | scan t1 scan t2
将aggregate算子拆分成 partial aggregate 和final aggregate后得到如下的执行计划
final aggregate | exchange | partial aggregate | SMJ / \ exchange exchange | | sort sort | | scan t1 scan t2
以下是一个 count(*) 的例子
在拆分aggregate算子的基础之上,又引入了几条优化规则
push partial aggregate through join规则支持的聚合函数如下
push partial aggregate through join
push partial aggregate through join规则不支持的场景包括:不带group by的count相关的聚合
这几条规则的主要目的是:尝试做算子下推,提前减少数据规模。
如果partial aggregate 聚合效果不明显的话,就跳过它。比如:前 R1 行数据经过partial aggregate 后得到R2,R2/R1 > 0.95,就跳过partial aggregate。
这个是通过前R1行数据估计partial aggregate 聚合效果,如果不理想的话后续就放弃partial aggregate,从而避免浪费计算资源。
1.王玉明-Spark SQL Performance Improvement Push down Partial aggregate through Join jira:Issue Navigator - ASF JIRA
目标问题
在生产中,有很多先连接(Join)后聚合(Aggregate)的查询。这类查询中的Join可能会让数据膨胀的非常大,而Aggregate会明显地减少数据。数据膨胀导致如下两个问题
由于此类SQL的Aggregate会明显地减少数据,考虑将Aggregate提前,利用提前聚合来减少中间产生的数据,从而解决前两个问题。
解决方法
算子拆分
解决思路主要就是拆分aggregate算子, 如下
假设有如下的执行计划
将aggregate算子拆分成 partial aggregate 和final aggregate后得到如下的执行计划
以下是一个 count(*) 的例子
算子下推及相关优化
在拆分aggregate算子的基础之上,又引入了几条优化规则
push partial aggregate through join
规则支持的聚合函数如下push partial aggregate through join
规则不支持的场景包括:不带group by的count相关的聚合这几条规则的主要目的是:尝试做算子下推,提前减少数据规模。
自适应执行
如果partial aggregate 聚合效果不明显的话,就跳过它。比如:前 R1 行数据经过partial aggregate 后得到R2,R2/R1 > 0.95,就跳过partial aggregate。
这个是通过前R1行数据估计partial aggregate 聚合效果,如果不理想的话后续就放弃partial aggregate,从而避免浪费计算资源。
实验效果表明
参考资料
1.王玉明-Spark SQL Performance Improvement Push down Partial aggregate through Join jira:Issue Navigator - ASF JIRA