在 Apache Spark 中,SELECT 语句用于从一个或多个表中检索数据。
假设我们有一个名为 employees 的表,其结构如下:
CREATE TABLE employees ( id INT, name STRING, salary FLOAT, department STRING )
该表包含以下数据:
id | name | salary | department |
---|---|---|---|
1 | Alice | 70000 | HR |
2 | Bob | 80000 | IT |
3 | Carol | 75000 | HR |
4 | Dave | 85000 | IT |
SELECT * FROM employees;
id | name | salary | department |
---|---|---|---|
1 | Alice | 70000 | HR |
2 | Bob | 80000 | IT |
3 | Carol | 75000 | HR |
4 | Dave | 85000 | IT |
SELECT name, salary FROM employees;
name | salary |
---|---|
Alice | 70000 |
Bob | 80000 |
Carol | 75000 |
Dave | 85000 |
SELECT * FROM employees WHERE department = 'IT';
id | name | salary | department |
---|---|---|---|
2 | Bob | 80000 | IT |
4 | Dave | 85000 | IT |
SELECT * FROM employees ORDER BY salary DESC;
id | name | salary | department |
---|---|---|---|
4 | Dave | 85000 | IT |
2 | Bob | 80000 | IT |
3 | Carol | 75000 | HR |
1 | Alice | 70000 | HR |
SELECT * FROM employees ORDER BY salary DESC LIMIT 2;
id | name | salary | department |
---|---|---|---|
4 | Dave | 85000 | IT |
2 | Bob | 80000 | IT |
在 PySpark 中,你可以使用 DataFrame API 来执行类似的查询:
df = spark.sql("SELECT * FROM employees WHERE salary > 75000") df.show()
id | name | salary | department |
---|---|---|---|
4 | Dave | 85000 | IT |
在 Spark SQL 中,JOIN 子句用于结合来自两个或多个表的数据。根据数据之间的关系,有几种不同类型的 JOIN:
INNER JOIN:只返回两个表中匹配连接条件的行。
LEFT OUTER JOIN 或 LEFT JOIN:返回左表的所有行,即使右表中没有匹配的行。右表中不匹配的行在结果中为 NULL。
RIGHT OUTER JOIN 或 RIGHT JOIN:与 LEFT JOIN 相反,返回右表的所有行,即使左表中没有匹配的行。
FULL OUTER JOIN 或 FULL JOIN:返回左表和右表中的所有行。如果某一侧没有匹配,那么该侧的结果为 NULL。
CROSS JOIN:返回两个表的笛卡尔积,每个左表的行都会与右表的每行组合。
假设我们有两个表:employees 和 departments。
employees 表:
CREATE TABLE employees (employee_id INT, name STRING, department_id INT);
其内容为
employee_id | name | department_id |
---|---|---|
1 | Alice | 2 |
2 | Bob | 3 |
3 | Charlie | 2 |
4 | David | 4 |
departments 表:
CREATE TABLE departments (department_id INT, department_name STRING);
其内容为
department_id | department_name |
---|---|
1 | Accounting |
2 | IT |
3 | HR |
SELECT e.name, d.department_name FROM employees e [INNER] JOIN departments d ON e.department_id = d.department_id;
name | department_name |
---|---|
Alice | IT |
Bob | HR |
Charlie | IT |
SELECT e.name, d.department_name FROM employees e LEFT JOIN departments d ON e.department_id = d.department_id;
name | department_name |
---|---|
Alice | IT |
Bob | HR |
Charlie | IT |
David | NULL |
注意
由于 employees 表中没有 department_id 为 4 的记录,因此 LEFT JOIN 返回的 David 的部门为 NULL。
SELECT e.name, d.department_name FROM employees e RIGHT JOIN departments d ON e.department_id = d.department_id;
name | department_name |
---|---|
NULL | Accounting |
Alice | IT |
Bob | HR |
Charlie | IT |
RIGHT JOIN 的行为与 LEFT JOIN 类似,但是它返回右表(departments)的所有行。由于 employees 表中没有与 Accounting 部门对应的记录,因此 name 为 (null)。
SELECT e.name, d.department_name FROM employees e FULL OUTER JOIN departments d ON e.department_id = d.department_id;
name | department_name |
---|---|
NULL | Accounting |
Alice | IT |
Bob | HR |
Charlie | IT |
David | NULL |
FULL OUTER JOIN 返回左表和右表中的所有行。匹配不上的行的缺失字段置为 NULL。
SELECT e.name, d.department_name FROM employees e CROSS JOIN departments d;
名字 | 职位 |
---|---|
Alice | Accounting |
Alice | IT |
Alice | HR |
Bob | Accounting |
Bob | IT |
Bob | HR |
Charlie | Accounting |
Charlie | IT |
Charlie | HR |
David | Accounting |
David | IT |
David | HR |
CROSS JOIN 会将 employees 表中的每行与 departments 表中的每行组合,形成笛卡尔积。因此,结果集的行数是两个表行数的乘积。
在 Spark SQL 中,SORT BY
子句用于在用户指定的顺序中返回每个分区内排序的结果行。当存在多个分区时,SORT BY
会返回每个分区内的排序结果,然后按照分区再排序。这与 ORDER BY
子句不同,后者保证了输出的总体顺序。
假设我们有一个名为 person 的表,表结构:
CREATE TABLE person (zip_code INT, name STRING, age INT);
其中包含数据:
name | age | zip_code |
---|---|---|
Anil K | 27 | 94588 |
Dan Li | 18 | 94588 |
John V | null | 94588 |
Zen Hui | 50 | 94588 |
Aryan B. | 18 | 94511 |
David K | 42 | 94511 |
Lalit B. | null | 94511 |
现在,我们根据 name 进行排序,排序的时候讲所有的人按照 zip_code 分区。
[ SORT BY { expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ] } ]
SELECT /*+ REPARTITION(zip_code) */ name, age, zip_code FROM person SORT BY name;
name | age | zip_code |
---|---|---|
Anil K | 27 | 94588 |
Dan Li | 18 | 94588 |
John V | null | 94588 |
Zen Hui | 50 | 94588 |
Aryan B. | 18 | 94511 |
David K | 42 | 94511 |
Lalit B. | null | 94511 |
ORDER BY
子句用于按照用户指定的顺序对结果行进行排序返回。与SORT BY
子句不同,该子句保证了输出中的总顺序。
假设我们有一个名为 person 的表,它包含每个人的 id、name 和 age。表的结构如下:
CREATE TABLE person (id INT, name STRING, age INT);
表中的数据如下:
id | name | age |
---|---|---|
100 | John | 30 |
200 | Mary | NULL |
300 | Mike | 80 |
400 | Jerry | NULL |
500 | Dan | 50 |
我们想要选择所有员工的信息,并按照年龄从低到高进行排序。对于 null,我们让它处于最后。
[ ORDER BY { expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ] } ]
SELECT name, age FROM person ORDER BY age NULLS LAST;
name | age |
---|---|
John | 30 |
Dan | 50 |
Mike | 80 |
Mary | null |
Jerry | null |
GROUP BY 子句在 SQL 中用于将查询结果按照一个或多个列的值分组,并对每个分组应用聚合函数(如 SUM, AVG, MAX, MIN, COUNT 等)进行计算。在 Apache Spark SQL 中,GROUP BY 的使用与标准 SQL 一致。
假设我们有一个名为 dealer 的表,其中包含了经销商的 id、城市、车型、销售量数据。
表结构:
CREATE TABLE dealer (id INT, city STRING, car_model STRING, quantity INT);
其中包含数据:
id | city | car_model | quantiry |
---|---|---|---|
100 | Fremont | Honda Civic | 10 |
100 | Fremont | Honda Accord | 15 |
100 | Fremont | HondaCRV | 7 |
200 | Dublin | Honda Civic | 20 |
200 | Dublin | Honda Accord | 10 |
200 | Dublin | HondaCRV | 3 |
300 | San Jose | Honda Civic | 5 |
300 | San Jose | Honda Accord | 8 |
现在,我们想要计算一下每个经销商各种车型的销售总额,并按照经销商的 id 排序:
SELECT id, sum(quantity) FROM dealer GROUP BY id ORDER BY id;
id | sum(quantity) |
---|---|
100 | 32 |
200 | 33 |
300 | 13 |
然后,我们想计算一下每个经销商仅对于 Honda Civic 和 Honda CRV 两种车型的销售额:
SELECT id, sum(quantity) FILTER (WHERE car_model IN ('Honda Civic', 'Honda CRV')) AS `sum(quantity)` FROM dealerGROUP BY id ORDER BY id;
id | sum(quantity) |
---|---|
100 | 17 |
200 | 23 |
300 | 5 |
DISTRIBUTE BY
子句用于根据输入表达式对数据进行重分区。与 CLUSTER BY
子句不同,它不会对每个分区内的数据进行排序。
假设我们有一个名为 person 的表,它记录了每个人的姓名和年龄:
CREATE TABLE person (name STRING, age INT);
表中的数据如下:
姓名 | 年龄 |
---|---|
Zen Hui | 25 |
Anil B | 18 |
Shone S | 16 |
Mike A | 25 |
John A | 18 |
Jack N | 16 |
现在我们设置它的分区数为 2:
SET spark.sql.shuffle.partitions = 2;
我们先执行一下 SELECT,能够看到它是没有任何排序的:
SELECT age, name FROM person;
age | name |
---|---|
16 | Shone S |
25 | Zen Hui |
16 | Jack N |
25 | Mike A |
18 | John A |
18 | Anil B |
然后我们根据 age 做一下 distribytedby。根据 distribytedby 的定义,它会将数据按照分区数进行重新分区,但是在分区内不会根据 age 进行排序。
[ DISTRIBUTE BY { expression [, ... ] } ]
SELECT age, name FROM person DISTRIBUTED BY age;
age | name |
---|---|
25 | Zen Hui |
25 | Mike A |
18 | John A |
18 | Anil B |
16 | Shone S |
16 | Jack N |
在这个结果中,由于我们设置了分区数为 2,age 为 25 和 18 的人会被分为一组,age 为 16 的人会分为一组。但是在第一个分区内并没有按照 age 进行排序。
使用CLUSTER BY
子句首先根据输入表达式对数据进行重新分区,然后对每个分区内的数据进行排序。这在语义上相当于先执行DISTRIBUT BY
,然后执行SORT BY
。该子句仅确保结果行在每个分区内排序,并不保证输出的总顺序。
假设我们有一个名为 person 的表,它记录了每个人的姓名和年龄:
CREATE TABLE person (name STRING, age INT);
表中的数据如下:
姓名 | 年龄 |
---|---|
Zen Hui | 25 |
Anil B | 18 |
Shone S | 16 |
Mike A | 25 |
John A | 18 |
Jack N | 16 |
现在我们设置它的分区数为 2:
SET spark.sql.shuffle.partitions = 2;
我们先执行一下 SELECT,能够看到它是没有任何排序的:
SELECT age, name FROM person;
age | name |
---|---|
16 | Shone S |
25 | Zen Hui |
16 | Jack N |
25 | Mike A |
18 | John A |
18 | Anil B |
然后我们根据 age 做一下 clusterby。根据 clusterby 的定义,它会将数据按照分区数进行重新分区,然后分区内排序。
[ CLUSTER BY { expression [ , ... ] } ]
SELECT age, name FROM person CLUSTER BY age;
age | name |
---|---|
18 | John A |
18 | Anil B |
25 | Zen Hui |
25 | Mike A |
16 | Shone S |
16 | Jack N |
由于我们设置的分区数为2,age 为 18 和 25 的人会被分到一个组,age 为 16 的人会被分到另外一个组。
窗口函数对一组行(称为窗口)进行操作,并基于这组行计算每行的返回值。窗口函数在处理任务中非常有用,例如计算移动平均值、计算累积统计数据或根据当前行的相对位置访问行的值。
窗口函数本身比较复杂,其包含三个主要部分:
Rank 函数:用于排序,又分为几个子类
RNAK:为每个窗口内的行分配一个唯一的序号。如果存在相同的值,则会跳过序号。例如,如果有两行并列第一,则下一个序号是3。
DENSE_RANK:与 RANK()
类似,但它不会跳过序号。如果有并列的排名,那么接下来的序号会连续。
PERCENT_RANK:计算每个行在一个窗口内的相对排名,返回一个介于0和1之间的值。该值表示行的排名除以窗口中总行数,再乘以100。
NTILE:将窗口内的行分成 N 组,每组大约有相同数量的行。这个函数返回一个介于 1 和 N 之间的整数,表示行所属的组。
ROW_NUMBER:为每个窗口内的行分配一个连续的序号,从1开始。
Analytic 函数
CUME_DIST:计算一个值在窗口中所有行的分布位置。它返回一个介于0和1之间的值,表示当前行的排名相对于窗口中所有行的排名的累积分布。简单来说,如果有许多行具有相同的值,CUME_DIST()
会将这些行视为一个组,并计算这个组在整个分布中的相对位置。
LAG:用于访问窗口中当前行前面的行中的数据。你可以指定你想要检索的行数,以及如果该行不存在时返回的默认值。
LEAD:与 LAG()
函数相反,它用于访问窗口中当前行后面的行中的数据。同样,你可以指定你想要检索的行数和默认值。
Aggregate 函数
MAX:取最大值
MIN:取最小值
COUNT:计算数量
SUM:求和
AVG:求平均
...
除了窗口的计算部分,窗口还有大小的定义:
RANGE:定义一个区间范围
ROWS:定义包含哪些行
注意
所有的这些窗口函数都需要一个 OVER
子句,用来定义窗口的规则。
OVER
子句可以包含 PARTITION BY
和 ORDER BY
。PARTITION BY
类似于 GROUP BY
,表示按照哪些列对数据进行分区。ORDER BY
定义了行在窗口中的排序规则。
假设我们有一个名为 employee 的表,其中包含了员工的 name、dept、salary 和 age。
表结构:
CREATE TABLE employees (name STRING, dept STRING, salary INT, age INT);
其中包含数据:
name | dept | salary | age |
---|---|---|---|
Chloe | Engineering | 23000 | 25 |
Fred | Engineering | 21000 | 28 |
Paul | Engineering | 29000 | 23 |
Helen | Marketing | 29000 | 40 |
Tom | Engineering | 23000 | 33 |
Jane | Marketing | 29000 | 28 |
Jeff | Marketing | 35000 | 38 |
Evan | Sales | 32000 | 38 |
Lisa | Sales | 10000 | 35 |
Alex | Sales | 30000 | 33 |
现在,让我们计算一下他们分部门的工资排序:
SELECT name, dept, RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank FROM employees;
name | dept | salary | rank |
---|---|---|---|
Lisa | sales | 10000 | 1 |
Alex | sales | 30000 | 2 |
Evan | sales | 32000 | 3 |
Fred | Engineering | 21000 | 1 |
Tom | Engineering | 23000 | 2 |
Chloe | Engineering | 23000 | 2 |
Paul | Engineering | 29000 | 4 |
Helen | Marketing | 29000 | 1 |
Jane | Marketing | 29000 | 1 |
Jeff | Marketing | 35000 | 3 |
现在我们分部门按照工资进行排序,并且分配给排序一个序号。这个序号采用 DENSE_RANK
的形式,也就是如果有相同的,那么分配同一个号码,下一个号码继续上一个号码。
SELECT name, dept, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary) AS dense_rank FROM employees;
name | dept | salary | dense_rank |
---|---|---|---|
Lisa | Sales | 10000 | 1 |
Alex | Sales | 30000 | 2 |
Evan | Sales | 32000 | 3 |
Fred | Engineering | 21000 | 1 |
Tom | Engineering | 23000 | 2 |
Chloe | Engineering | 23000 | 2 |
Paul | Engineering | 29000 | 3 |
Helen | Marketing | 29000 | 1 |
Jane | Marketing | 29000 | 1 |
Jeff | Marketing | 35000 | 2 |