You need to enable JavaScript to run this app.
导航
Iceberg 基础使用
最近更新时间:2023.08.28 10:41:36首次发布时间:2022.09.22 16:55:46

本文介绍在 E-MapReduce(EMR) 集群,通过 Spark SQL 对 Iceberg 表进行创建等操作。

1 前提条件

  1. 适合EMR 1.2.0以后的版本(包括EMR 1.2.0)。

  2. 本文不适配 EMR2.x 版本。关于 EMR2.x 版本的 Spark 操作 Iceberg 表,请参考 Iceberg基础使用(适用于EMR2.x版本)

  3. 已创建 EMR 集群,且安装有Iceberg组件。有两种方式可以安装Iceberg组件:

    1. 在创建 E-MapReduce 集群时,选择 Icerberg作为可选组件,详见:创建集群

    2. 对已安装 E-MapReduce 集群,参考 服务管理章节 添加Iceberg服务。

2 操作步骤

  1. 使用 SSH 方式登录到集群主节点,详情请参见使用 SSH连接主节点

  2. 执行以下命令,通过 Spark SQL 读写 Iceberg 配置:

    在 Spark SQL 中操作 Iceberg,首先需要配置 Catalog。Catalog的配置以 spark.sql.catalog.<catalog_name> 作为前缀。如以 hive 作为 Catalog 名称时,参数如下:

    spark-sql \
      --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
      --conf spark.sql.catalog.hive=org.apache.iceberg.spark.SparkCatalog \
      --conf spark.sql.catalog.hive.type=hive \
      --conf spark.sql.catalog.hive.uri=thrift://emr-master-1:9083
    

说明

本文示例中的 hive 为您创建的 Catalog 名称。为保证操作的数据库和表都是在指定的 Catalog 下,您需要在命令中的数据库和表名前带上 hive 。例如:

  1. 访问 hive 下的数据库 iceberg_db,应为 hive.iceberg_db
  2. 访问 hive 下,数据库名为 iceberg_db 下的表 table1,应为 hive.iceberg_db.table1

以上命令示例中配置项的描述如下,也可参考 Iceberg高阶使用参数配置章节获得更多配置项解释:

  • spark.sql.extensions:SparkSQL扩展模块。固定值为org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions。

  • spark.sql.catalog.hive:catalog名称。固定值为org.apache.iceberg.spark.SparkCatalog。

  • spark.sql.catalog.hive.type:Catalog type,可以配置为hive 或hadoop。本示例中采用hive。

  • spark.sql.catalog.hive.uri :Hive Metastore 的 url 地址,格式为:thrift://host:port。查看 Hive 组件的配置文件 hive-site 中hive.metastore.uris参数对应的值。若在EMR集群上运行,参数spark.sql.catalog.hive.uri可以省略。

执行以下命令,就可以进入 spark-sql 的命令行

spark-sql>
  1. 数据库表基础操作

    说明

    如果想在 Trino/Presto/Hive 中可以操作 Spark 创建 Iceberg 表,需要进入集群详情 > 服务列表 > HDFS > 服务参数界面,在 HDFS 的服务参数 hdfs-site 中添加iceberg.engine.hive.enabled=true配置,并重启 HDFS 和 Hive 组件,然后在 Spark 中创建 Iceberg 表,Hive 中采用操作该表。

    • 创建库

      CREATE DATABASE IF NOT EXISTS hive.iceberg_db;
      
    • 创建表

      CREATE TABLE  IF  NOT EXISTS  hive.iceberg_db.spark_pokes(
          rank  BIGINT, 
          suite STRING
      ) 
      USING  iceberg;
      

      这里,建表语句支持 COMMENT、PARTITIONED BY、TBLPROPERTIES 等语法,详细配置参考官网文档

      例如,使用 PARTITIONED BY 设置分区属性示例如下:

      CREATE TABLE IF NOT EXISTS hive.iceberg_db.spark_pokes(
      rank BIGINT, 
      suite STRING
      ) 
      USING iceberg
      PARTITIONED BY (suite);
      
    • 写入数据

      INSERT INTO hive.iceberg_db.spark_pokes VALUES (1, 'name1'), (2, 'name2');
      
    • 查询数据

      SELECT * FROM hive.iceberg_db.spark_pokes;
      
    • 更新数据

      UPDATE hive.iceberg_db.spark_pokes SET suite = 'volcano' WHERE rank = 2;
      
    • 删除数据

      DELETE FROM hive.iceberg_db.spark_pokes WHERE rank = 2;
      
    • 删除表

      DROP TABLE hive.iceberg_db.spark_pokes;
      

      说明

      DROP TABEL方式删除表,只是从 Hive Metastore 中删除表的元数据,表的数据还是会保留。