You need to enable JavaScript to run this app.
导航
Spark 对湖仓表使用说明
最近更新时间:2025.04.17 14:12:15首次发布时间:2025.04.17 14:12:15
我的收藏
有用
有用
无用
无用

EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源 SparkSQL 语法说明

基础操作

库操作

-- 创建数据库
create database db_demo;

-- 创建数据库,指定自定义TOS桶路径进行存储
-- 注意要确保该TOS桶存在,并且当前用户有该桶路径的读写权限
create database db_demo location 'tos://您的tos bucket name/warehouse/';

--查看数据库信息
desc database db_demo;

-- 删除数据库
drop database db_demo;

表操作

use db_demo;
-- 创建表
create table tb_demo(id int, name string);

-- 创建表指定指定自定义TOS桶路径进行存储, 创建的表是外表
create table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo';

-- 创建外表,效果同上
create EXTERNAL table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo';
DESCRIBE EXTENDED  tb_demo;

-- 描述表信息
desc table tb_demo;

-- 查询建表语句
show create table tb_demo;

-- 删除表
drop table tb_demo;

-- 插入数据
insert into tb_demo select 1,'name1';

-- 查询表数据
select * from tb_demo;

UDF 操作

-- 上传UDF, 上传到TOS对应路径

-- 创建udf,默认在hive catalog中
CREATE FUNCTION <schemaName>.<functionName> AS '<funcClassName>' using jar 'tos://您的tos bucket name/您的jar包地址';

-- Spark使用UDF
select dbname.udfname('aaabbB');
select hive.dbname.udfname('aaabbB');

-- presto 使用UDF
select hive.dbname.udfname('aaabbB');

访问 Catalog

  • 方式1:参数配置默认 las catalog
-- 指定会话级别的默认catalog为test_catalog,全局默认catalog是hive
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

-- 访问test_catalog下的库表
select * from test_db.test_tb;
  • 方式2:三段式访问, 直接通过三段式访问:{las_catalog}.{db_name}.{table_name}

说明

三段式访问仅支持 DML&DQL,不支持 DDL。

--查询普通hive表
select * from test_catalog.test_db.test_tb;

Spark 访问 hive

访问默认 Catalog

说明

库表在 Catalog 中的Hive数据目录下。

create database if not exists test_hive_db;
create table if not EXISTS test_hive_db.test_tb(id int, name string);

insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_hive_db.test_tb;

drop table if exists test_hive_db.test_tb;
drop database if exists test_hive_db;

访问自定义 Catalog

  • DDL
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

--DDL
create database if not exists test_hive_db;
create table if not EXISTS test_hive_db.test_tb(id int, name string);
  • DML&DQL:

说明

三段式访问支持访问多个 Catalog,不支持 DDL。

insert into table test_catalog.test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');

select * from test_catalog.test_hive_db.test_tb;

会话指定 Catalog:sql中仅支持访问一个catalog

set spark.hadoop.hive.metastore.catalog.default=test_catalog;

--DML&DQL
insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_hive_db.test_tb;

Spark 访问 iceberg

访问默认 Catalog

说明

库表在 Catalog 中的Hive数据目录下。

-- 默认参数
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;
use iceberg;

create database if not exists test_iceberg_db;
create table if not EXISTS test_iceberg_db.test_tb(id int, name string);

insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_iceberg_db.test_tb;

drop table if exists test_iceberg_db.test_tb;
drop database if exists test_iceberg_db;

访问自定义Catalog

  • DDL
-- 默认参数
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;
use iceberg;

-- 指定自定义Catalog,根据实际情况调整
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

create database if not exists test_iceberg_db;
create table if not EXISTS test_iceberg_db.test_tb(id int, name string);
  • DML&DQL

说明

三段式访问支持访问多个 Catalog,不支持 DDL。

-- 默认参数
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;

-- 访问方式:自定义catalog.库.表
use iceberg;
insert into table test_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_catalog.test_iceberg_db.test_tb;

--访问方式:iceberg.自定义catalog.库.表
insert into table iceberg.test_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from iceberg.test_catalog.test_iceberg_db.test_tb;

会话指定Catalog:SQL 中仅支持访问一个 catalog

-- 默认参数
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;
use iceberg;

-- 指定自定义Catalog,根据实际情况调整
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

--访问方式:库.表
insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_iceberg_db.test_tb;

Spark 访问 paimon

访问默认 Catalog

说明

库表在 Catalog 中的Hive数据目录下。

-- 默认参数
set spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog;
set spark.sql.catalog.paimon.metastore=hive;
set spark.sql.storeAssignmentPolicy=ansi;
-- 自定义warehouse路径
set spark.sql.catalog.paimon.warehouse=tos://您的tos bucket name/warehouse;
use paimon;

create database if not exists test_paimon_db;
create table if not EXISTS test_paimon_db.test_tb(id int, name string);

insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_paimon_db.test_tb;

drop table if exists test_paimon_db.test_tb;
drop database if exists test_paimon_db;

访问自定义 Catalog

  • DDL
-- 默认参数
set spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog;
set spark.sql.catalog.paimon.metastore=hive;
set spark.sql.storeAssignmentPolicy=ansi;
-- 自定义warehouse路径
set spark.sql.catalog.paimon.warehouse=tos://您的tos bucket name/warehouse;
use paimon;

-- 指定自定义Catalog,根据实际情况调整
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

create database if not exists test_paimon_db;
create table if not EXISTS test_paimon_db.test_tb(id int, name string);
  • DML&DQL:

说明

三段式访问支持访问多个 catalog,不支持 DDL。

-- 默认参数
set spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog;
set spark.sql.catalog.paimon.metastore=hive;
set spark.sql.storeAssignmentPolicy=ansi;
-- 自定义warehouse路径
set spark.sql.catalog.paimon.warehouse=tos://您的tos bucket name/warehouse;

-- 访问方式:自定义catalog.库.表
use paimon;
insert into table test_catalog.test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_catalog.test_paimon_db.test_tb;


--访问方式:paimon.自定义catalog.库.表
insert into table paimon.test_catalog.test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from paimon.test_catalog.test_paimon_db.test_tb;

会话指定 Catalog:SQL 中仅支持访问一个 catalog

-- 默认参数
set spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog;
set spark.sql.catalog.paimon.metastore=hive;
set spark.sql.storeAssignmentPolicy=ansi;
-- 自定义warehouse路径
set spark.sql.catalog.paimon.warehouse=tos://您的tos bucket name/warehouse;
use paimon;

-- 指定自定义Catalog,根据实际情况调整
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

--访问方式:库.表
insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_paimon_db.test_tb;

Spark 访问 Hudi

访问默认 Catalog

说明

库表在 Catalog 中的Hive数据目录下。

-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;

create database if not exists test_hudi_db;
create table if not EXISTS test_hudi_db.test_tb(id int, name string);

insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_hudi_db.test_tb;

drop table if exists test_hudi_db.test_tb;
drop database if exists test_hudi_db;

访问自定义 Catalog

  • DDL
-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;

-- 指定自定义Catalog,根据实际情况调整
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

create database if not exists test_hudi_db;
create table if not EXISTS test_hudi_db.test_tb(id int, name string);
  • DML&DQL

说明

三段式访问支持访问多个 catalog,不支持 DDL。

-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;

-- 访问方式:自定义catalog.库.表
insert into table test_catalog.test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_catalog.test_hudi_db.test_tb;

会话指定 catalog:SQL 中仅支持访问一个 catalog

-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;

-- 指定自定义Catalog,根据实际情况调整
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

--访问方式:库.表
insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_hudi_db.test_tb;

Spark 访问 Deltalake

访问默认Catalog

说明

库表在 Catalog 中的Hive数据目录下。

-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;

create database if not exists test_del_db;
create table if not EXISTS test_del_db.test_tb(id int, name string);

insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_del_db.test_tb;

drop table if exists test_del_db.test_tb;
drop database if exists test_del_db;

访问自定义Catalog

  • DDL
-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;

-- 指定自定义Catalog,根据实际情况调整
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

create database if not exists test_del_db;
create table if not EXISTS test_del_db.test_tb(id int, name string);
  • DML&DQL

说明

三段式访问支持访问多个catalog,不支持DDL。

-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;

-- 访问方式:自定义catalog.库.表
insert into table test_catalog.test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_catalog.test_del_db.test_tb;

会话指定catalog:SQL 中仅支持访问一个 Catalog

-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;

-- 指定自定义Catalog,根据实际情况调整
set spark.hadoop.hive.metastore.catalog.default=test_catalog;

--访问方式:库.表
insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_del_db.test_tb;