EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源 SparkSQL 语法说明。
非Spark官网支持的SQL语法,需要额外通过设置下述参数来支持(例如Iceberg的procedures)。
set emr.serverless.spark.custom.parse.enabled = true;
-- 创建数据库 create database db_demo; -- 创建数据库,指定自定义TOS桶路径进行存储-- 注意要确保该TOS桶存在,并且当前用户有该桶路径的读写权限 create database db_demo location 'tos://您的tos bucket name/warehouse/'; --查看数据库信息 desc database db_demo; -- 删除数据库 drop database db_demo;
use db_demo; -- 创建表 create table tb_demo(id int, name string); -- 创建表指定指定自定义TOS桶路径进行存储, 创建的表是外表 create table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo'; -- 创建外表,效果同上 create EXTERNAL table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo'; DESCRIBE EXTENDED tb_demo; -- 描述表信息 desc table tb_demo; -- 查询建表语句 show create table tb_demo; -- 删除表 drop table tb_demo; -- 插入数据 insert into tb_demo select 1,'name1'; -- 查询表数据 select * from tb_demo;
-- 上传UDF, 上传到TOS对应路径-- 创建udf,默认在hive catalog中 CREATE FUNCTION <schemaName>.<functionName> AS '<funcClassName>' using jar 'tos://您的tos bucket name/您的jar包地址'; -- Spark使用UDF select dbname.udfname('aaabbB'); select hive.dbname.udfname('aaabbB'); -- presto 使用UDF select hive.dbname.udfname('aaabbB');
create database if not exists test_hive_db; create table if not EXISTS test_hive_db.test_tb(id int, name string); insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_hive_db.test_tb; drop table if exists test_hive_db.test_tb; drop database if exists test_hive_db;
-- 默认参数 set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.spark_iceberg_catalog.type=hive; set spark.sql.storeAssignmentPolicy=ansi; use spark_iceberg_catalog; create database if not exists test_iceberg_db; create table if not EXISTS test_iceberg_db.test_tb(id int, name string); insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_iceberg_db.test_tb; drop table if exists test_iceberg_db.test_tb; drop database if exists test_iceberg_db; -- 访问iceberg元数据表,如history、snapshots等元数据表 set spark.sql.threePartIdentifier.catalogService.enabled=false; select * from test_iceberg_db.test_tb.history;
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog; set spark.sql.catalog.spark_catalog.metastore=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 自定义warehouse路径 set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse; create database if not exists test_paimon_db; create table if not EXISTS test_paimon_db.test_tb(id int, name string); insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_paimon_db.test_tb; drop table if exists test_paimon_db.test_tb; drop database if exists test_paimon_db;
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog; set spark.serializer=org.apache.spark.serializer.KryoSerializer; create database if not exists test_hudi_db; create table if not EXISTS test_hudi_db.test_tb(id int, name string); insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_hudi_db.test_tb; drop table if exists test_hudi_db.test_tb; drop database if exists test_hudi_db;
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; create database if not exists test_del_db; create table if not EXISTS test_del_db.test_tb(id int, name string); insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_del_db.test_tb; drop table if exists test_del_db.test_tb; drop database if exists test_del_db;
说明
提示
若需要创建LAS Catalog数据目录可以跳转至LAS控制台操作。
Spark默认访问的是LAS Catalog中Hive数据目录下的库表,若您需要访问LAS Catalog下除了Hive以外的数据目录,可以通过以下方式访问:
对于DDL,可以通过参数设置需要访问的LAS Catalog数据目录,比如
set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; -- 指定your_las_catalog为需要访问的LAS Catalog数据目录 --DDL create database if not exists test_hive_db; create table if not EXISTS test_hive_db.test_tb(id int, name string);
说明
注意:spark.hadoop.hive.metastore.catalog.default参数是对当前整个会话生效,当前会话中所有的库表都应当存在于同一个LAS Catalog数据目录下。
对于DML、DQL,除了可以设置上述参数,也可以在表名上使用三段式的命名来指定自定义LAS Catalog数据目录下的库表,比如:
insert into table your_las_catalog.test_hive_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from your_las_catalog.test_hive_db.test_tb;
说明
注意
DDL不支持使用三段式访问支持访问多个LAS Catalog数据目录。
针对每一种湖仓格式的完整示例可以参考如下。
DDL
set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; --DDL create database if not exists test_hive_db; create table if not EXISTS test_hive_db.test_tb(id int, name string);
DML&DQL:
insert into table your_las_catalog.test_hive_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from your_las_catalog.test_hive_db.test_tb;
会话指定 Catalog:sql中仅支持访问一个catalog
set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; --DML&DQL insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_hive_db.test_tb;
DDL
-- 默认参数 set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.spark_iceberg_catalog.type=hive; set spark.sql.storeAssignmentPolicy=ansi; use spark_iceberg_catalog; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; create database if not exists test_iceberg_db; create table if not EXISTS test_iceberg_db.test_tb(id int, name string);
DML&DQL
-- 默认参数 set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.spark_iceberg_catalog.type=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 访问方式:自定义catalog.库.表 use spark_iceberg_catalog; insert into table your_las_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from your_las_catalog.test_iceberg_db.test_tb; --访问方式:spark_iceberg_catalog.自定义catalog.库.表 insert into table spark_iceberg_catalog.your_las_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from spark_iceberg_catalog.your_las_catalog.test_iceberg_db.test_tb;
会话指定Catalog:SQL 中仅支持访问一个 catalog
-- 默认参数 set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.spark_iceberg_catalog.type=hive; set spark.sql.storeAssignmentPolicy=ansi; use spark_iceberg_catalog; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; --访问方式:库.表 insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_iceberg_db.test_tb; -- 访问iceberg元数据表,如history、snapshots等元数据表 set spark.sql.threePartIdentifier.catalogService.enabled=false; select * from test_iceberg_db.test_tb.history;
DDL
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog; set spark.sql.catalog.spark_catalog.metastore=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 自定义warehouse路径 set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; create database if not exists test_paimon_db; create table if not EXISTS test_paimon_db.test_tb(id int, name string);
DML&DQL:
说明
三段式访问支持访问多个 catalog,不支持 DDL。
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog; set spark.sql.catalog.spark_catalog.metastore=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 自定义warehouse路径 set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse; -- 访问方式:自定义catalog.库.表 insert into table your_las_catalog.test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from your_las_catalog.test_paimon_db.test_tb;
会话指定 Catalog:SQL 中仅支持访问一个 catalog
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog; set spark.sql.catalog.spark_catalog.metastore=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 自定义warehouse路径 set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; --访问方式:库.表 insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_paimon_db.test_tb;
DDL
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog; set spark.serializer=org.apache.spark.serializer.KryoSerializer; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; create database if not exists test_hudi_db; create table if not EXISTS test_hudi_db.test_tb(id int, name string);
DML&DQL
说明
三段式访问支持访问多个 catalog,不支持 DDL。
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog; set spark.serializer=org.apache.spark.serializer.KryoSerializer; -- 访问方式:自定义catalog.库.表 insert into table your_las_catalog.test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from your_las_catalog.test_hudi_db.test_tb;
会话指定 catalog:SQL 中仅支持访问一个 catalog
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog; set spark.serializer=org.apache.spark.serializer.KryoSerializer; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; --访问方式:库.表 insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_hudi_db.test_tb;
DDL
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; create database if not exists test_del_db; create table if not EXISTS test_del_db.test_tb(id int, name string);
DML&DQL
说明
三段式访问支持访问多个 catalog,不支持 DDL。
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; -- 访问方式:自定义catalog.库.表 insert into table your_las_catalog.test_del_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from your_las_catalog.test_del_db.test_tb;
会话指定catalog:SQL 中仅支持访问一个 Catalog
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=your_las_catalog; --访问方式:库.表 insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_del_db.test_tb;