You need to enable JavaScript to run this app.
导航
使用Serveless Flink实现MySQL到StarRocks数据集成
最近更新时间:2024.04.29 11:13:20首次发布时间:2024.04.15 11:50:33

1 支持的版本

支持采集 EMR-3.1.1 及以上 StarRocks 集群和 OLAP 服务中 1.1.0 及以上版本全托管 StarRocks 引擎中的数据。

2 使用前提

2.1 准备MySQL的数据库信息

  • 准备云数据库MySQL,建立在同一个VPC网络内,您可前往控制台创建MySQL实例

alt

2.2 获取StarRocks的数据库信息

  1. StarRocks 数据源配置时,EMR 集群对应的集群信息、数据库用户名密码需填写正确:

  2. 填写的数据库用户名信息,必须拥有相应数据库表的读写权限,来保障任务数据能够被正常读取或写入 StarRocks 中。

    1. 必须有账户密码,其中 root 账户无密码,不符合安全规范,数据源配置时无法使用。

    2. 用户名密码获取方式,您可在 EMR 集群详情 > 服务列表 > StarRocks 服务名称 > 服务参数 > starrocks-env 参数文件下,看到 StarRocks 已经预置了一个账户和密码,推荐使用该账户/密码来配置 StarRocks 数据源。 其余用户创建方式详见StarRocks官网 CREATE USER

  3. EMR StarRocks 集群和独享集成资源组中的 VPC 必须一致。

    1. Mysql数据源和Serverless Flink资源池 建议在同一个私有网络

    2. 其 VPC 下的子网和安全组也尽可能保持一致。

    3. 若 VPC 不一致时,则需要在 StarRocks 集群的安全组上,在入方向规则处,添加独享集成资源组子网的 IP 网段:

      • 在 EMR StarRocks 集群详情界面,进入集群所在的安全组,并添加入方向规则。

      • 在弹窗中,填写独享集成资源组子网的 IP 网段:

2.3 创建Serveless Flink集群

  • Flink 资源池是项目中用来管理计算资源的,资源池中的计算资源相互隔离,相互独立。任务运行和调试需要消耗计算资源,在开发任务前,需要先创建 Flink 资源池。详见:flink资源池创建

  • 创建Flink任务,参考:开发 Flink SQL 任务

3 MySQL 导入到StarRocks链路

3.1 准备MySQL数据源

CREATE DATABASE test;

CREATE TABLE test.score_board(
    id int(11) NOT NULL COMMENT "",
    name varchar(255) NULL DEFAULT "" COMMENT "",
    score int(11) NOT NULL DEFAULT "0" COMMENT "",
PRIMARY KEY(id)
);


#准备若干条数据
INSERT INTO `score_board` VALUES (201, 'mysqlinsert',123), (202, 'mysql-uui',345);

3.2 准备StarRocks 表

mysql -uxxx -pxxx -h xxx -P 9030

CREATE DATABASE test;

CREATE TABLE test.score_board(
    id int(11) NOT NULL COMMENT "",
    name varchar(65533) NULL DEFAULT "" COMMENT "",
    score int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id);

StarRocks 连接器目前仅支持在 Flink 1.16-volcano 引擎版本中使用 。

3.3 编写 Flink任务运行导入

starrocks的数据源connector定义如下

CREATE TABLE starrocks_table(
 name VARCHAR,
 score BIGINT
 ) WITH ('connector' = 'starrocks','jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port','load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port','database-name' = 'xxx','table-name' = 'xxx','username' = 'xxx','password' = 'xxx'
 );
SQL

编写数据源写入

CREATE TABLE
  mysql_jdbc (`id` INT, `name` STRING, `score` INT)
WITH
  (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxx.rds.ivolces.com:3306/xxx',
    'table-name' = 'xxx',
    'username' = 'xxx',
    'password' = 'xxx'
  );

CREATE TABLE
  `score_board_sr` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
  )
WITH
  (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://xxx:9030',
    'load-url' = 'xxx:8030',
    'database-name' = 'xxx',
    'table-name' = 'xxx',
    'username' = 'xxx',
    'password' = 'xxx'
  );

insert into
  score_board_sr
select
  *
from
  mysql_jdbc;

启动任务运行

验证StarRocks端可以看到数据通过flink写入成功

4 详细参数介绍

通用参数

参数是否必选默认值数据类型描述
connector(none)String指定使用的连接器,此处是 starrocks 连接器。

jdbc-url

(none)

String

FE 节点的 IP 和 query 端口信息,如果有多个,需要用逗号(,)分隔。
格式为jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port..。

database-name(none)StringStarRocks 数据库名称。
table-name(none)StringStarRocks 表名称。
username(none)StringStarRocks 用户名称。
password(none)StringStarRocks 用户密码。

源表参数

参数是否必选默认值数据类型描述

scan-url

(none)

String

FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。
格式为fe_ip1:http_port,fe_ip2:http_port..。

scan.connect.timeout-ms1000String连接 StarRocks 数据仓库的超时时长,单位毫秒。
scan.params.keep-alive-min10String读取任务的保活时长,单位分钟。
scan.params.query-timeout-s600String读取任务的最大超时时长,单位秒。
scan.params.mem-limit-byte1073741824StringBE 节点中单个查询的内存上限,单位为 bytes。默认值 1073741824,相当于 1GB。
scan.max-retries1String读取任务失败后的最大重试次数

结果表参数

参数是否必选默认值数据类型描述

load-url

(none)

String

FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。
格式为fe_ip1:http_port,fe_ip2:http_port..。

sink.semanticat-least-onceString数据写入语义。at-least-once:默认值,至少写入一次。exactly-once:仅写入一次,不会出现重复写的情况。说明配置为exactly-once写入语义时,只在 checkpoint 时写数据。注意此时的 sink.buffer-flush.* 相关参数无效。
sink.versionAUTOString数据加载时使用的接口。V1:使用 Stream Load 接口加载数据。V2:使用 Transaction Stream Load 接口加载数据,要求 StarRocks 至少为 2.4 版本。AUTO:判断 StarRocks 是否支持 Transaction Stream Load 接口,然后选择版本。支持则选择 V2,不支持则选择 V1。

sink.buffer-flush.max-bytes

94371840

String

数据写入 StarRocks 前,Buffer 可容纳的最大数据量,范围为[64MB, 10GB]。
默认值 94371840,相当于 90MB。

sink.buffer-flush.max-rows500000String数据写入 StarRocks 前,Buffer 可容纳的最大数据行数。
sink.buffer-flush.interval-ms300000StringBuffer 刷新时间间隔,单位为毫秒,取值范围 [64000, 5000000]。
sink.max-retries3String写入任务的最大重试次数,取值范围为 [0, 10]。
sink.connect.timeout-ms1000String连接 StarRocks 数据仓库的超时时长,单位毫秒,取值范围为 100~60000。

sink.parallelism

NULL

String

指定并行度。
如果不指定并行度,则使用全局并行度。

sink.properties.*

(none)

String

结果表属性。
此处列举三个参数。如需了解更多,请参见STREAM LOAD。sink.properties.format:写入 StarRocks 的数据格式,支持 CSV 和 JSON,默认值为csv。sink.properties.column_separator:用于指定 CSV 格式的列分隔符,默认值为\t。sink.properties.row_delimiter:用于指定 CSV 格式的行分隔符,默认值为\n。

数据类型映射

  • 源表
StarRocks字段类型Flink字段类型
NULLNULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
LARGEINTSTRING
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
DECIMALV2DECIMAL
DECIMAL32DECIMAL
DECIMAL64DECIMAL
DECIMAL128DECIMAL
CHARCHAR
VARCHARSTRING
  • 结果表
Flink 字段类型StarRocks 字段类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BINARYINT
CHARSTRING
VARCHARSTRING
STRINGSTRING
DATEDATE
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIME
ARRAYARRAY
MAP<KT,VT>JSON STRING
ROWJSON STRING

其它参考示例

  • 结果表示例
CREATE TABLE datagen_source(
  id INTEGER,
  name STRING,
  score INTEGER
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.name.length' = '9','fields.id.min' = '1','fields.id.max' = '1000','fields.score.min' = '1','fields.score.max' = '1000'
);

CREATE TABLE starrocks_sink(
  id INTEGER,
  name STRING,
  score INTEGER,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'starrocks','jdbc-url' = 'jdbc:mysql://172.*.*.*:9030', 'load-url' = '172.*.*.*:8030', 'database-name' = 'xxx','table-name' = 'xxx','username' = 'xxx','password' = 'xxx','sink.parallelism' = '1'
);

INSERT INTO starrocks_sink
SELECT * FROM datagen_source;