本文以 Spark 3.x 操作 Iceberg 表为例介绍如何通过 Spark Structured Streaming 流式读写 Iceberg 表。
适合 E-MapReduce(EMR) 1.2.0以后的版本(包括 EMR 1.2.0)
不适配 EMR 2.x 的版本。EMR2.x 版本中 Spark 流式读写 Iceberg,请参考 Spark流式读写 Icerberg(适用于EMR 2.x版本)
已创建 EMR 集群,且安装有 Iceberg 组件。有两种方式可以安装 Iceberg 组件:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.2.1</version> <scope>provided</scope> </dependency>
Spark Structured Streaming 通过 DataStreamWriter 接口流式写数据到 Iceberg 表,代码如下。
val tableIdentifier: String = "iceberg.iceberg_db.streamingtable" val checkpointPath: String = "/tmp/iceberg_checkpointPath" data.writeStream .format("iceberg") .outputMode("append") .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)) .option("path", tableIdentifier) .option("checkpointLocation", checkpointPath) .start()
说明
代码中的 tableIdentifier 是元数据表名或者表路径。checkpointPath 是 spark 流数据处理程序使用的 checkpoint 地址。流式写入支持以下两种方式:
append:追加每个批次的数据到Iceberg表,相当于insert into。
complete:使用最新批次的数据完全覆盖Iceberg,相当于insert overwrite。
val df = spark.readStream .format("iceberg") .option("stream-from-timestamp", Long.toString(streamStartTimestamp)) .load("database.table_name")
本示例上采用 linux 的 netcat 命令发送数据,Spark 接收数据后写入 Iceberg 表中。
以 Scala 版代码为例,代码示例如下。
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.Trigger object IcebergSparkStreamingScalaExample { def main(args: Array[String]): Unit = { // 配置使用数据湖元数据。 val sparkConf = new SparkConf() val spark = SparkSession .builder() .config(sparkConf) .appName("IcebergSparkStreamingScalaExample") .getOrCreate() import spark.implicits._ // Create DataFrame representing the stream of input lines from connection to localhost:9999 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) val checkpointPath = "/tmp/iceberg_checkpointPath" // 流式写入Iceberg表 val query = words.toDF().writeStream .format("iceberg") .outputMode("append") .trigger(Trigger.ProcessingTime("2 seconds")) .option("checkpointLocation", checkpointPath) .option("path", "iceberg.iceberg_db.streamingtable") .start() query.awaitTermination() } }
打包程序并部署到EMR集群。
<build> <plugins> <!-- the Maven Scala plugin will compile Scala source files --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
mvn clean package
通过 Spark SQL 创建测试使用的数据库 iceberg_db 和表 streamingtable,详细操作请参见 基础使用。也可以直接输入下面命令:
spark-sql --master yarn \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.iceberg.type=hadoop \ --conf spark.sql.catalog.iceberg.warehouse=/warehouse/tablespace/managed/hive
接下来,在 Spark SQL 控制台中执行下面的sql语句:
CREATE DATABASE IF NOT EXISTS iceberg_db; CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.streamingtable(value STRING) USING iceberg;
通过 Linux 的 netcat 命令准备一些数据
netcat -lk -p 9999
并输入一些字符串。
通过spark-submit命令运行Spark作业
spark-submit --class com.bytedance.IcebergSparkStreamingScalaExample iceberg-spark-example-1.0.jar
说明
class名字和JAR包,需根据自己代码工程修改。上述的iceberg-spark-example-1.0.jar就是根据代码工程打出的JAR包。
通过 Spark SQL 查看Iceberg表的数据运行结果如下:
spark-sql --master yarn \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.iceberg.type=hadoop \ --conf spark.sql.catalog.iceberg.warehouse=/warehouse/tablespace/managed/hive
在 Spark SQL 控制台中执行下面的sql语句:
SELECT * FROM iceberg.iceberg_db.streamingtable;
可以打印出第4步输入的数据。