PySpark 是 Apache Spark 的 Python API,它允许使用 Python 语言进行大数据处理。PySpark 支持多种数据处理任务,包括 SQL 查询、数据流处理、机器学习等。
EMR 的环境中已经默认包含了 PySpark,您无需额外安装。
PySpark 默认使用的环境由 PYSPARK_PYTHON
指定,用户可以通过进入 "EMR 控制台->集群->服务列表->Spark" 进行配置,配置文件为 spark-env.sh
。
启动 PySpark 会初始化一个 SparkSession,它是使用 PySpark 进行数据处理的入口点。使用 PySpark 有两种形式,非交互式(代码)和交互式。代码创建 SparkSession 的例子如下:
from pyspark.sql import SparkSession # 创建 sparkSession spark = SparkSession.builder \ .appName("MyApp") \ .getOrCreate() # your code here... # your code here... # your code here... # 停止 sparkSession spark.stop()
命令行交互式方式的启动如下:
pyspark --name "MyApp" --master yarn --num-executors 2 <other options>
启动之后,会默认初始化一个名为 spark
的 SparkSession
。
# 使用 Python 集合创建 RDD rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5]) print(rdd.collect()) # [1, 2, 3, 4, 5] # 对其中每个元素乘 2 mapped_rdd = rdd.map(lambda x: x * 2) print(mapped_rdd.collect()) # [2, 4, 6, 8, 10]
from pyspark.sql import DataFrame # 将 RDD 转换为 DataFrame rdd = spark.sparkContext.parallelize([("Sam", 28, 88), ("Flora", 28, 90), ("Run", 1, 60)]) df = rdd.toDF(["name", "age", "score"]) df.show() # +-----+---+-----+ # | name|age|score| # +-----+---+-----+ # | Sam| 28| 88| # |Flora| 28| 90| # | Run| 10| 60| # +-----+---+-----+ # 从基础数据结构创建 DataFrame df = spark.createDataFrame([ ("Sam", 28, 88), ("Flora", 28, 90), ("Run", 10, 60) ], schema='name string, age int, score int') df # 从其他数据结构创建 DataFrame,比如 pandas import pandas as pd pandas_df = pd.DataFrame([['Sam', 28, 88], ['Flora', 28, 90], ['Run', 10, 60]], columns=['name', 'age', 'score']) df = spark.createDataFrame(pandas_df) # 查询 df.select("name", "age").show() # +-----+---+ # | name|age| # +-----+---+ # | Sam| 28| # |Flora| 28| # | Run| 10| # +-----+---+ # 转换 df.selectExpr("name", "score-10").show() # +-----+-----+ # | name|score| # +-----+-----+ # | Sam| 78| # |Flora| 80| # | Run| 50| # +-----+-----+
PySpark 支持多种数据源的读写操作,包括 CSV、JSON、Parquet、JDBC 等。
# 从 CSV 文件读取数据 csv_df = spark.read.csv("people.csv", header=True, inferSchema=True) csv_df.show() # 从 Parquet 文件读取数据 parquet_df = spark.read.parquet("data.parquet") parquet_df.show()
# 将 DataFrame 写入 CSV 文件 df.write.csv("people_output.csv", header=True) # 将 DataFrame 写入 Parquet 文件 df.write.parquet("people_output.parquet")
通常相较于 PySpark API,数据科学家或者机器学习算法同学可能更熟悉 Pandas API。鉴于此,Spark 从 3.2 版本引入了 Pandas API,并在随后的几个版本对其进行了大量的改进、加强,到目前(3.5.1)为止,大部分 Pandas API 在 PySpark 中都得到了支持。Spark 中使用 Pandas API 和使用 Pandas 区别不大,一般情况下改一个 import 包即可:
import pyspark.pandas as ps
基础使用
import pandas as pd import numpy as np import pyspark.pandas as ps from pyspark.sql import SparkSession # 创建一个 "pandas" Series s = ps.Series([1, 3, 5, np.nan, 6, 8]) print(type(s)) # <class 'pyspark.pandas.series.Series'> # 创建一个 "pandas" DataFrame psdf = ps.DataFrame( {'a': [1, 2, 3, 4, 5, 6], 'b': [100, 200, 300, 400, 500, 600], 'c': ["one", "two", "three", "four", "five", "six"]}, index=[10, 20, 30, 40, 50, 60]) print(type(psdf) # <class 'pyspark.pandas.frame.DataFrame'> # 基于一个 pandas DataFrame 创建一个 "pandas" DataFrame pdf = pd.DataFrame() psdf = ps.from_pandas(pdf) # 将一个 "pandas DataFrame" 转化为 pandas DataFrame (注意由于 pandas 为单机引擎,这个操作会把 pyspark dataframe 的所有数据拉到本地) pdf = psdf.to_pandas() # 将一个 PySpark DataFrame 转化为 "pandas" DataFrame sdf = spark.range(10) psdf = sdf.pandas_api() # 创建 "pandas" DataFrame 之后,就可以像 pandas 那样操作该 DataFrame 了
PySpark 的工作原理是 python 算子通过 py4j
被翻译成 scala 算子在 JVM 上执行。但是有些时候用户会有一些自定义的 python 代码运行,这种情况下 Spark 会在 Executor 的进程外额外启动一个 python 进程来执行这些代码。这就导致两点:
spark.executor.memoryOverhead
向 Yarn 多申请一些内存。