支持采集 EMR-3.1.1 及以上 StarRocks 集群和 OLAP 服务中 1.1.0 及以上版本全托管 StarRocks 引擎中的数据。
StarRocks 数据源配置时,EMR 集群对应的集群信息、数据库用户名密码需填写正确:
填写的数据库用户名信息,必须拥有相应数据库表的读写权限,来保障任务数据能够被正常读取或写入 StarRocks 中。
必须有账户密码,其中 root 账户无密码,不符合安全规范,数据源配置时无法使用。
可以进入 EMR Serverless OLAP 用户管理页面
登录 EMR Serverless OLAP 控制台。
在顶部左边菜单栏处,根据实际情况选择地域。
单击目标实例的实例名称。
单击用户管理页签。
添加用户在弹出的对话框中,配置以下信息,单击确定。
Mysql数据源和Serverless Flink资源池 建议在同一个私有网络
其 VPC 下的子网和安全组也尽可能保持一致。
若 VPC 不一致时,则需要在 StarRocks 集群的安全管理上,选择白名单组,并添加入方向规则。
Flink 资源池是项目中用来管理计算资源的,资源池中的计算资源相互隔离,相互独立。任务运行和调试需要消耗计算资源,在开发任务前,需要先创建 Flink 资源池。详见:flink资源池创建。
创建Flink任务,参考:开发 Flink SQL 任务。
CREATE DATABASE test; CREATE TABLE test.score_board( id int(11) NOT NULL COMMENT "", name varchar(255) NULL DEFAULT "" COMMENT "", score int(11) NOT NULL DEFAULT "0" COMMENT "", PRIMARY KEY(id) ); #准备若干条数据 INSERT INTO `score_board` VALUES (201, 'mysqlinsert',123), (202, 'mysql-uui',345);
mysql -uxxx -pxxx -h xxx -P 9030
CREATE DATABASE test; CREATE TABLE test.score_board( id int(11) NOT NULL COMMENT "", name varchar(65533) NULL DEFAULT "" COMMENT "", score int(11) NOT NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP PRIMARY KEY(id) DISTRIBUTED BY HASH(id);
StarRocks 连接器目前仅支持在 Flink 1.16-volcano 引擎版本中使用 。
starrocks的数据源connector定义如下
CREATE TABLE starrocks_table( name VARCHAR, score BIGINT ) WITH ('connector' = 'starrocks','jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port','load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port','database-name' = 'xxx','table-name' = 'xxx','username' = 'xxx','password' = 'xxx' ); SQL
编写数据源写入
CREATE TABLE mysql_jdbc (`id` INT, `name` STRING, `score` INT) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xxx.rds.ivolces.com:3306/xxx', 'table-name' = 'xxx', 'username' = 'xxx', 'password' = 'xxx' ); CREATE TABLE `score_board_sr` ( `id` INT, `name` STRING, `score` INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://xxx:9030', 'load-url' = 'xxx:8030', 'database-name' = 'xxx', 'table-name' = 'xxx', 'username' = 'xxx', 'password' = 'xxx' ); insert into score_board_sr select * from mysql_jdbc;
启动任务运行
验证StarRocks端可以看到数据通过flink写入成功
通用参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 starrocks 连接器。 |
jdbc-url | 是 | (none) | String | FE 节点的 IP 和 query 端口信息,如果有多个,需要用逗号(,)分隔。 |
database-name | 是 | (none) | String | StarRocks 数据库名称。 |
table-name | 是 | (none) | String | StarRocks 表名称。 |
username | 是 | (none) | String | StarRocks 用户名称。 |
password | 是 | (none) | String | StarRocks 用户密码。 |
源表参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
scan-url | 是 | (none) | String | FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。 |
scan.connect.timeout-ms | 否 | 1000 | String | 连接 StarRocks 数据仓库的超时时长,单位毫秒。 |
scan.params.keep-alive-min | 否 | 10 | String | 读取任务的保活时长,单位分钟。 |
scan.params.query-timeout-s | 否 | 600 | String | 读取任务的最大超时时长,单位秒。 |
scan.params.mem-limit-byte | 否 | 1073741824 | String | BE 节点中单个查询的内存上限,单位为 bytes。默认值 1073741824,相当于 1GB。 |
scan.max-retries | 否 | 1 | String | 读取任务失败后的最大重试次数 |
结果表参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
load-url | 是 | (none) | String | FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。 |
sink.semantic | 否 | at-least-once | String | 数据写入语义。at-least-once:默认值,至少写入一次。exactly-once:仅写入一次,不会出现重复写的情况。说明配置为exactly-once写入语义时,只在 checkpoint 时写数据。注意此时的 sink.buffer-flush.* 相关参数无效。 |
sink.version | 否 | AUTO | String | 数据加载时使用的接口。V1:使用 Stream Load 接口加载数据。V2:使用 Transaction Stream Load 接口加载数据,要求 StarRocks 至少为 2.4 版本。AUTO:判断 StarRocks 是否支持 Transaction Stream Load 接口,然后选择版本。支持则选择 V2,不支持则选择 V1。 |
sink.buffer-flush.max-bytes | 否 | 94371840 | String | 数据写入 StarRocks 前,Buffer 可容纳的最大数据量,范围为[64MB, 10GB]。 |
sink.buffer-flush.max-rows | 否 | 500000 | String | 数据写入 StarRocks 前,Buffer 可容纳的最大数据行数。 |
sink.buffer-flush.interval-ms | 否 | 300000 | String | Buffer 刷新时间间隔,单位为毫秒,取值范围 [64000, 5000000]。 |
sink.max-retries | 否 | 3 | String | 写入任务的最大重试次数,取值范围为 [0, 10]。 |
sink.connect.timeout-ms | 否 | 1000 | String | 连接 StarRocks 数据仓库的超时时长,单位毫秒,取值范围为 100~60000。 |
sink.parallelism | 否 | NULL | String | 指定并行度。 |
sink.properties.* | 否 | (none) | String | 结果表属性。 |
数据类型映射
StarRocks字段类型 | Flink字段类型 |
---|---|
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
Flink 字段类型 | StarRocks 字段类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY | ARRAY |
MAP<KT,VT> | JSON STRING |
ROW | JSON STRING |
其它参考示例
CREATE TABLE datagen_source( id INTEGER, name STRING, score INTEGER ) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.name.length' = '9','fields.id.min' = '1','fields.id.max' = '1000','fields.score.min' = '1','fields.score.max' = '1000' ); CREATE TABLE starrocks_sink( id INTEGER, name STRING, score INTEGER,PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector' = 'starrocks','jdbc-url' = 'jdbc:mysql://172.*.*.*:9030', 'load-url' = '172.*.*.*:8030', 'database-name' = 'xxx','table-name' = 'xxx','username' = 'xxx','password' = 'xxx','sink.parallelism' = '1' ); INSERT INTO starrocks_sink SELECT * FROM datagen_source;