You need to enable JavaScript to run this app.
导航
基础使用
最近更新时间:2024.11.21 19:11:46首次发布时间:2022.08.19 11:39:52

本文将为您介绍Spark支持弹性分布式数据集(RDD)、Spark SQL、PySpark和数据库表的基础操作示例。

1 使用前提

已创建E-MapReduce(简称“EMR”)集群,详见:创建集群

2 RDD基础操作

Spark围绕着 RDD 的概念展开,RDD是可以并行操作的元素的容错集合。Spark支持通过集合来创建RDD和通过外部数据集构建RDD两种方式来创建RDD。例如,共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据集。

2.1 创建RDD示例:

  • 通过集合来创建RDD
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
  • 通过外部数据集构建RDD
val distFile = sc.textFile("data.txt")

RDD构建成功后,可以对其进行一系列操作,例如Map和Reduce等操作。
例如,运行以下代码,首先从外部存储系统读一个文本文件构造了一个RDD,然后通过RDD的Map算子计算得到了文本文件中每一行的长度,最后通过Reduce算子计算得到了文本文件中各行长度之和。

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

2.2 Spark RDD常用操作

通常,Spark RDD的常用操作有两种,分别为Transform操作和Action操作。Transform操作并不会立即执行,而是到了Action操作才会被执行。

  • Transform操作

操作

描述

map()

参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。

flatMap()

参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD。

filter()

参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。

distinct()

没有参数,将RDD里的元素进行去重操作。

union()

参数是RDD,生成包含两个RDD所有元素的新RDD。

intersection()

参数是RDD,求出两个RDD的共同元素。

subtract()

参数是RDD,将原RDD里和参数RDD里相同的元素去掉。

cartesian()

参数是RDD,求两个RDD的笛卡尔积。

  • Action操作

操作

描述

collect()

返回RDD所有元素。

count()

返回RDD中的元素个数。

countByValue()

返回各元素在RDD中出现的次数。

reduce()

并行整合所有RDD数据,例如求和操作。

fold(0)(func)

和reduce()功能一样,但是fold带有初始值。

aggregate(0)(seqOp,combop)

和reduce()功能一样,但是返回的RDD数据类型和原RDD不一样。

foreach(func)

对RDD每个元素都是使用特定函数。

3 Spark SQL 基础操作

Spark SQL支持直接通过SQL语句操作数据,而Spark会将SQL进行解析、优化并执行。
以下示例展示了如何使用Spark SQL进行读取文件。示例如下:

  • 示例1:Spark支持多种数据格式,本示例读取了JSON格式文件的数据,并输出为Parquet格式。
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.write.parquet("people.parquet")
  • 示例2:通过SQL从parquetFile表中读出年龄在13岁到19岁之间的年轻人的名字,并转化为DataFrame,随后通过Map操作将名字转化为一个可读的形式并输出。
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()

4 PySpark基础操作

PySpark是Spark提供的Python API。可以通过PySpark提供的DataFrame接口,完成各种计算逻辑。
操作步骤

  1. 初始化SparkSession。
  2. 初始化SparkSession作为PySpark的执行入口。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
  1. 创建DataFrame。
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
  1. DataFrame创建完成后,可以通过各种类型的transform算子完成数据计算。
  2. 打印DataFrame和Schema。
df.show()
df.printSchema()

5 基础库表操作

EMR SparkSQL完全兼容开源SparkSQL语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源SparkSQL语法说明

5.1 数据库操作

5.1.1 创建数据库

0: jdbc:hive2://emr-master-1-1:10005> create database db_demo;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.285 seconds)

5.1.2 查看数据库信息

0: jdbc:hive2://emr-master-1-1:10005> desc database db_demo;
+----------------------------+----------------------------------------------------+
| database_description_item  |             database_description_value             |
+----------------------------+----------------------------------------------------+
| Database Name              | db_demo                                            |
| Comment                    |                                                    |
| Location                   | hdfs://emr-master-1-1:8020/apps/spark/warehouse/db_demo.db |
| Owner                      | hive                                               |
+----------------------------+----------------------------------------------------

5.1.3 删除数据库

0: jdbc:hive2://emr-master-1-1:10005> drop database db_demo;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.266 seconds)

5.2 表操作

5.2.1 创建表

0: jdbc:hive2://emr-master-1-1:10005> create table tb_demo(id int, name string);
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.128 seconds)

5.2.2 描述表信息

0: jdbc:hive2://emr-master-1-1:10005> desc table tb_demo;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| id        | int        | NULL     |
| name      | string     | NULL     |
+-----------+------------+----------+

5.2.3 删除表

0: jdbc:hive2://emr-master-1-1:10005> drop table tb_demo;
+---------+
| Result  |
+---------+
+---------+

5.2.4 插入数据

0: jdbc:hive2://emr-master-1-1:10005> insert into tb_demo select 1,'name1';
+---------+
| Result  |
+---------+
+---------+

5.2.5 查询表数据

0: jdbc:hive2://emr-master-1-1:10005> select * from tb_demo;
+-----+-------+
| id  | name  |
+-----+-------+
+ 1   + name1 +
+-----+-------+
1 rows selected (0.116 seconds)