Window Functions in Spark SQL #24


Traditional aggregate functions

Merge the values of a column across multiple rows into a single row according to an aggregation rule. The row count after aggregation is normally smaller than before it: each group yields exactly one result.

Window functions

The number of output rows equals the number of input rows.

Window functions are applied with the OVER clause, which comes in two forms: with arguments and without.

The OVER clause consists of three parts: partitioning, ordering, and a frame.

Basic syntax

Function() OVER ([PARTITION BY <column1, column2..>] [ORDER BY <column3..>] [window_clause])

Functions

Aggregate functions: COUNT, SUM, MIN, MAX, AVG

Ranking functions:

| Ranking Functions | DataFrame API | Description |
|---|---|---|
| ROW_NUMBER() | functions.row_number() | Sequential numbering with no ties. Typical uses: 1. pagination; 2. grouping data and taking the top-N rows of each group (see the subquery example at the end). |
| RANK() | functions.rank() | Rank within the partition, with gaps: two rows tied for second are followed directly by fourth, so ties leave holes in the numbering. |
| DENSE_RANK() | functions.dense_rank() | Rank within the partition, without gaps: two rows tied for second are still followed by third. |
| PERCENT_RANK() | functions.percent_rank() | (RANK of the current row - 1) / (rows in the partition - 1). |
| NTILE(n) | functions.ntile(n) | Splits the partition's rows, in order, into n buckets and returns the bucket number of the current row (a sketch follows the snippet below). |
# [PARTITION BY col1] is optional; omit it to rank over the whole result set

ROW_NUMBER() OVER([PARTITION BY col1] ORDER BY col2)
RANK() OVER([PARTITION BY col1] ORDER BY col2)
DENSE_RANK() OVER([PARTITION BY col1] ORDER BY col2)
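
A quick sketch of NTILE, assuming the same spark session and empsalary table used in the examples further down; the quartile alias is only for illustration:

// Assign each row in its partition to one of 4 buckets by descending salary
Dataset<Row> quartiles = spark.sql(
        "SELECT depname, salary, " +
        "NTILE(4) OVER (PARTITION BY depname ORDER BY salary DESC) AS quartile " +
        "FROM empsalary");
quartiles.show();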

Descending sorts place empty/NULL values last by default:

 # NULLS LAST
 ORDER BY salary DESC NULLS LAST

Ascending sorts place empty/NULL values first by default:

# NULLS FIRST
ORDER BY salary ASC NULLS FIRST
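
A minimal sketch, overriding the ascending-sort default so NULL salaries sort last instead (same empsalary table as the examples below; rnk is an illustrative alias):

Dataset<Row> nullsLast = spark.sql(
        "SELECT depname, salary, " +
        "RANK() OVER (PARTITION BY depname ORDER BY salary ASC NULLS LAST) AS rnk " +
        "FROM empsalary");
nullsLast.show();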

Ranking functions example


Dataset<Row> sqlDF = spark.sql(
        "SELECT " +
        "depname, " +
        "salary, " +
        "rank() OVER (PARTITION BY depname ORDER BY salary DESC) AS rank, " +
        "dense_rank() OVER (PARTITION BY depname ORDER BY salary DESC) AS dense_rank, " +
        "percent_rank() OVER (PARTITION BY depname ORDER BY salary DESC) AS percent_rank, " +
        "row_number() OVER (PARTITION BY depname ORDER BY salary DESC) AS rowNo " +
        "FROM empsalary");
sqlDF.show();

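The ranking table above also lists the DataFrame API names; a sketch of the equivalent query through org.apache.spark.sql.expressions.Window, assuming the same spark session and empsalary table:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// One window spec shared by all four ranking functions
WindowSpec byDeptSalaryDesc = Window.partitionBy("depname").orderBy(col("salary").desc());

Dataset<Row> ranked = spark.table("empsalary")
        .withColumn("rank", rank().over(byDeptSalaryDesc))
        .withColumn("dense_rank", dense_rank().over(byDeptSalaryDesc))
        .withColumn("percent_rank", percent_rank().over(byDeptSalaryDesc))
        .withColumn("rowNo", row_number().over(byDeptSalaryDesc));
ranked.show();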

Partition By

The PARTITION BY clause (the query partition clause) splits the result set into partitions, much like GROUP BY: both divide rows along boundary values. The function preceding OVER is evaluated within each partition; whenever a partition boundary is crossed, the computation starts over.

It takes one or more columns of primitive data types.
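
To make the contrast with GROUP BY concrete, a sketch against the same empsalary table (avg_salary is an illustrative alias): GROUP BY collapses each department to one row, while the window version keeps every input row.

// GROUP BY: one output row per depname
Dataset<Row> grouped = spark.sql(
        "SELECT depname, AVG(salary) AS avg_salary FROM empsalary GROUP BY depname");

// Window: one output row per input row; avg_salary repeats within each partition
Dataset<Row> windowed = spark.sql(
        "SELECT depname, salary, AVG(salary) OVER (PARTITION BY depname) AS avg_salary " +
        "FROM empsalary");

grouped.show();
windowed.show();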

Order By

Forces the input rows to be sorted.

Ascending by default.

Frame clause

Window frame specification:

(ROWS | RANGE) BETWEEN (UNBOUNDED | [num]) PRECEDING AND ([num] PRECEDING | CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN CURRENT ROW AND (CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN [num] PRECEDING AND (UNBOUNDED | [num]) FOLLOWING

| Clause | Description |
|---|---|
| PRECEDING | Before the current row |
| FOLLOWING | After the current row |
| CURRENT ROW | The current row |
| UNBOUNDED | No bound (the start or the end of the partition) |
| UNBOUNDED PRECEDING | From the start of the partition |
| UNBOUNDED FOLLOWING | To the end of the partition |
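
A sketch of a bounded frame using [num] PRECEDING/FOLLOWING, again over empsalary (moving_avg is an illustrative alias): averaging each row with its immediate neighbors gives a 3-row moving average.

Dataset<Row> movingAvg = spark.sql(
        "SELECT depname, salary, " +
        "AVG(salary) OVER (PARTITION BY depname ORDER BY salary " +
        "ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS moving_avg " +
        "FROM empsalary");
movingAvg.show();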

Difference between ROWS and RANGE

| Keyword | Description |
|---|---|
| ROWS | A fixed number of rows before/after the current row, positioned by the window's ORDER BY clause (defines the window physically, by row count) |
| RANGE | Includes all peer rows sharing the current row's ORDER BY value (defines the window logically, by value) |

Example with sum

Dataset<Row> sqlDF = spark.sql(
        "SELECT " +
        "depname, " +
        "salary, " +
        // RANGE: from the partition start through all peers of the current row
        "sum(salary) OVER (PARTITION BY depname ORDER BY salary ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) salary_1, " +
        // ROWS: from the partition start to the current row only
        "sum(salary) OVER (PARTITION BY depname ORDER BY salary ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) salary_2, " +
        // Without ORDER BY the whole partition is summed, equivalent to:
        // sum(salary) OVER (PARTITION BY depname ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
        "sum(salary) OVER (PARTITION BY depname) AS salary_3 " +
        "FROM empsalary");
sqlDF.show();

output

+---------+------+--------+--------+--------+
|  depname|salary|salary_1|salary_2|salary_3|
+---------+------+--------+--------+--------+
|  develop|  4200|    4200|    4200|   25100|
|  develop|  4500|    8700|    8700|   25100|
|  develop|  5200|   19100|   13900|   25100|
|  develop|  5200|   19100|   19100|   25100|
|  develop|  6000|   25100|   25100|   25100|
|    sales|  4800|    9600|    4800|   14600|
|    sales|  4800|    9600|    9600|   14600|
|    sales|  5000|   14600|   14600|   14600|
|personnel|  3500|    3500|    3500|    7400|
|personnel|  3900|    7400|    7400|    7400|
+---------+------+--------+--------+--------+
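
Note the two develop rows with salary 5200: under RANGE (salary_1) both get 19100, because peers enter the frame together, while under ROWS (salary_2) the frame grows one row at a time, giving 13900 and then 19100.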

When OVER takes no arguments, the window specification defaults to the full result set:

SELECT depname, salary, avg(salary) OVER () FROM empsalary

# Default window specification
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING

For analytic/aggregate functions with an ORDER BY but no frame clause, the default frame is:

SELECT depname, salary, AVG(salary) OVER (order by salary) FROM empsalary

# Default window specification
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

For ranking functions with an ORDER BY but no frame clause, the frame is:

ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
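
These defaults can be checked side by side; a sketch over empsalary (avg_all and running_avg are illustrative aliases):

Dataset<Row> defaults = spark.sql(
        "SELECT depname, salary, " +
        // Empty OVER(): whole result set (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
        "AVG(salary) OVER () AS avg_all, " +
        // ORDER BY without a frame: running average (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
        "AVG(salary) OVER (ORDER BY salary) AS running_avg " +
        "FROM empsalary");
defaults.show();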

Filtering on window function results

A window column cannot be referenced in the same query's WHERE clause; wrap the query in a subquery and filter on the computed column.

// The window column rowNo is computed in the inner query and filtered in the outer one
Dataset<Row> sqlDF = spark.sql(
        "SELECT year, occupation, rowNo " +
        "FROM (" +
        "SELECT year, occupation, show, `group`, raw_guest, " +
        "row_number() OVER (ORDER BY 1) AS rowNo FROM daily_show_guests" +
        ") " +
        "WHERE rowNo <= 5");

sqlDF.show();
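
Through the DataFrame API the same filter needs no subquery, since the window column can be filtered in a later transformation; a sketch against the same daily_show_guests table:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.Window;

Dataset<Row> top5 = spark.table("daily_show_guests")
        // lit(1) mirrors the constant "ORDER BY 1" in the SQL version
        .withColumn("rowNo", row_number().over(Window.orderBy(lit(1))))
        .filter("rowNo <= 5")
        .select("year", "occupation", "rowNo");
top5.show();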
