You need to enable JavaScript to run this app.
导航
Spark RDD 基础使用
最近更新时间:2024.06.12 15:15:34首次发布时间:2024.06.12 15:15:34

1 什么是 Spark RDD

RDD(Resilient Distributed Dataset)即弹性分布式数据集,是 Spark 中最基本的数据抽象,它表示一个容错的、可并行处理的集合。RDD 由一系列分区(Partition)组成,每个分区都可以被独立地处理。RDD 还提供了一系列操作(算子),用于对数据进行各种转换和操作。

2 创建 RDD

在 Spark 中,可以通过多种方式创建 RDD,例如从本地文件、HDFS、Kafka 等数据源中读取数据,或者通过已有的 RDD 进行转换得到新的 RDD。以下是几种常见的创建 RDD 的方式:

  1. 从本地文件创建 RDD
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LocalFileRDD").getOrCreate()
# 读取本地文件,创建 RDD
data = spark.read.text("path/to/file.txt")
  1. 从 HDFS 创建 RDD
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("HDFSRDD").getOrCreate()
# 读取 HDFS 文件,创建 RDD
data = spark.read.csv("hdfs://localhost:9000/path/to/file.csv")
  1. 通过已有的 RDD 进行转换得到新的 RDD
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TransformRDD").getOrCreate()
# 创建一个整数类型的 RDD
intRDD = spark.range(1, 10)
# 将 intRDD 转换为字符串类型的 RDD
strRDD = intRDD.map(lambda x: str(x))
# 打印 strRDD 的内容
strRDD.collect()

3 RDD 操作算子

RDD 提供了一系列操作算子,用于对数据进行各种转换和操作。以下是几种常见的 RDD 操作算子:

  1. map() 算子:对 RDD 中的每个元素进行映射操作,返回一个新的 RDD
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MapOperator").getOrCreate()
# 创建一个整数类型的 RDD
intRDD = spark.range(1, 10)
# 将 intRDD 中的每个元素加 1,返回一个新的 RDD
resultRDD = intRDD.map(lambda x: x + 1)
# 打印结果 RDD 的内容
resultRDD.collect()
  1. filter() 算子:对 RDD 中的每个元素进行过滤操作,返回一个新的 RDD
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FilterOperator").getOrCreate()
# 创建一个整数类型的 RDD
intRDD = spark.range(1, 10)
# 过滤出偶数,返回一个新的 RDD
evenRDD = intRDD.filter(lambda x: x % 2 == 0)
# 打印偶数 RDD 的内容
evenRDD.collect()
  1. reduce() 算子:对 RDD 中的每个分区进行聚合操作,返回一个新的 RDD
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ReduceOperator").getOrCreate()
# 创建一个整数类型的 RDD
intRDD = spark.range(1, 10)
# 对每个分区的元素进行求和,返回一个新的 RDD
resultRDD = intRDD.reduce(lambda x, y: x + y)
# 打印结果 RDD 的内容
resultRDD.collect()
  1. groupByKey() 算子:对 RDD 中的数据按照指定的键进行分组,并返回一个新的 RDD,其中每个元素都是一个 Tuple2,包含分组键和对应分组的元素集合。
from pyspark.sql import SparkSession
# 创建 SparkSession 实例
spark = SparkSession.builder.appName("GroupByKeyOperator").getOrCreate()
# 创建一个示例 RDD
data = [("A", 1), ("B", 2), ("A", 3), ("B", 4), ("A", 5)]
rdd = spark.sparkContext.parallelize(data)
# 使用 groupByKey() 进行分组
grouped_rdd = rdd.groupbyKey(lambda x: x[0])
# 打印分组结果
for key, group in grouped_rdd.collect():
    print(f"Key: {key}, Values: {group}")

4 RDD 持久化

在 Spark 中,RDD 的计算是基于内存的,如果 RDD 中的数据量很大,可能会导致内存不足,从而影响计算性能。为了解决这个问题,Spark 提供了 RDD 持久化的功能,可以将 RDD中的数据持久化到磁盘上,以释放内存。以下是几种常见的 RDD 持久化方式:

  1. cache() 方法:将 RDD 数据缓存到内存中,以加快后续的计算速度。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CacheRDD").getOrCreate()
# 创建一个整数类型的 RDD
intRDD = spark.range(1, 10)
# 将 intRDD 数据缓存到内存中
intRDD.cache()
  1. persist() 方法:将 RDD 数据持久化到磁盘上,以释放内存。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PersistRDD").getOrCreate()
# 创建一个整数类型的 RDD
intRDD = spark.range(1, 10)
# 将 intRDD 数据持久化到磁盘上
intRDD.persist()