Iceberg 表的数据,可以放在火山引擎对象存储服务 TOS 中。本章节为您介绍不同引擎组件在 TOS 中创建 Iceberg 表的示例。
适合火山引擎 E-MapReduce(EMR)3.1.0 之后的版本(如:3.2.1,3.3.0等)。
执行的账号,需确保拥有 TOS 的认证和读写权限,参考Hadoop 使用 Proton。
说明
已创建包含 Iceberg、Spark 组件的 EMR 集群,详见创建集群。
在创建 EMR 集群时,选择 Icerberg 作为可选组件
对已安装 EMR 集群,参考服务管理章节添加 Iceberg 服务
Spark 组件操作示例不适配 EMR2.x 版本。关于 EMR2.x 版本,Spark 组件操作 Iceberg 表,请参考 TOS 中操作 Iceberg 表(适用于EMR2.x版本)。
登录 EMR 集群主节点,详情参见登录集群。
执行命令,通过 Spark SQL 读写 Iceberg 配置,在 Spark SQL 中操作 Iceberg。详见 Iceberg 基础使用中 Spark 组件的操作示例。
配置完成并进入 spark-sql 命令行后,在创建数据库或表的时候,指定为 TOS 地址即可,也可以配置 Iceberg 表的 warehouse 仓库地址为 TOS 地址,选择其中一种方式即可。如下:
spark-sql \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.hive=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.hive.type=hive \ --conf spark.sql.catalog.hive.uri='{hive metastore地址}' --conf spark.sql.catalog.hive.warehouse='{TOS地址}'
说明
示例中配置项的描述可参考 Iceberg 高阶使用中的参数配置。 若在 EMR 集群上运行,参数 spark.sql.catalog.hive.uri
可以省略。
CREATE DATABASE IF NOT EXISTS hive.iceberg_db_1 LOCATION '{TOS地址}';
CREATE TABLE IF NOT EXISTS hive.iceberg_db_1.spark_pokes( rank BIGINT, suite STRING ) USING iceberg LOCATION '{TOS地址}';
说明
用例中{TOS地址}
,需要根据实际情况修改,可参考以下示例填写:tos://ceshi/autotest/iceberg/spark
启动 SparkSQL
spark-sql \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.hive=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.hive.type=hive \ --conf spark.sql.catalog.hive.uri=thrift://emr-master-1:9083 \ --conf spark.sql.catalog.hive.warehouse='tos://{桶名称}/tmp/warehouse'
说明
用例中{
桶名称
}
,需要根据实际情况修改,可参考以下示例填写:tos://ceshi/autotest/iceberg/spark
基础操作
说明
如果想在 Trino/Presto/Hive 中可以操作 Spark 创建 Iceberg 表,需要进入集群详情 > 服务列表 > HDFS > 服务参数界面,在 HDFS 的服务参数 hdfs-site 中添加iceberg.engine.hive.enabled=true
配置,并重启 HDFS 和 Hive 组件,然后在 Spark 中创建 Iceberg 表,Hive 中采用操作该表。
CREATE DATABASE IF NOT EXISTS hive.iceberg_db;
CREATE TABLE IF NOT EXISTS hive.iceberg_db.spark_pokes( rank BIGINT, suite STRING ) USING iceberg;
SHOW CREATE TABLE hive.iceberg_db.spark_pokes;
从表的信息中可以看到表数据的LOCATION地址是TOS上。
INSERT INTO hive.iceberg_db.spark_pokes VALUES (1, 'name1'), (2, 'name2');
SELECT * FROM hive.iceberg_db.spark_pokes;
Hive 可以通过建库表时指定 LOCATION 参数,将数据放在 TOS 中。举例如下:
创建库指定 TOS 地址:
CREATE DATABASE IF NOT EXISTS iceberg_db LOCATION '{TOS地址}';
创建表指定 TOS 地址:
CREATE EXTERNAL TABLE iceberg_db.table_a1(i INT, s STRING) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION '{TOS地址}' TBLPROPERTIES ('iceberg.catalog'='{iceberg catalog名称}');
启动 Hive 控制台后,设置 Iceberg 的相关信息
SET iceberg.catalog.iceberg_catalog.type=hive; SET iceberg.catalog.iceberg_catalog.uri=thrift://emr-master-1:9083; SET iceberg.catalog.iceberg_catalog.clients=10; SET iceberg.catalog.iceberg_catalog.warehouse=tos://{桶名称}/tmp/warehouse;
创建库表
CREATE DATABASE IF NOT EXISTS d1 LOCATION 'tos://{桶名称}/tmp/warehouse/d1'; CREATE EXTERNAL TABLE d1.t1(i INT, s STRING) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' TBLPROPERTIES ('iceberg.catalog'='iceberg_catalog'); CREATE DATABASE IF NOT EXISTS d2; CREATE EXTERNAL TABLE d2.t1(i INT, s STRING) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'tos://{桶名称}/tmp/warehouse/d2/t1' TBLPROPERTIES ('iceberg.catalog'='iceberg_catalog');
说明
用例中{
桶名称
}
,需要根据实际情况修改,可参考以下示例填写:tos://ceshi/autotest/iceberg/hive
查看表信息
SHOW CREATE TABLE d2.t1;
从表的信息中可以看到表数据的 LOCATION 地址是 TOS上 。
插入数据
INSERT INTO d1.t1 VALUES(1, 'a'); INSERT INTO d2.t1 VALUES(1, 'a');
TOS中查看数据
Presto/Trino 操作步骤类似,下面以 Trino 为例介绍。
说明
已创建包含 Iceberg、Trino、Presto 组件的 EMR 集群,详见创建集群。
在 HDFS 中添加配置参数,并进行 HDFS 和 Trino 组件的重启:
登录 EMR 控制台。
在左侧导航栏中,进入集群管理 > 集群列表 > 集群详情 > 服务列表 > HDFS > 服务参数界面。
在 HDFS 的服务参数界面中,单击添加自定义参数,配置文件选择 core-site,在里面添加fs.file.impl.disable.cache=true
参数,并单击确定按钮,完成参数保存。
完成参数配置后,重启 HDFS 和 Trino 组件。
执行 Trino cli 命令,创建 Iceberg 表,并指定 TOS 地址:
登录 EMR 集群主节点,详情参见登录集群。
进入 Trino cli 端,并使用 Iceberg 连接器获取 Metastore,详见 Iceberg与Trino/Presto集成。
创建表,将表的 location 地址设置为 TOS 地址:
CREATE TABLE iceberg.default.t1 ( order_id BIGINT, order_date DATE, account_number BIGINT, customer VARCHAR, country VARCHAR ) WITH ( location='{TOS地址}', format = 'ORC', partitioning = ARRAY['bucket(account_number, 10)','country'] )
说明
用例中{TOS地址}
,需要根据实际情况修改,可参考以下示例填写:tos://ceshi/autotest/iceberg/trino
插入数据:
INSERT INTO iceberg.default.t1 VALUES (2, CAST('2021-01-12' AS DATE), 670011, 'bcd', 'ca');
查询数据:
SELECT * FROM iceberg.default.t1;