Serverless Spark Reading and Writing StarRocks (Shared-Data Mode): Operations Manual
Last updated: 2024.07.11 11:33:27 · First published: 2024.07.11 11:33:27

Environment Preparation

Create a TOS Bucket

  1. Log in to the TOS console and create a TOS bucket. For details, see the documentation: Creating a TOS Bucket.
  2. In the bucket you created, create the directory you need; this directory will serve as the default storage directory of the StarRocks instance.

StarRocks Data Preparation

  1. Log in to the EMR Serverless StarRocks console and create a StarRocks instance. For details, see the Volcano Engine documentation: Creating a StarRocks Instance.
  2. After connecting to the instance, run the following SQL to prepare the database and tables. For how to connect, see the documentation: Connecting to an Instance.
CREATE DATABASE IF NOT EXISTS online_database;
USE online_database;
CREATE TABLE log_v1 (
    uid int,
    name varchar(64),
    age int, 
    phone varchar(16),
    last_login datetime,
    credits double
)
DUPLICATE KEY(uid, name);

CREATE TABLE log_v2 (
    uid int,
    name varchar(64),
    age int, 
    phone varchar(16),
    last_login datetime,
    credits double
)
DUPLICATE KEY(uid, name);
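The tables above start out empty. As an illustrative sketch, you can load a few sample rows into log_v1 so the verification in the next step has something to show; all of the values below are made up for testing:

```sql
-- Illustrative sample rows (hypothetical values, for testing only)
INSERT INTO online_database.log_v1 VALUES
    (1, 'alice', 28, '13800000001', '2024-06-01 10:00:00', 99.5),
    (2, 'bob',   35, '13800000002', '2024-06-01 11:30:00', 75.0),
    (3, 'carol', 41, '13800000003', '2024-06-02 09:15:00', 88.2);

-- Confirm the rows landed
SELECT * FROM online_database.log_v1;
```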
  3. After the data is prepared, query the tables to check it. If the query returns results as shown below, data preparation is complete:

(Screenshot: sample query results from the prepared tables)

Serverless Spark Data Preparation

  1. Create a resource queue: log in to the EMR Serverless console and create a Serverless Spark queue, choosing pay-as-you-go or subscription billing as needed. When the queue status changes to Running, you can proceed to the next step.
  2. Create a data catalog: log in to LAS (Lakehouse Analytics Service), create a custom data catalog (Catalog), and set a TOS storage location for it.

Note

The default data catalog is hive. If you use this catalog, set a default TOS storage location for it.

  3. Create a database: log in to the EMR Serverless Spark job editor and, using the default data catalog hive, create a database named ss_w2r_sr with the following SQL:
CREATE DATABASE IF NOT EXISTS ss_w2r_sr;
  4. Create data tables: in the EMR Serverless Spark job editor, using the default data catalog hive and the database ss_w2r_sr, create two tables, log_v1 and log_v2. In the DDL, the location is the database's TOS storage path followed by the table name; the database's storage path can be looked up in LAS (Lakehouse Analytics Service).
CREATE TABLE IF NOT EXISTS ss_w2r_sr.log_v1(
  `uid` int, 
  `name` string, 
  `age` int, 
  `phone` string, 
  `last_login` TIMESTAMP, 
  `credits` double)
PARTITIONED BY ( 
  `date` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'tos://{Database TOS storage address}/log_v1';

CREATE TABLE IF NOT EXISTS ss_w2r_sr.log_v2(
  `uid` int, 
  `name` string, 
  `age` int, 
  `phone` string, 
  `last_login` TIMESTAMP, 
  `credits` double)
PARTITIONED BY ( 
  `date` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'tos://{Database TOS storage address}/log_v2';
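As an optional sanity check, you can inspect the generated DDL to confirm each table resolved to the expected TOS location and partition column before moving on:

```sql
-- Verify LOCATION and the `date` partition column of the new Hive tables
SHOW CREATE TABLE ss_w2r_sr.log_v1;
SHOW CREATE TABLE ss_w2r_sr.log_v2;
```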
  5. Configure a security group: log in to Volcano Engine and create a security group. In the VPC list, select the VPC used by the StarRocks instance, then configure the inbound rules as shown in the figure below to complete creation of the security group:

(Screenshot: inbound rule settings for the security group)

Procedure

  1. Read and write StarRocks tables with Serverless Spark

Log in to the EMR Serverless Spark job editor, enter the example Spark SQL statements from the code block below, and click the Run button at the bottom right of the editor to execute the SQL job.

-- starrocks fe url
set spark.sql.catalog.starrocks.fe.http.url=http://{local ip address}:8030;
set spark.sql.catalog.starrocks.fe.jdbc.url=jdbc:mysql://{local ip address}:9030;
-- user name with admin right in starrocks instance
set spark.sql.catalog.starrocks.user={user name};
-- user password
set spark.sql.catalog.starrocks.password={user password};
-- starrocks instance TOS endpoint, such as https://tos-s3-cn-beijing.ivolces.com
set spark.sql.catalog.starrocks.fs.s3a.endpoint={endpoint};
-- starrocks instance TOS region, such as cn-beijing
set spark.sql.catalog.starrocks.fs.s3a.endpoint.region={region name};
set spark.sql.catalog.starrocks.fs.s3a.access.key={access key};
set spark.sql.catalog.starrocks.fs.s3a.secret.key={secret key};

set tqs.query.engine.type = sparkcli;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = {vpc id};
set las.cross.vpc.subnet.id = {subnet id};
set las.cross.vpc.security.group.id = {security group id};
set las.cross.vpc.accountId = {volc account id};

set spark.sql.catalog.starrocks.batch.size=8092;
set spark.sql.catalog.starrocks.writer.mode=BYPASS;
set spark.sql.catalog.starrocks.reader.mode=BYPASS;
set spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled=true;
set spark.sql.catalog.starrocks.fs.s3a.path.style.access=false;
set spark.sql.catalog.starrocks.fs.s3a.retry.limit=27;
set spark.sql.catalog.starrocks=com.starrocks.connector.spark.catalog.StarRocksCatalog;
set spark.sql.storeAssignmentPolicy = ansi;
-- depend jar address, please refer to "Dependency JAR addresses" for jar addresses of online regions
set las.spark.jar.depend.jars = [{"fileName":"{dependency jar address}"}];

set spark.shuffle.manager = sort;
set spark.dynamicAllocation.enabled = false;
set spark.sql.serverless.lf.auth.enabled = false;
set spark.sql.extensions=com.starrocks.connector.spark.StarRocksExtensions;

use starrocks;
insert into table online_database.log_v2
select * from online_database.log_v1;
select * from online_database.log_v2;
  2. Read StarRocks tables and write Hive tables with Serverless Spark

Log in to the EMR Serverless Spark job editor, enter the example Spark SQL statements from the code block below, and click the Run button at the bottom right of the editor to execute the SQL job.

-- starrocks fe url
set spark.sql.catalog.starrocks.fe.http.url=http://{local ip address}:8030;
set spark.sql.catalog.starrocks.fe.jdbc.url=jdbc:mysql://{local ip address}:9030;
-- user name with admin right in starrocks instance
set spark.sql.catalog.starrocks.user={user name};
-- user password
set spark.sql.catalog.starrocks.password={user password};
-- starrocks instance TOS endpoint, such as https://tos-s3-cn-beijing.ivolces.com
set spark.sql.catalog.starrocks.fs.s3a.endpoint={endpoint};
-- starrocks instance TOS region, such as cn-beijing
set spark.sql.catalog.starrocks.fs.s3a.endpoint.region={region name};
set spark.sql.catalog.starrocks.fs.s3a.access.key={access key};
set spark.sql.catalog.starrocks.fs.s3a.secret.key={secret key};

set tqs.query.engine.type = sparkcli;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = {vpc id};
set las.cross.vpc.subnet.id = {subnet id};
set las.cross.vpc.security.group.id = {security group id};
set las.cross.vpc.accountId = {volc account id};

set spark.sql.catalog.starrocks.batch.size=8092;
set spark.sql.catalog.starrocks.writer.mode=BYPASS;
set spark.sql.catalog.starrocks.reader.mode=BYPASS;
set spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled=true;
set spark.sql.catalog.starrocks.fs.s3a.path.style.access=false;
set spark.sql.catalog.starrocks.fs.s3a.retry.limit=27;
set spark.sql.catalog.starrocks=com.starrocks.connector.spark.catalog.StarRocksCatalog;
set spark.sql.storeAssignmentPolicy = ansi;
-- depend jar address, please refer to "Dependency JAR addresses" for jar addresses of online regions
set las.spark.jar.depend.jars = [{"fileName":"{dependency jar address}"}];

set spark.shuffle.manager = sort;
set spark.dynamicAllocation.enabled = false;
set spark.sql.serverless.lf.auth.enabled = false;
set spark.sql.extensions=com.starrocks.connector.spark.StarRocksExtensions;
                                                     
use starrocks;
WITH temp AS (
    select uid, name, age, phone, last_login, credits from online_database.log_v1
)
insert overwrite table spark_catalog.ss_w2r_sr.log_v1 partition(date)
select uid + cast(rand() * 900 + 10 as int), CONCAT(name,'_1'), age, phone, last_login, credits, '20240601' from temp;
select * from spark_catalog.ss_w2r_sr.log_v1;
  3. Read Hive tables and write StarRocks tables with Serverless Spark

Log in to the EMR Serverless Spark job editor, enter the example Spark SQL statements from the code block below, and click the Run button at the bottom right of the editor to execute the SQL job.

-- starrocks fe url
set spark.sql.catalog.starrocks.fe.http.url=http://{local ip address}:8030;
set spark.sql.catalog.starrocks.fe.jdbc.url=jdbc:mysql://{local ip address}:9030;
-- user name with admin right in starrocks instance
set spark.sql.catalog.starrocks.user={user name};
-- user password
set spark.sql.catalog.starrocks.password={user password};
-- starrocks instance TOS endpoint, such as https://tos-s3-cn-beijing.ivolces.com
set spark.sql.catalog.starrocks.fs.s3a.endpoint={endpoint};
-- starrocks instance TOS region, such as cn-beijing
set spark.sql.catalog.starrocks.fs.s3a.endpoint.region={region name};
set spark.sql.catalog.starrocks.fs.s3a.access.key={access key};
set spark.sql.catalog.starrocks.fs.s3a.secret.key={secret key};

set tqs.query.engine.type = sparkcli;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = {vpc id};
set las.cross.vpc.subnet.id = {subnet id};
set las.cross.vpc.security.group.id = {security group id};
set las.cross.vpc.accountId = {volc account id};

set spark.sql.catalog.starrocks.batch.size=8092;
set spark.sql.catalog.starrocks.writer.mode=BYPASS;
set spark.sql.catalog.starrocks.reader.mode=BYPASS;
set spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled=true;
set spark.sql.catalog.starrocks.fs.s3a.path.style.access=false;
set spark.sql.catalog.starrocks.fs.s3a.retry.limit=27;
set spark.sql.catalog.starrocks=com.starrocks.connector.spark.catalog.StarRocksCatalog;
set spark.sql.storeAssignmentPolicy = ansi;
-- depend jar address, please refer to "Dependency JAR addresses" for jar addresses of online regions
set las.spark.jar.depend.jars = [{"fileName":"{dependency jar address}"}];

set spark.shuffle.manager = sort;
set spark.dynamicAllocation.enabled = false;
set spark.sql.serverless.lf.auth.enabled = false;
set spark.sql.extensions=com.starrocks.connector.spark.StarRocksExtensions;
        
use starrocks;
WITH temp AS (
  select uid, name, age, phone, last_login, credits from spark_catalog.ss_w2r_sr.log_v1
)
insert into table online_database.log_v1 
select * from temp;
select * from online_database.log_v1;
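As an optional sanity check after the round trip, you can compare row counts on the Hive source and the StarRocks target; this is a quick way to see whether the insert landed (note that online_database.log_v1 may already contain rows from earlier steps, so the counts are informational rather than expected to be equal):

```sql
-- Row counts on both sides of the write-back
SELECT count(*) FROM spark_catalog.ss_w2r_sr.log_v1;
SELECT count(*) FROM online_database.log_v1;
```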

Dependency JAR addresses

Region: JAR address

- North China (Beijing): tos://emr-serverless-spark/spark/shared-lib/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar
- East China (Shanghai): tos://emr-serverless-spark-shanghai/spark/shared-lib/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar
- South China (Guangzhou): tos://emr-serverless-spark-guangzhou/spark/shared-lib/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar
- Southeast Asia (Johor): tos://emr-serverless-spark-ap-southeast-1/spark/shared-lib/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar