This topic describes how to use the Spark DataFrame API to create and operate on Iceberg tables in an E-MapReduce (EMR) 2.x cluster.
The Iceberg component must be installed on the EMR cluster. One way to install it is to declare the Iceberg runtime as a dependency of your project:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.8</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-spark-runtime-2.4</artifactId>
    <version>0.14.0</version>
    <scope>compile</scope>
</dependency>
Note
For the Spark and Iceberg component versions, refer to the version information of the corresponding components in the EMR service.
import java.util
import org.apache.iceberg.hive.HiveCatalog

val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new util.HashMap[String, String]
properties.put("warehouse", "/user/hive/warehouse/iceberg/hive")
properties.put("uri", "thrift://emr-master-1:9083")
catalog.initialize("hive", properties)
catalog.createTable(name, schema, spec)
Note
Fill in the properties configuration according to your EMR cluster information: "warehouse" is the path where table data is stored, and "uri" is the address of the Hive Metastore.
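The `name`, `schema`, and `spec` values passed to `createTable` above must be defined first. A minimal sketch follows, taken from the full example later in this topic; the `default.spark2_demo` identifier and the column layout are illustrative, not required names:

```scala
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.types.Types

// Illustrative table identifier: database "default", table "spark2_demo"
val name = TableIdentifier.of("default", "spark2_demo")

// Column ids, names, types, and nullability of the example table
val schema = new Schema(
  Types.NestedField.required(1, "id", Types.IntegerType.get()),
  Types.NestedField.optional(2, "name", Types.StringType.get()),
  Types.NestedField.required(3, "age", Types.IntegerType.get()))

// Hash-bucket the table into 2 partitions by id
val spec = PartitionSpec.builderFor(schema).bucket("id", 2).build()
```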
Append data:
val dataFrame = spark.createDataFrame(Seq((2, "LiSi", 20))).toDF("id", "name", "age")
dataFrame.write.format("iceberg").mode("append").save("db.table")
Overwrite data:
val dataFrame = spark.createDataFrame(Seq((3, "WangWu", 20))).toDF("id", "name", "age")
dataFrame.write.format("iceberg").mode("overwrite").save("db.table")
Query data:
val dataFrame = spark.table("iceberg.iceberg_db.iceberg_001") // named catalog table (requires a configured catalog; Spark 3 only)
spark.read.format("iceberg").load("db.table") // metastore table via DataFrameReader (works in Spark 2.4)
To run SQL SELECT statements on Iceberg tables in Spark 2.4, register the DataFrame as a temporary table:
val df = spark.read.format("iceberg").load("db.table")
df.createOrReplaceTempView("table")
spark.sql("""select count(1) from table""").show()
This example uses the Spark DataFrame API to batch-read and batch-write an Iceberg table.
Write the Spark code. The following Scala example code is used here.
package com.bytedance

import java.util

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types

object IcebergSpark2ScalaExample {

  def main(args: Array[String]): Unit = {
    // Configure the use of data lake metadata
    val sparkConf = new SparkConf()
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergSparkScalaExample")
      .getOrCreate()

    val name = TableIdentifier.of("default", "spark2_demo")
    val tableName = name.toString

    // Initialize the Hive catalog with the warehouse path and Hive Metastore URI
    val catalog = new HiveCatalog()
    catalog.setConf(spark.sparkContext.hadoopConfiguration)
    val properties = new util.HashMap[String, String]
    properties.put("warehouse", "/user/hive/warehouse/iceberg/hive")
    properties.put("uri", "thrift://emr-master-1:9083")
    catalog.initialize("hive", properties)

    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.IntegerType.get()),
      Types.NestedField.optional(2, "name", Types.StringType.get()),
      Types.NestedField.required(3, "age", Types.IntegerType.get()))
    val spec = PartitionSpec.builderFor(schema).bucket("id", 2).build()

    try {
      // Create the Iceberg table
      catalog.createTable(name, schema, spec)
    } catch {
      // Ignore the error if the table already exists
      case e: org.apache.iceberg.exceptions.AlreadyExistsException =>
    }

    // Create or replace the Iceberg table from a DataFrame
    val df1 = spark.createDataFrame(Seq((1, "ZhangSan", 20), (2, "LiSi", 25), (3, "WangWu", 30)))
      .toDF("id", "name", "age")
    df1.write.format("iceberg").mode("overwrite").save(tableName)

    // Read the Iceberg table
    spark.read.format("iceberg").load(tableName).show()

    // Append a DataFrame to the Iceberg table
    val df2 = spark.createDataFrame(Seq((4, "LiLei", 28), (5, "XiaoMing", 22)))
      .toDF("id", "name", "age")
    df2.write.format("iceberg").mode("append").save(tableName)

    // Read the Iceberg table again
    spark.read.format("iceberg").load(tableName).show()
  }
}
Package the program and deploy it to the EMR cluster.
<build>
    <plugins>
        <!-- the Maven Scala plugin will compile Scala source files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
mvn clean package
Run the Spark job with the spark-submit command:
spark-submit --class com.bytedance.IcebergSpark2ScalaExample iceberg-spark2-example-1.0.jar
Note
Modify the class name and JAR package according to your own project. The iceberg-spark2-example-1.0.jar above is the JAR built from the sample project.
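On a YARN-based EMR cluster you will typically also specify the master, deploy mode, and resource settings. A sketch follows; the resource values below are illustrative assumptions, not EMR requirements, so adjust them to your cluster:

```shell
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-memory 2g \
  --num-executors 2 \
  --class com.bytedance.IcebergSpark2ScalaExample \
  iceberg-spark2-example-1.0.jar
```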
The output is as follows:
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  4|   LiLei| 28|
|  1|ZhangSan| 20|
|  5|XiaoMing| 22|
|  2|    LiSi| 25|
|  3|  WangWu| 30|
+---+--------+---+