You need to enable JavaScript to run this app.
导航
SELECT 语句
最近更新时间:2024.05.14 16:21:43首次发布时间:2024.05.14 16:21:43

在 Apache Spark 中,SELECT 语句用于从一个或多个表中检索数据。

1 普通查询

假设我们有一个名为 employees 的表,其结构如下:

CREATE TABLE employees (
  id INT,
  name STRING,
  salary FLOAT,
  department STRING
)

该表包含以下数据:

idnamesalarydepartment
1Alice70000HR
2Bob80000IT
3Carol75000HR
4Dave85000IT

1.1 查询所有列

  • 执行语句示例
SELECT * FROM employees;
  • 执行结果
idnamesalarydepartment
1Alice70000HR
2Bob80000IT
3Carol75000HR
4Dave85000IT

1.2 查询特定列

  • 执行语句示例
SELECT name, salary FROM employees;
  • 执行结果
namesalary
Alice70000
Bob80000
Carol75000
Dave85000

1.3 带条件的查询

  • 执行语句示例
SELECT * FROM employees WHERE department = 'IT';
  • 执行结果
idnamesalarydepartment
2Bob80000IT
4Dave85000IT

1.4 排序查询结果

  • 执行语句示例
SELECT * FROM employees ORDER BY salary DESC;
  • 执行结果
idnamesalarydepartment
4Dave85000IT
2Bob80000IT
3Carol75000HR
1Alice70000HR

1.5 排序并限制结果数量

  • 执行语句示例
SELECT * FROM employees ORDER BY salary DESC LIMIT 2;
  • 执行结果
idnamesalarydepartment
4Dave85000IT
2Bob80000IT

1.6 使用 Spark SQL 的 DataFrame API

在 PySpark 中,你可以使用 DataFrame API 来执行类似的查询:

  • 执行语句示例
df = spark.sql("SELECT * FROM employees WHERE salary > 75000")
df.show()
  • 执行结果
idnamesalarydepartment
4Dave85000IT

2 Join

在 Spark SQL 中,JOIN 子句用于结合来自两个或多个表的数据。根据数据之间的关系,有几种不同类型的 JOIN:

  1. INNER JOIN:只返回两个表中匹配连接条件的行。

  2. LEFT OUTER JOIN 或 LEFT JOIN:返回左表的所有行,即使右表中没有匹配的行。右表中不匹配的行在结果中为 NULL。

  3. RIGHT OUTER JOIN 或 RIGHT JOIN:与 LEFT JOIN 相反,返回右表的所有行,即使左表中没有匹配的行。

  4. FULL OUTER JOIN 或 FULL JOIN:返回左表和右表中的所有行。如果某一侧没有匹配,那么该侧的结果为 NULL。

  5. CROSS JOIN:返回两个表的笛卡尔积,每个左表的行都会与右表的每行组合。

假设我们有两个表:employees 和 departments。
employees 表:

CREATE TABLE employees (employee_id INT, name STRING, department_id INT);

其内容为

employee_idnamedepartment_id
1Alice2
2Bob3
3Charlie2
4David4

departments 表:

CREATE TABLE departments (department_id INT, department_name STRING);

其内容为

department_iddepartment_name
1Accounting
2IT
3HR

2.1 INNER JOIN

  • 执行语句示例
SELECT e.name, d.department_name 
    FROM employees e 
    [INNER] JOIN departments d ON e.department_id = d.department_id;
  • 执行结果
namedepartment_name
AliceIT
BobHR
CharlieIT

2.2 LEFT JOIN

  • 执行语句示例
SELECT e.name, d.department_name
    FROM employees e
    LEFT JOIN departments d ON e.department_id = d.department_id;
  • 执行结果
namedepartment_name
AliceIT
BobHR
CharlieIT
DavidNULL

注意

由于 employees 表中没有 department_id 为 4 的记录,因此 LEFT JOIN 返回的 David 的部门为 NULL。

2.3 RIGHT JOIN

  • 执行语句示例
SELECT e.name, d.department_name
    FROM employees e
    RIGHT JOIN departments d ON e.department_id = d.department_id;
  • 执行结果
namedepartment_name
NULLAccounting
AliceIT
BobHR
CharlieIT

RIGHT JOIN 的行为与 LEFT JOIN 类似,但是它返回右表(departments)的所有行。由于 employees 表中没有与 Accounting 部门对应的记录,因此 name 为 (null)。

2.4 FULL OUTER JOIN

  • 执行语句示例
SELECT e.name, d.department_name
    FROM employees e
    FULL OUTER JOIN departments d ON e.department_id = d.department_id;
  • 执行结果
namedepartment_name
NULLAccounting
AliceIT
BobHR
CharlieIT
DavidNULL

FULL OUTER JOIN 返回左表和右表中的所有行。匹配不上的行的缺失字段置为 NULL。

2.5 CROSS JOIN

  • 执行语句示例
SELECT e.name, d.department_name
    FROM employees e
    CROSS JOIN departments d;
  • 执行结果
名字职位
AliceAccounting
AliceIT
AliceHR
BobAccounting
BobIT
BobHR
CharlieAccounting
CharlieIT
CharlieHR
DavidAccounting
DavidIT
DavidHR

CROSS JOIN 会将 employees 表中的每行与 departments 表中的每行组合,形成笛卡尔积。因此,结果集的行数是两个表行数的乘积。

3 Sortby 子句

在 Spark SQL 中,SORT BY 子句用于在用户指定的顺序中返回每个分区内排序的结果行。当存在多个分区时,SORT BY 会返回每个分区内的排序结果,然后按照分区再排序。这与 ORDER BY 子句不同,后者保证了输出的总体顺序。
假设我们有一个名为 person 的表,表结构:

CREATE TABLE person (zip_code INT, name STRING, age INT);

其中包含数据:

nameagezip_code
Anil K2794588
Dan Li1894588
John Vnull94588
Zen Hui5094588
Aryan B.1894511
David K4294511
Lalit B.null94511

现在,我们根据 name 进行排序,排序的时候讲所有的人按照 zip_code 分区。

  • 语句
[ SORT BY { expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ] } ]
  • 执行语句示例
SELECT /*+ REPARTITION(zip_code) */ name, age, zip_code FROM person SORT BY name;
  • 执行结果
nameagezip_code
Anil K2794588
Dan Li1894588
John Vnull94588
Zen Hui5094588
Aryan B.1894511
David K4294511
Lalit B.null94511

4 Orderby 子句

ORDER BY子句用于按照用户指定的顺序对结果行进行排序返回。与SORT BY子句不同,该子句保证了输出中的总顺序。
假设我们有一个名为 person 的表,它包含每个人的 id、name 和 age。表的结构如下:

CREATE TABLE person (id INT, name STRING, age INT);

表中的数据如下:

idnameage
100John30
200MaryNULL
300Mike80
400JerryNULL
500Dan50

我们想要选择所有员工的信息,并按照年龄从低到高进行排序。对于 null,我们让它处于最后。

  • 语句
[ ORDER BY { expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ] } ]
  • 执行语句示例
SELECT name, age FROM person ORDER BY age NULLS LAST;
  • 执行结果
nameage
John30
Dan50
Mike80
Marynull
Jerrynull

5 Groupby 子句

GROUP BY 子句在 SQL 中用于将查询结果按照一个或多个列的值分组,并对每个分组应用聚合函数(如 SUM, AVG, MAX, MIN, COUNT 等)进行计算。在 Apache Spark SQL 中,GROUP BY 的使用与标准 SQL 一致。
假设我们有一个名为 dealer 的表,其中包含了经销商的 id、城市、车型、销售量数据。
表结构:

CREATE TABLE dealer (id INT, city STRING, car_model STRING, quantity INT);

其中包含数据:

idcitycar_modelquantiry
100FremontHonda Civic10
100FremontHonda Accord15
100FremontHondaCRV7
200DublinHonda Civic20
200DublinHonda Accord10
200DublinHondaCRV3
300San JoseHonda Civic5
300San JoseHonda Accord8

现在,我们想要计算一下每个经销商各种车型的销售总额,并按照经销商的 id 排序:

  • 执行语句示例
SELECT id, sum(quantity) FROM dealer GROUP BY id ORDER BY id;
  • 执行结果
idsum(quantity)
10032
20033
30013

然后,我们想计算一下每个经销商仅对于 Honda Civic 和 Honda CRV 两种车型的销售额:

SELECT id, sum(quantity) FILTER (WHERE car_model IN ('Honda Civic', 'Honda CRV')) AS `sum(quantity)` FROM dealerGROUP BY id ORDER BY id;
  • 执行结果
idsum(quantity)
10017
20023
3005

6 Distributedby 子句

DISTRIBUTE BY 子句用于根据输入表达式对数据进行重分区。与 CLUSTER BY 子句不同,它不会对每个分区内的数据进行排序。
假设我们有一个名为 person 的表,它记录了每个人的姓名和年龄:

CREATE TABLE person (name STRING, age INT);

表中的数据如下:

姓名年龄
Zen Hui25
Anil B18
Shone S16
Mike A25
John A18
Jack N16

现在我们设置它的分区数为 2:

SET spark.sql.shuffle.partitions = 2;

我们先执行一下 SELECT,能够看到它是没有任何排序的:

SELECT age, name FROM person;
agename
16Shone S
25Zen Hui
16Jack N
25Mike A
18John A
18Anil B

然后我们根据 age 做一下 distribytedby。根据 distribytedby 的定义,它会将数据按照分区数进行重新分区,但是在分区内不会根据 age 进行排序。

  • 语句
[ DISTRIBUTE BY { expression [, ... ] } ]
  • 执行语句示例
SELECT age, name FROM person DISTRIBUTED BY age;
  • 执行结果
agename
25Zen Hui
25Mike A
18John A
18Anil B
16Shone S
16Jack N

在这个结果中,由于我们设置了分区数为 2,age 为 25 和 18 的人会被分为一组,age 为 16 的人会分为一组。但是在第一个分区内并没有按照 age 进行排序。

7 Clusterby 子句

使用CLUSTER BY子句首先根据输入表达式对数据进行重新分区,然后对每个分区内的数据进行排序。这在语义上相当于先执行DISTRIBUT BY,然后执行SORT BY。该子句仅确保结果行在每个分区内排序,并不保证输出的总顺序。
假设我们有一个名为 person 的表,它记录了每个人的姓名和年龄:

CREATE TABLE person (name STRING, age INT);

表中的数据如下:

姓名年龄
Zen Hui25
Anil B18
Shone S16
Mike A25
John A18
Jack N16

现在我们设置它的分区数为 2:

SET spark.sql.shuffle.partitions = 2;

我们先执行一下 SELECT,能够看到它是没有任何排序的:

SELECT age, name FROM person;
agename
16Shone S
25Zen Hui
16Jack N
25Mike A
18John A
18Anil B

然后我们根据 age 做一下 clusterby。根据 clusterby 的定义,它会将数据按照分区数进行重新分区,然后分区内排序。

  • 语句
[ CLUSTER BY { expression [ , ... ] } ]
  • 执行语句示例
SELECT age, name FROM person CLUSTER BY age;
  • 执行结果
agename
18John A
18Anil B
25Zen Hui
25Mike A
16Shone S
16Jack N

由于我们设置的分区数为2,age 为 18 和 25 的人会被分到一个组,age 为 16 的人会被分到另外一个组。

8 Window Function

窗口函数对一组行(称为窗口)进行操作,并基于这组行计算每行的返回值。窗口函数在处理任务中非常有用,例如计算移动平均值、计算累积统计数据或根据当前行的相对位置访问行的值。
窗口函数本身比较复杂,其包含三个主要部分:

  • Rank 函数:用于排序,又分为几个子类

    • RNAK:为每个窗口内的行分配一个唯一的序号。如果存在相同的值,则会跳过序号。例如,如果有两行并列第一,则下一个序号是3。

    • DENSE_RANK:与 RANK() 类似,但它不会跳过序号。如果有并列的排名,那么接下来的序号会连续。

    • PERCENT_RANK:计算每个行在一个窗口内的相对排名,返回一个介于0和1之间的值。该值表示行的排名除以窗口中总行数,再乘以100。

    • NTILE:将窗口内的行分成 N 组,每组大约有相同数量的行。这个函数返回一个介于 1 和 N 之间的整数,表示行所属的组。

    • ROW_NUMBER:为每个窗口内的行分配一个连续的序号,从1开始。

  • Analytic 函数

    • CUME_DIST:计算一个值在窗口中所有行的分布位置。它返回一个介于0和1之间的值,表示当前行的排名相对于窗口中所有行的排名的累积分布。简单来说,如果有许多行具有相同的值,CUME_DIST() 会将这些行视为一个组,并计算这个组在整个分布中的相对位置。

    • LAG:用于访问窗口中当前行前面的行中的数据。你可以指定你想要检索的行数,以及如果该行不存在时返回的默认值。

    • LEAD:与 LAG() 函数相反,它用于访问窗口中当前行后面的行中的数据。同样,你可以指定你想要检索的行数和默认值。

  • Aggregate 函数

    • MAX:取最大值

    • MIN:取最小值

    • COUNT:计算数量

    • SUM:求和

    • AVG:求平均

    • ...

除了窗口的计算部分,窗口还有大小的定义:

  • RANGE:定义一个区间范围

  • ROWS:定义包含哪些行

注意

  • 所有的这些窗口函数都需要一个 OVER 子句,用来定义窗口的规则。

  • OVER 子句可以包含 PARTITION BYORDER BYPARTITION BY 类似于 GROUP BY,表示按照哪些列对数据进行分区。ORDER BY 定义了行在窗口中的排序规则。

假设我们有一个名为 employee 的表,其中包含了员工的 name、dept、salary 和 age。
表结构:

CREATE TABLE employees (name STRING, dept STRING, salary INT, age INT);

其中包含数据:

namedeptsalaryage
ChloeEngineering2300025
FredEngineering2100028
PaulEngineering2900023
HelenMarketing2900040
TomEngineering2300033
JaneMarketing2900028
JeffMarketing3500038
EvanSales3200038
LisaSales1000035
AlexSales3000033

现在,让我们计算一下他们分部门的工资排序:

  • 执行语句示例
SELECT name, dept, RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank FROM employees;
  • 执行结果
namedeptsalaryrank
Lisasales100001
Alexsales300002
Evansales320003
FredEngineering210001
TomEngineering230002
ChloeEngineering230002
PaulEngineering290004
HelenMarketing290001
JaneMarketing290001
JeffMarketing350003

现在我们分部门按照工资进行排序,并且分配给排序一个序号。这个序号采用 DENSE_RANK 的形式,也就是如果有相同的,那么分配同一个号码,下一个号码继续上一个号码。

SELECT name, dept, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary) AS dense_rank FROM employees;
  • 执行结果
namedeptsalarydense_rank
LisaSales100001
AlexSales300002
EvanSales320003
FredEngineering210001
TomEngineering230002
ChloeEngineering230002
PaulEngineering290003
HelenMarketing290001
JaneMarketing290001
JeffMarketing350002