zhouqingqing / qpmodel

A Relational Optimizer and Executor
MIT License

Add exploration rule for global aggregate split #212

Closed arzuschen closed 3 years ago

arzuschen commented 3 years ago

This change rests on the assumption that aggregation cannot increase cardinality: the output of an aggregate has at most as many rows as its input. Under that assumption, splitting an aggregate into a pair of global and local aggregates is likely to be better. Every aggregation is global by default, and a global aggregation only accepts a serialized data stream. An exploration transformation rule is introduced to account for this situation. A global aggregation that cannot be turned directly into a local aggregation is rewritten into two aggregations, a global one on top of a local one.

The new global logical node is placed in the same memo group as the original global aggregation. Its child, the new local logical node, is placed in a new group, and the local node's child is the same child group as that of the original global aggregation. Because the new global aggregate must land in the original group, its signature is forced to be the same as the original aggregate's. The newly created local aggregate node has a different signature and therefore sits in its own memo group. (#143)

The aggregate functions are transformed as listed below (e represents some expression):
- sum(e) = sum(sum(e))
- min(e) = min(min(e))
- max(e) = max(max(e))
- count(e) = sum(count(e))
- count(*) = sum(count(*))
- avg(e) = sum(sum(e)) / sum(count(e))
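To make the rewrite table concrete, here is a minimal sketch of the local/global split over plain integers. The names `Partial`, `SplitAggSketch`, `Local` and `Global` are made up for this comment and are not qpmodel classes. It also assumes every partition is non-empty and ignores NULL handling.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical partial state emitted by a local aggregate (not a qpmodel type).
public sealed class Partial
{
    public long Sum { get; }
    public long Min { get; }
    public long Max { get; }
    public long Count { get; }

    public Partial(long sum, long min, long max, long count)
    {
        Sum = sum; Min = min; Max = max; Count = count;
    }
}

public static class SplitAggSketch
{
    // Local step: aggregate one partition on its own.
    // Assumes the partition is non-empty (Min/Max throw on empty input).
    public static Partial Local(IEnumerable<long> rows)
    {
        var list = rows.ToList();
        return new Partial(list.Sum(), list.Min(), list.Max(), list.Count);
    }

    // Global step: combine the partial results following the rewrite table.
    public static (long Sum, long Min, long Max, long Count, double Avg) Global(
        IEnumerable<Partial> partials)
    {
        var list = partials.ToList();
        long sum = list.Sum(p => p.Sum);      // sum(e)   = sum(sum(e))
        long min = list.Min(p => p.Min);      // min(e)   = min(min(e))
        long max = list.Max(p => p.Max);      // max(e)   = max(max(e))
        long count = list.Sum(p => p.Count);  // count(e) = sum(count(e))
        double avg = (double)sum / count;     // avg(e)   = sum(sum(e)) / sum(count(e))
        return (sum, min, max, count, avg);
    }
}
```

With partitions {1, 2, 3}, {4, 5} and {6}, the global step yields an average of 21 / 6 = 3.5. Averaging the three local averages (2, 4.5, 6) would give about 4.17 instead, which is why avg is carried as a sum and a count rather than as avg(avg(e)).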

For the group-by attributes, the local and global aggregates should use the same group-by as the original aggregate. The having constraint needs to be separated: expressions involving an aggregate function are placed on the global aggregate node, while the others are placed on the local aggregate node. For example, having count(a1) > 1 can only be determined after the whole aggregation is complete, so it must be on the global node. Meanwhile, having a1 > 1 is just a local constraint that can be evaluated on rows, so it can be placed on the local node.
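The having split described above can be sketched as a partition of the conjuncts by whether they contain an aggregate function. `Node` and `HavingSplitSketch` are hypothetical helpers for illustration, not qpmodel's expression classes, and the sketch assumes the having clause has already been broken into AND-ed conjuncts.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal expression node (not qpmodel's Expr class).
public sealed class Node
{
    public string Name { get; }
    public IReadOnlyList<Node> Children { get; }

    public Node(string name, params Node[] children)
    {
        Name = name;
        Children = children;
    }
}

public static class HavingSplitSketch
{
    // Assumption: these names always denote aggregate functions.
    static readonly HashSet<string> AggNames =
        new HashSet<string> { "sum", "min", "max", "count", "avg" };

    // True if the node or any descendant is an aggregate function call.
    public static bool ContainsAggregate(Node n) =>
        AggNames.Contains(n.Name) || n.Children.Any(ContainsAggregate);

    // Conjuncts with an aggregate go to the global node, the rest to the local node.
    public static (List<Node> Global, List<Node> Local) Split(IEnumerable<Node> conjuncts)
    {
        var global = new List<Node>();
        var local = new List<Node>();
        foreach (var c in conjuncts)
            (ContainsAggregate(c) ? global : local).Add(c);
        return (global, local);
    }
}
```

Given the two conjuncts count(a1) > 1 and a1 > 1, `Split` puts the first in the global list and the second in the local list.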

Another critical fix: previously the aggrcore in PhysicHashAgg.Exec() was shared among threads (different copies of the physical node). Now a new copy of logic.aggrFns_ is created for each thread: List<AggFunc> aggrcore = new List<AggFunc>(); logic.aggrFns_.ForEach(x => aggrcore.Add(x.Clone() as AggFunc)); (line 1161 of PhysicNode.cs)
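A rough sketch of why the per-thread copy matters, under the assumption that an aggregate function object carries mutable running state. `SumAgg` and `PerThreadAggSketch` are stand-ins invented here, not the AggFunc or PhysicHashAgg code.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical aggregate with mutable running state (stand-in for an AggFunc).
public sealed class SumAgg
{
    public long Value { get; private set; }

    public void Accumulate(long v) => Value += v;

    // Copies the current state into a new, independent object.
    public SumAgg Clone() => new SumAgg { Value = Value };
}

public static class PerThreadAggSketch
{
    // Each worker clones the template, so no running state is shared across threads.
    public static long[] Run(SumAgg template, long[][] partitions)
    {
        var results = new long[partitions.Length];
        Parallel.For(0, partitions.Length, i =>
        {
            var agg = template.Clone();          // private copy for this worker
            foreach (var v in partitions[i])
                agg.Accumulate(v);
            results[i] = agg.Value;              // each worker writes its own slot
        });
        return results;
    }
}
```

If every worker called `Accumulate` on the shared template instead, the unsynchronized `Value += v` updates could interleave and the per-partition results would be mixed together.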

TODOs: Stddev is not implemented in this patch, so a query like select a3/2*2, sum(a3), count(a3), stddev_samp(a3) from a group by 1; cannot be executed successfully. The same goes for select d1, sum(d2) from (select c1/2, sum(c1) from (select b1, count(*) as a1 from b group by b1)c(c1, c2) group by c1/2) d(d1, d2) group by d1; (Both queries are from the TestAggregation unit test.)

zhouqingqing commented 3 years ago

@pkommoju Please take a look at the ResolveOrdinal related part. This change splits aggregation into one part ("global") on top of the gather exchange and another part ("local") underneath it. For example, avg(a) => global = sum(bs)/sum(bc) and local = sum(a) as bs, count(a) as bc.

Ordinal resolution is top down, so the top node will request "avg(a)" from its child. But the child's expression has now become sum/sum, so expr matching won't work, and we use the Expr._ id field to bind the connection.
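For illustration only, the difference between matching by expression and binding by id might look like the sketch below. `OutputExpr`, `OrdinalSketch` and both resolve methods are hypothetical and do not reflect the actual ResolveOrdinal code. The assumption is that a rewritten expression keeps the id of the expression it replaced.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical output column: Id names the original expression, Text is its current form.
public sealed class OutputExpr
{
    public int Id { get; }
    public string Text { get; }

    public OutputExpr(int id, string text)
    {
        Id = id;
        Text = text;
    }
}

public static class OrdinalSketch
{
    // Matching on expression text fails once the child has been rewritten.
    public static int ResolveByText(OutputExpr requested, IReadOnlyList<OutputExpr> childOutput)
    {
        for (int i = 0; i < childOutput.Count; i++)
            if (childOutput[i].Text == requested.Text) return i;
        return -1;
    }

    // Matching on id still works, because the rewrite kept the id (assumption).
    public static int ResolveById(OutputExpr requested, IReadOnlyList<OutputExpr> childOutput)
    {
        for (int i = 0; i < childOutput.Count; i++)
            if (childOutput[i].Id == requested.Id) return i;
        return -1;
    }
}
```

If the parent asks for avg(a) with id 7 and the child outputs a1 (id 3) and sum(bs)/sum(bc) (id 7), the text lookup returns -1 while the id lookup returns ordinal 1.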