You need to enable JavaScript to run this app.
导航
StarRocks Multi-Warehouse(多仓)最佳实践
最近更新时间:2025.03.17 10:57:47首次发布时间:2025.03.17 10:57:47
我的收藏
有用
有用
无用
无用

多仓简介

在存算分离集群中,Warehouse 是一组计算节点(CN),可以为您提供执行查询、导入和数据处理任务所需的计算资源 (CPU、内存和临时存储)。每个 Warehouse 作为一个独立的计算资源池,可以物理隔离计算资源。
在存算分离集群中,数据在多个 Warehouse 之间共享,但不同的 Warehouse 保持计算和内存资源的物理隔离。因此,您可以针对不同的业务需求创建多个 Warehouse ,比如 Ad hoc Query Warehouse 、ETL Warehouse 和 Compaction Warehouse ,并轻松地将特定任务分配至各自的 Warehouse 。

优点

  • 资源隔离

    Multi-Warehouse 允许更细粒度的计算节点调度。您可以将不同的任务分配到不同的 Warehouse ,确保计算资源的物理隔离。

  • 数据共享

    多个 Warehouse 可以共享同一数据存储,授权用户可以通过任何 Warehouse 无缝访问集群数据。

  • 垂直扩展性

    Multi-Warehouse 允许您按需分组计算节点,提供更高的灵活性以应对负载波动。您可以动态启动或停止 Warehouse ,或通过添加或删除无状态的计算节点来扩缩容。

  • 水平扩展性

    您可以轻松创建新 Warehouse 以满足新业务场景的需求,而不会影响现有的 Warehouse 。因为数据无需重新分布,所以现有 Warehouse 上的任务不会受到新 Warehouse 干扰。您可以构建一个集中的大型集群支持多个并发服务,从而避免了维护多个独立集群所带来的复杂性和成本增加。

使用场景

  • 多样化业务工作负载

    您可以将不同类型的工作负载分配给不同的 Warehouse ,以物理隔离计算资源。例如,您可以分配一个 Warehouse 执行查询分析,另一个用于 ETL 处理,从而优化每个 Warehouse 的资源利用率。

  • 集中大型集群

    数据库管理员可以维护一个共同的大型集群,并为每个业务单元创建单独的 Warehouse,有效减少了建立大量小型 StarRocks 集群的需求,从而降低维护开销。

  • 后台任务分离

    您可以在专用的 Warehouse 内隔离执行后台任务,如 Compaction 任务,以避免其干扰正常业务。此外,您可以根据需要调整 Warehouse 资源,以在成本和性能之间取得平衡。

最佳实践

  • 在线-离线混合分析

    Multi-Warehouse 为将在线和离线操作结合的业务提供卓越的资源隔离和数据共享能力。通过将多业务线的数据统一存储,节约了存储成本并简化了集群管理。

  • Ad hoc Query

    您可以根据 Ad hoc Query 请求的需要灵活地为 Warehouse 扩缩容。

  • 离线任务

    在离线任务时间要求严峻的情况下,您可以快速分配资源以保证紧急的任务在要求时间内完成。

创建实例

Image
创建多仓实例时,您需要在实例类型中选择多仓版(存算分离)

说明

  • 与其他形式的实例不同的是,多仓实例允许单独选择 FE 与计算仓库的计费周期,推荐使用包年包月的 FE ,同时,按需配置计算仓库的付费方式。
  • 可以使用一个包年包月的计算仓库,用来执行集群的常设作业,同时使用按量付费的仓库,来应对突发的查询。
  • 在创建页面,必须创建一个 Default 仓库,作为集群的默认仓库,其他仓库需要进入到控制台页面创建。

所有未指定仓库的查询都将路由到 default_Warehouse 中,同时,default_Warehouse 也会执行部分实例后台任务,这些任务包括:

  • Compaction
  • SUBMIT TASK
  • Pipe
  • 统计信息收集
  • 动态分区创建和删除
  • Schema Change
  • AutoVacuum(即在 Compaction 后进行的垃圾文件回收)
  • Garbage Collection(垃圾文件回收)
  • 用于 SHOW DATA 的统计信息报告
  • 异步物化视图刷新
  • ANALYZE TABLE

创建仓库

创建实例后,进入控制台,您可以在“计算仓库”页面找到创建仓库的选项,每个仓库可以有不同的计算、缓存大小。您也可以给每个仓库设置不同的计费方式,其中按量计费可以转为包年包月,包年包月不能转为按量计费。
按量计费仓库有更灵活的弹性(支持扩容、缩容、分时弹性),测试阶段,我们建议您优先使用按量付费仓库,当业务稳定后,再转为包年包月仓库。

点击创建后,系统将为您拉起计算资源,并创建仓库,当仓库处于“运行中”,代表仓库创建完成,您可以向该仓库提交作业。
Image

仓库启停/扩缩容

对于按量付费仓库,多仓版本支持单仓库的启停功能(default_Warehouse 不支持启停),停机后,计算资源不再计费,只收取缓存资源费用,可以在业务低谷期停止仓库,以节省成本。

进入到仓库页面,可以选择弹性设置来配置仓库弹性:

  • 对于包年包月仓库,支持手动弹性扩容,您可以将仓库扩容到目标资源,以适配业务使用。
  • 对于按量付费仓库,支持手动弹性扩缩容分时弹性,您可以灵活的调整仓库大小,或者设置弹性规则,以适配业务需求。

连接仓库

您可以在网络连接页面找到实例的连接信息,并开启公网、设置白名单。
您需要预先创建公网 IP,并绑定在实例上。
Image
Image

监控

在监控页面,我们提供了总览、FE、仓库(CN)的常用监控信息。
其中仓库的监控可以分仓库、分节点查看,更方便监控运维实例的健康状况。
您可以在火山引擎云监控产品配置实例的告警。

用户管理

StarRocks 同时采用了基于角色的访问控制 (RBAC) 和基于身份的访问控制 (IBAC) 以管理集群内的权限,使集群管理员可以轻松地在不同粒度级别上限制集群内的权限。
在 StarRocks 集群中,您可以将权限授予用户或角色。角色是一组权限,可根据需要授予集群内的用户或角色。一个用户或角色可以被授予一个或多个角色,这些角色决定了他们对不同对象的权限。
您可以在控制台“用户管理”页面,管理用户、角色的权限。

注意

StarRocks 3.X 权限系统更新后,用户所拥有的角色不默认生效,需要使用 SET ROLE 命令,或者开启 active_all_roles_on_login。可以在页面横幅一键开启功能。

使用Warehouse执行query、load等常规操作

若操作中未明确指定使用哪个 Warehouse,所有的操作都默认在内置的default_Warehouse中执行。

注意

当前如果 default_Warehouse 中没有节点,会导致后台任务如 compaction,schema change、MV 以及统计信息表创建等操作失败。

为用户指定默认的 Warehouse

可以通过设置用户属性来指定用户链接时默认使用的 Warehouse。
为用户 jack 指定默认的 Warehouse 为 wh1

ALTER USER 'jack' SET PROPERTIES ("session.Warehouse" = "wh1");

查询指定使用某个Warehouse

query 只支持通过 setVar 方式或在当前 session 中手动切换 Warehouse 来指定使用某个 Warehouse。

  • setVar 方式指定:
select /*+SET_VAR(Warehouse="aaa")*/ * from my_db.my_table;
  • 在当前 session 中手动切换:

指定使用某个 Warehouse

set [session] Warehouse [=] xxx 

其中,session=可写可不写,不影响语义。

查看当前使用的 Warehouse

Show variables like "%warehouse%";

指定的 Warehouse 只在当前 session 中生效。不明确指定则使用default_Warehouse。compaction、异步物化视图刷新、统计信息等异步任务都使用default_Warehouse。为了不影响性能,建议在 default_Warehouse 中保留一些 alive 的计算节点。

执行 query 或其他 sql

示例:

select * from my_db.my_table;

导入指定使用某个 Warehouse

streamLoad 指定使用的 Warehouse

将 Warehouse 信息添加到 http 请求的 hheader中。
示例:

curl --location-trusted -u root: \
    -H "label:test_labe56"  -H "timeout:100" -H "max_filter_ratio:1" \
    -H "Warehouse:xxx" \
    -T example1.csv -XPUT http://xxx/api/test/test11/_stream_load

外部事务导入指定使用的 Warehouse

开启一个事务

将 Warehouse 信息添加到 http 请求的 header 中。
示例:

curl --location-trusted -u root: \
    -H label:stream_load_trans_e3cd0c18_14c1_11ee_bee7_00163e135cd7 \
    -H db:stream_load_test_db_e3c600ee_14c1_11ee_bee7_00163e135cd7 \
    -H table:duplicate_table_with_null \
    -H Warehouse:xxx \
    -XPOST http://127.0.0.1:38032/api/transaction/begin

load、prepare、commit 参数不需要指定。

Flink connector 指定使用的 Warehouse

将 Warehouse 信息添加到参数中。
Flink SQL 示例:

CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030?sessionVariables=Warehouse=xxx',
    'load-url' = '127.0.0.1:8030',
    'sink.properties.Warehouse' = 'wh1',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = ''
);

INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

Flink DataStream 示例:

/**
 * Generate CSV-format records. Each record has three values separated by "\t". 
 * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table.
 */
String[] records = new String[]{
        "1\tstarrocks-csv\t100",
        "2\tflink-csv\t100"
};
DataStream<String> source = env.fromElements(records);

/**
 * Configure the Flink connector with the required properties.
 * You also need to add properties "sink.properties.format" and "sink.properties.column_separator"
 * to tell the Flink connector the input records are CSV-format, and the column separator is "\t".
 * You can also use other column separators in the CSV-format records,
 * but remember to modify the "sink.properties.column_separator" correspondingly.
 */
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", jdbcUrl)
        .withProperty("load-url", loadUrl)
        .withProperty("database-name", "test")
        .withProperty("table-name", "score_board")
        .withProperty("username", "root")
        .withProperty("password", "")
        .withProperty("sink.properties.Warehouse", "wh1")
        .withProperty("sink.properties.format", "csv")
        .withProperty("sink.properties.column_separator", "\t")
        .build();
// Create the sink with the options.
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink);

jdbcUrl 中需通过 sessionvariables 指定 Warehouse,示例如下:

jdbc:mysql://127.0.0.1:9030?sessionVariables=Warehouse=xxx

Broker Load 指定使用的 Warehouse

LOAD LABEL [<database_name>.]<label_name>
(
    data_desc[, data_desc ...]
)
WITH BROKER
(
    StorageCredentialParams
)
[PROPERTIES
(
    opt_properties
)
]

创建导入任务的语句中 opt_properties 新增 Warehouse参数,默认值为当前 session 的 Warehouse。
示例:

LOAD LABEL test_broker_load_e2a81a1e_674e_11ee_8013_00163e135cd7.path_extra_fields_e50c557c_674e_11ee_8013_00163e135cd9 
(DATA INFILE( "hdfs://172.26.194.238:9000/starrocks_test_data/system_data/hdfs_data/nose-ut-data/broker_load_data/orc_files/hive_hdfs_test_10827_orc/*")
 INTO TABLE `tbl_sr_10827` FORMAT AS "orc" )  
 WITH BROKER hdfs_broker 
 () 
 PROPERTIES("Warehouse"="aaa");

Routine load 指定使用的 Warehouse

job_properties 新增Warehouse参数,默认值为当前 session 的 Warehouse。

CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

示例:

CREATE ROUTINE LOAD routine_load_basic_types_fe824e00_680a_11ee_81d7_00163e135aa9 
on primary_table_types_load 
COLUMNS (k1,k2,k3,k4,k5,k6,v1,v2,v3,v4,v5,v6,v7,v8), 
COLUMNS TERMINATED BY '\t',where v1 != 0 
PROPERTIES (
    "desired_concurrent_number"="1",
    "max_error_number"="1000",
    "max_batch_interval"="7",
    "Warehouse"="ccc"
) 
FROM KAFKA (
    "kafka_broker_list"="ip:port",
    "kafka_topic"="xxx",
    "kafka_partitions"="0",
    "kafka_offsets"="OFFSET_BEGINNING"
);

物化视图指定使用的 Warehouse

create materialized view 语句中 PROPERTIES 新增Warehouse参数,默认值为当前 session 的 Warehouse。

"Warehouse" = "Warehouse_name"

例如要创建一个名为 order_mv 的物化视图,指定后续改写操作都跑在名为aaa的 Warehouse 中,创建语句如下:

CREATE MATERIALIZED VIEW order_mv
DISTRIBUTED BY HASH(`order_id`)
REFRESH ASYNC EVERY (interval 1 MINUTE) 
PROPERTIES ("Warehouse" = "aaa")
AS SELECT 
    order_list.order_id,
    sum(goods.price) as total
FROM order_list INNER JOIN goods ON goods.item_id1 = order_list.item_id2
GROUP BY order_id;

执行成功后,可以在show materialized views where name="order_mv" 命令 和show create materialized view order_mv 看到 Warehouse 信息。fe.audit.log 中也会记录物化视图改写语句的执行情况,其中包含了 Warehouse 字段。

Compaction 使用指定的 Warehouse

compaction 默认是在default_Warehouse中执行的。如果想要指定其他 Warehouse,可通过修改配置项lake_compaction_Warehouse实现*。*

说明

支持在session中动态修改和在fe.conf中修改后重启两种方式。

例如,想要指定名为 aaa 的 Warehouse 做 compaction。

  1. 在 session 中执行ADMIN SET FRONTEND CONFIG ("lake_compaction_Warehouse" = "aaa");,或者在 fe.conf 中写入 lake_compaction_Warehouse=aaa; ** 后重启fe。
  2. 修改后可通过show proc '/compactions' 查看最近的 compaction 记录,最后一列为 Warehouse 信息。

JDBC 链接指定 Warehouse

jdbc.url=jdb    c:mysql://xxxx.xx.xx.xx/dbName?sessionVariables=Warehouse=xxx

Superset 链接指定 Warehouse

参考 支持 Superset | StarRocks 安装 starrocks-sqlalchemy,使用 init_command 设置对应的 Warehouse。

starrocks://root:@192.168.100.88:9030/test?init_command=set Warehouse = xxx