EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源 SparkSQL 语法说明。
-- 创建数据库 create database db_demo; -- 创建数据库,指定自定义TOS桶路径进行存储 -- 注意要确保该TOS桶存在,并且当前用户有该桶路径的读写权限 create database db_demo location 'tos://您的tos bucket name/warehouse/'; --查看数据库信息 desc database db_demo; -- 删除数据库 drop database db_demo;
use db_demo; -- 创建表 create table tb_demo(id int, name string); -- 创建表指定指定自定义TOS桶路径进行存储, 创建的表是外表 create table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo'; -- 创建外表,效果同上 create EXTERNAL table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo'; DESCRIBE EXTENDED tb_demo; -- 描述表信息 desc table tb_demo; -- 查询建表语句 show create table tb_demo; -- 删除表 drop table tb_demo; -- 插入数据 insert into tb_demo select 1,'name1'; -- 查询表数据 select * from tb_demo;
-- 上传UDF, 上传到TOS对应路径 -- 创建udf,默认在hive catalog中 CREATE FUNCTION <schemaName>.<functionName> AS '<funcClassName>' using jar 'tos://您的tos bucket name/您的jar包地址'; -- Spark使用UDF select dbname.udfname('aaabbB'); select hive.dbname.udfname('aaabbB'); -- presto 使用UDF select hive.dbname.udfname('aaabbB');
-- 指定会话级别的默认catalog为test_catalog,全局默认catalog是hive set spark.hadoop.hive.metastore.catalog.default=test_catalog; -- 访问test_catalog下的库表 select * from test_db.test_tb;
{las_catalog}.{db_name}.{table_name}
。说明
三段式访问仅支持 DML&DQL,不支持 DDL。
--查询普通hive表 select * from test_catalog.test_db.test_tb;
说明
库表在 Catalog 中的Hive数据目录下。
create database if not exists test_hive_db; create table if not EXISTS test_hive_db.test_tb(id int, name string); insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_hive_db.test_tb; drop table if exists test_hive_db.test_tb; drop database if exists test_hive_db;
set spark.hadoop.hive.metastore.catalog.default=test_catalog; --DDL create database if not exists test_hive_db; create table if not EXISTS test_hive_db.test_tb(id int, name string);
说明
三段式访问支持访问多个 Catalog,不支持 DDL。
insert into table test_catalog.test_hive_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_catalog.test_hive_db.test_tb;
会话指定 Catalog:sql中仅支持访问一个catalog
set spark.hadoop.hive.metastore.catalog.default=test_catalog; --DML&DQL insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_hive_db.test_tb;
说明
库表在 Catalog 中的Hive数据目录下。
-- 默认参数 set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.iceberg.type=hive; set spark.sql.storeAssignmentPolicy=ansi; use iceberg; create database if not exists test_iceberg_db; create table if not EXISTS test_iceberg_db.test_tb(id int, name string); insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_iceberg_db.test_tb; drop table if exists test_iceberg_db.test_tb; drop database if exists test_iceberg_db;
-- 默认参数 set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.iceberg.type=hive; set spark.sql.storeAssignmentPolicy=ansi; use iceberg; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=test_catalog; create database if not exists test_iceberg_db; create table if not EXISTS test_iceberg_db.test_tb(id int, name string);
说明
三段式访问支持访问多个 Catalog,不支持 DDL。
-- 默认参数 set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.iceberg.type=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 访问方式:自定义catalog.库.表 use iceberg; insert into table test_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_catalog.test_iceberg_db.test_tb; --访问方式:iceberg.自定义catalog.库.表 insert into table iceberg.test_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from iceberg.test_catalog.test_iceberg_db.test_tb;
会话指定Catalog:SQL 中仅支持访问一个 catalog
-- 默认参数 set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.iceberg.type=hive; set spark.sql.storeAssignmentPolicy=ansi; use iceberg; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=test_catalog; --访问方式:库.表 insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_iceberg_db.test_tb;
说明
库表在 Catalog 中的Hive数据目录下。
-- 默认参数 set spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog; set spark.sql.catalog.paimon.metastore=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 自定义warehouse路径 set spark.sql.catalog.paimon.warehouse=tos://您的tos bucket name/warehouse; use paimon; create database if not exists test_paimon_db; create table if not EXISTS test_paimon_db.test_tb(id int, name string); insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_paimon_db.test_tb; drop table if exists test_paimon_db.test_tb; drop database if exists test_paimon_db;
-- 默认参数 set spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog; set spark.sql.catalog.paimon.metastore=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 自定义warehouse路径 set spark.sql.catalog.paimon.warehouse=tos://您的tos bucket name/warehouse; use paimon; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=test_catalog; create database if not exists test_paimon_db; create table if not EXISTS test_paimon_db.test_tb(id int, name string);
说明
三段式访问支持访问多个 catalog,不支持 DDL。
-- 默认参数 set spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog; set spark.sql.catalog.paimon.metastore=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 自定义warehouse路径 set spark.sql.catalog.paimon.warehouse=tos://您的tos bucket name/warehouse; -- 访问方式:自定义catalog.库.表 use paimon; insert into table test_catalog.test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_catalog.test_paimon_db.test_tb; --访问方式:paimon.自定义catalog.库.表 insert into table paimon.test_catalog.test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from paimon.test_catalog.test_paimon_db.test_tb;
会话指定 Catalog:SQL 中仅支持访问一个 catalog
-- 默认参数 set spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog; set spark.sql.catalog.paimon.metastore=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 自定义warehouse路径 set spark.sql.catalog.paimon.warehouse=tos://您的tos bucket name/warehouse; use paimon; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=test_catalog; --访问方式:库.表 insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_paimon_db.test_tb;
说明
库表在 Catalog 中的Hive数据目录下。
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog; set spark.serializer=org.apache.spark.serializer.KryoSerializer; create database if not exists test_hudi_db; create table if not EXISTS test_hudi_db.test_tb(id int, name string); insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_hudi_db.test_tb; drop table if exists test_hudi_db.test_tb; drop database if exists test_hudi_db;
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog; set spark.serializer=org.apache.spark.serializer.KryoSerializer; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=test_catalog; create database if not exists test_hudi_db; create table if not EXISTS test_hudi_db.test_tb(id int, name string);
说明
三段式访问支持访问多个 catalog,不支持 DDL。
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog; set spark.serializer=org.apache.spark.serializer.KryoSerializer; -- 访问方式:自定义catalog.库.表 insert into table test_catalog.test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_catalog.test_hudi_db.test_tb;
会话指定 catalog:SQL 中仅支持访问一个 catalog
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog; set spark.serializer=org.apache.spark.serializer.KryoSerializer; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=test_catalog; --访问方式:库.表 insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_hudi_db.test_tb;
说明
库表在 Catalog 中的Hive数据目录下。
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; create database if not exists test_del_db; create table if not EXISTS test_del_db.test_tb(id int, name string); insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_del_db.test_tb; drop table if exists test_del_db.test_tb; drop database if exists test_del_db;
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=test_catalog; create database if not exists test_del_db; create table if not EXISTS test_del_db.test_tb(id int, name string);
说明
三段式访问支持访问多个catalog,不支持DDL。
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; -- 访问方式:自定义catalog.库.表 insert into table test_catalog.test_del_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_catalog.test_del_db.test_tb;
会话指定catalog:SQL 中仅支持访问一个 Catalog
-- 默认参数 set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; -- 指定自定义Catalog,根据实际情况调整 set spark.hadoop.hive.metastore.catalog.default=test_catalog; --访问方式:库.表 insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha'); select * from test_del_db.test_tb;