Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。本文以 Spark 3.x 操作Iceberg表为例,介绍如何通过 Spark API 以批处理的方式读写 Iceberg 表。
适合 E-MapReduce(EMR) 1.2.0以后的版本(包括 EMR 1.2.0)
不适配 EMR2.x 版本。关于 EMR2.x 版本的 Spark 操作 Iceberg 表,请参考 Iceberg基础使用(适用于EMR2.x版本)
已创建 EMR 集群,且安装有 Iceberg 组件。有两种方式可以安装 Iceberg 组件:
新建 Maven 项目并引入 pom 依赖:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.1</version> <scope>provided</scope> </dependency>
说明
Spark 组件和 Iceberg 组件的版本信息,需参考 EMR 服务中该组件对应的版本信息。
配置 Catalog:
Spark 3.x写数据到Iceberg表,V1 DataFrame API已不推荐使用,建议采用DataFrameWriterV2 API。以下代码以V2 API写入表名为 sample 的 iceberg 表为例。。
首先需要配置Catalog,在SparkConf中加入必要配置项即可。
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.<yourCatalogName>", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.<yourCatalogName>.type", "hadoop") sparkConf.set("spark.sql.catalog.<yourCatalogName>.warehouse", "hdfs://emr-master-1:8020/user/hive/warehouse/iceberg/hive")
说明
<yourCatalogName>
为 Catalog 的名称,请根据实际情况修改 Catalog 名称。创建表:
val dataFrame = spark.createDataFrame(Seq((1, "ZhangSan", 20))).toDF("id", "name", "age") dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").create()
追加数据:
val dataFrame = spark.createDataFrame(Seq((2, "LiSi", 20))).toDF("id", "name", "age") dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").append()
覆盖数据:
val dataFrame = spark.createDataFrame(Seq((3, "WangWu", 20))).toDF("id", "name", "age") dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").overwritePartitions()
查询数据:
val dataFrame = spark.table("iceberg.iceberg_db.iceberg_001")
本示例是使用Spark DataFrame API批式读写Iceberg表。
编写Spark代码。
以Scala版代码为例,代码示例如下。
package com.bytedance import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object IcebergSparkScalaExample { def main(args: Array[String]): Unit = { // 配置使用数据湖元数据。 val sparkConf = new SparkConf() sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.iceberg.type", "hadoop") sparkConf.set("spark.sql.catalog.iceberg.warehouse", "/warehouse/tablespace/managed/hive") val spark = SparkSession .builder() .config(sparkConf) .appName("IcebergSparkScalaExample") .getOrCreate() // 从DataFrame中创建或替换Iceberg表 val df1 = spark.createDataFrame( Seq((1, "ZhangSan", 20), (2, "LiSi", 25), (3, "WangWu", 30)) ) .toDF("id", "name", "age") df1.writeTo("iceberg.iceberg_db.sample").createOrReplace() // 读Iceberg表 spark.table("iceberg.iceberg_db.sample").show() // 将DataFrame写入Iceberg表 val df2 = spark.createDataFrame(Seq((4, "LiLei", 28), (5, "XiaoMing", 22))) .toDF("id", "name", "age") df2.writeTo("iceberg.iceberg_db.sample").append() // 读Iceberg表 spark.table("iceberg.iceberg_db.sample").show() } }
打包程序并部署到EMR集群。
<build> <plugins> <!-- the Maven Scala plugin will compile Scala source files --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
mvn clean package
通过spark-submit命令运行Spark作业
spark-submit --class com.bytedance.IcebergSparkScalaExample iceberg-spark-example-1.0.jar
说明
class名字和JAR包,需根据自己代码工程修改。上述的iceberg-spark-example-1.0.jar就是根据代码工程打出的JAR包。
运行结果如下
+---+--------+---+ | id| name|age| +---+--------+---+ | 4| LiLei| 28| | 1|ZhangSan| 20| | 5|XiaoMing| 22| | 2| LiSi| 25| | 3| WangWu| 30| +---+--------+---+