We also need a method to output the Srdd before the groupBy step.
As an equivalent to

```scala
srdd.smvSingleCDSGroupBy('k)(TimeInLastN('t, 3))((Sum('v) as 'nv1), (Count('v) as 'nv2))
```

the client code could also be written as

```scala
srdd.smvApplyCDS('k)(TimeInLastN('t, 3)).groupBy('k, 't)(
  Sum('v) as 'nv1,
  Count('v) as 'nv2)
```
The benefit of doing so is that we can chain multiple CDSs together, as in the following:

```scala
srdd.
  smvApplyCDS('k)(TimeInLastN('t, 3)).
  smvApplyCDS('k, 't)(PivotCDS(Seq('p), Seq('v), Seq("pv1", "pv2"))).
  groupBy('k, 't)(
    Sum('v_pv1) as 'v_pv1,
    CountDistinct('v_pv2) as 'dist_cnt_v_pv2)
```
where `PivotCDS` is a CDS object for the pivot operation. It needs to be implemented as a separate issue.
Implemented a complete new design of CDS.
Requirement
Generalizing from running sums and similar requirements, we identified the need for CDS. Using the running sum as an example, the client code looks like the sketch below.
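This is a minimal sketch, reusing the `TimeInLastN` CDS and the aliases from the comment above; for each record it computes the sum and count of 'v over the records in the last 3 time units of 't within the same 'k group:

```scala
// Running sum and running count of 'v over the last 3 time units of 't,
// computed independently within each group keyed by 'k.
srdd.smvSingleCDSGroupBy('k)(TimeInLastN('t, 3))(
  Sum('v) as 'nv1,
  Count('v) as 'nv2)
```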
where `smvSingleCDSGroupBy` is declared as follows.
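A plausible shape of the declaration (a sketch only; the parameter names `keys` and `cds` are assumptions, and the types are those of the Spark 1.x SchemaRDD era):

```scala
import org.apache.spark.sql.catalyst.expressions.NamedExpression

// Sketch: group by `keys`, apply the CDS within each group, then aggregate.
def smvSingleCDSGroupBy(keys: Symbol*)(cds: SmvCDS)(aggrExprs: NamedExpression*): SchemaRDD
```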
The `aggrExprs` argument is a list of `AggregationExpression`s with aliases (the same aliasing that `groupBy` supports).

We will also implement custom Catalyst expressions to integrate `SmvCDS` into expressions; the client code could then look like the sketch below.
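One possible shape of that client code; the `from` operator here is hypothetical, shown only to illustrate attaching an `SmvCDS` to individual aggregate expressions so that different aggregates can select different record subsets in one pass:

```scala
// Hypothetical syntax: each aggregate is qualified by its own CDS.
srdd.groupBy('k)(
  (Sum('v) from TimeInLastN('t, 3)) as 'nv1,
  (Count('v) from TimeInLastN('t, 6)) as 'nv2)
```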
Implementation

From a pure relational algebra angle, a CDS defines a self-join: each record is paired with the subset of records the CDS selects for it (for `TimeInLastN('t, 3)`, the records whose 't falls within the 3 time units ending at that record's 't), and the aggregations then run over each record's selected subset.
Since a CDS is always used in a grouped (by a set of keys) environment, the self-join can be optimized into a local operation. There are 2 approaches to optimizing the self-join:

- use `groupByKey` to force the data partitioning, create a local iterator to represent each group's data, and implement the self-join with Scala collection operations (see the sketch after this list);
- express the CDS as an actual self-join and rely on SparkSQL's own join optimization.

Since it is possible that the SparkSQL team will implement the join optimization in the future, we will implement the first approach in the short term and migrate to the 2nd approach in the future.
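A minimal sketch of the first approach on a plain Scala collection, assuming `TimeInLastN('t, n)` selects the records whose 't lies in the n time units ending at the anchor record's 't; the names `Rec`, `timeInLastN`, and `runningAgg` are made up for illustration, and the real implementation would operate on Catalyst rows from each group's iterator:

```scala
case class Rec(t: Int, v: Double)

// The CDS as a local operation: for an anchor record, return the records of
// the same group that fall in the last `n` time units ending at anchor.t.
def timeInLastN(n: Int)(anchor: Rec, group: Seq[Rec]): Seq[Rec] =
  group.filter(r => r.t > anchor.t - n && r.t <= anchor.t)

// The "self-join + aggregate" done with Scala collection operations:
// each record is paired with its selected subset, which is then aggregated.
def runningAgg(group: Seq[Rec]): Seq[(Int, Double, Int)] =
  group.map { anchor =>
    val selected = timeInLastN(3)(anchor, group)
    (anchor.t, selected.map(_.v).sum, selected.size) // ('t, Sum('v), Count('v))
  }

// After groupByKey on 'k, each group's local iterator would be processed like:
val oneGroup = Seq(Rec(1, 10.0), Rec(2, 5.0), Rec(4, 2.0))
runningAgg(oneGroup).foreach(println)
// prints (1,10.0,1), (2,15.0,2), (4,7.0,2)
```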