You need to enable JavaScript to run this app.
导航
PySpark 基础使用
最近更新时间:2024.06.12 14:54:12首次发布时间:2024.06.12 14:54:12

1 介绍

PySpark 是 Apache Spark 的 Python API,它允许使用 Python 语言进行大数据处理。PySpark 支持多种数据处理任务,包括 SQL 查询、数据流处理、机器学习等。

2 安装 PySpark 和配置环境

EMR 的环境中已经默认包含了 PySpark,您无需额外安装。
PySpark 默认使用的环境由 PYSPARK_PYTHON 指定,用户可以通过进入 "EMR 控制台->集群->服务列表->Spark" 进行配置,配置文件为 spark-env.sh

3 启动/停止 SparkSession

启动 PySpark 会初始化一个 SparkSession,它是使用 PySpark 进行数据处理的入口点。使用 PySpark 有两种形式,非交互式(代码)和交互式。代码创建 SparkSession 的例子如下:

from pyspark.sql import SparkSession

# 创建 sparkSession
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()
    
# your code here...
# your code here...
# your code here...

# 停止 sparkSession
spark.stop()

命令行交互式方式的启动如下:

pyspark --name "MyApp" --master yarn --num-executors 2 <other options>

启动之后,会默认初始化一个名为 sparkSparkSession

4 基本操作

4.1 RDD

# 使用 Python 集合创建 RDD
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())
# [1, 2, 3, 4, 5]

# 对其中每个元素乘 2
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())
# [2, 4, 6, 8, 10]

4.2 DataFrame

from pyspark.sql import DataFrame

# 将 RDD 转换为 DataFrame
rdd = spark.sparkContext.parallelize([("Sam", 28, 88), ("Flora", 28, 90), ("Run", 1, 60)])
df = rdd.toDF(["name", "age", "score"])
df.show()
# +-----+---+-----+
# | name|age|score|
# +-----+---+-----+
# |  Sam| 28|   88|
# |Flora| 28|   90|
# |  Run| 10|   60|
# +-----+---+-----+

# 从基础数据结构创建 DataFrame
df = spark.createDataFrame([
    ("Sam", 28, 88),
    ("Flora", 28, 90),
    ("Run", 10, 60)
], schema='name string, age int, score int')
df

# 从其他数据结构创建 DataFrame,比如 pandas
import pandas as pd
pandas_df = pd.DataFrame([['Sam', 28, 88], ['Flora', 28, 90], ['Run', 10, 60]],
                  columns=['name', 'age', 'score'])
df = spark.createDataFrame(pandas_df)

# 查询
df.select("name", "age").show()
# +-----+---+
# | name|age|
# +-----+---+
# |  Sam| 28|
# |Flora| 28|
# |  Run| 10|
# +-----+---+

# 转换
df.selectExpr("name", "score-10").show()
# +-----+-----+
# | name|score|
# +-----+-----+
# |  Sam|   78|
# |Flora|   80|
# |  Run|   50|
# +-----+-----+

4.3 数据读写

PySpark 支持多种数据源的读写操作,包括 CSV、JSON、Parquet、JDBC 等。

  • 读取数据
# 从 CSV 文件读取数据
csv_df = spark.read.csv("people.csv", header=True, inferSchema=True)
csv_df.show()

# 从 Parquet 文件读取数据
parquet_df = spark.read.parquet("data.parquet")
parquet_df.show()
  • 写入数据
# 将 DataFrame 写入 CSV 文件
df.write.csv("people_output.csv", header=True)

# 将 DataFrame 写入 Parquet 文件
df.write.parquet("people_output.parquet")

5 Pandas API

通常相较于 PySpark API,数据科学家或者机器学习算法同学可能更熟悉 Pandas API。鉴于此,Spark 从 3.2 版本引入了 Pandas API,并在随后的几个版本对其进行了大量的改进、加强,到目前(3.5.1)为止,大部分 Pandas API 在 PySpark 中都得到了支持。Spark 中使用 Pandas API 和使用 Pandas 区别不大,一般情况下改一个 import 包即可:

import pyspark.pandas as ps

基础使用

import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

# 创建一个 "pandas" Series
s = ps.Series([1, 3, 5, np.nan, 6, 8])
print(type(s))
# <class 'pyspark.pandas.series.Series'>

# 创建一个 "pandas" DataFrame
psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
print(type(psdf)
# <class 'pyspark.pandas.frame.DataFrame'>

# 基于一个 pandas DataFrame 创建一个 "pandas" DataFrame
pdf = pd.DataFrame()
psdf = ps.from_pandas(pdf)

# 将一个 "pandas DataFrame" 转化为 pandas DataFrame (注意由于 pandas 为单机引擎,这个操作会把 pyspark dataframe 的所有数据拉到本地)
pdf = psdf.to_pandas()

# 将一个 PySpark DataFrame 转化为 "pandas" DataFrame
sdf = spark.range(10)
psdf = sdf.pandas_api()

# 创建 "pandas" DataFrame 之后,就可以像 pandas 那样操作该 DataFrame 了

6 注意事项

PySpark 的工作原理是 python 算子通过 py4j 被翻译成 scala 算子在 JVM 上执行。但是有些时候用户会有一些自定义的 python 代码运行,这种情况下 Spark 会在 Executor 的进程外额外启动一个 python 进程来执行这些代码。这就导致两点:

  1. JVM 和 Python 之间有进程间通信,使得计算效率相较于 scala 程序会打折扣。
  2. 默认情况下 Python 代码的内存管理需要开发者自己保证,因为这些内存 Spark 无法进行管理。如果使用超量很有可能会导致进程被 Yarn 或者操作系统杀死。这个时候可以配置 spark.executor.memoryOverhead 向 Yarn 多申请一些内存。