StarRocks 连接器提供了对 StarRocks 数据仓库的读写能力,支持做数据源表、结果表和维表。
CREATE TABLE starrocks_table( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'starrocks', 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port', 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port', 'database-name' = 'doc_db', 'table-name' = 'table1', 'username' = 'flinkuser', 'password' = 'flinkpw' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 starrocks 连接器。 |
jdbc-url | 是 | (none) | String | FE 节点的 IP 和 query 端口信息,如果有多个,需要用逗号(,)分隔。 |
database-name | 是 | (none) | String | StarRocks 数据库名称。 |
table-name | 是 | (none) | String | StarRocks 表名称。 |
username | 是 | (none) | String | StarRocks 用户名称。 |
password | 是 | (none) | String | StarRocks 用户密码。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
scan-url | 是 | (none) | String | FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。 |
scan.connect.timeout-ms | 否 | 1000 | String | 连接 StarRocks 数据仓库的超时时长,单位毫秒。 |
scan.params.keep-alive-min | 否 | 10 | String | 读取任务的保活时长,单位分钟。 |
scan.params.query-timeout-s | 否 | 600 | String | 读取任务的最大超时时长,单位秒。 |
scan.params.mem-limit-byte | 否 | 1073741824 | String | BE 节点中单个查询的内存上限,单位为 bytes。默认值 1073741824,相当于 1GB。 |
scan.max-retries | 否 | 1 | String | 读取任务失败后的最大重试次数 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
load-url | 是 | (none) | String | FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。 |
sink.semantic | 否 | at-least-once | String | 数据写入语义。
说明 配置为 |
sink.version | 否 | AUTO | String | 数据加载时使用的接口。
|
sink.buffer-flush.max-bytes | 否 | 94371840 | String | 数据写入 StarRocks 前,Buffer 可容纳的最大数据量,范围为[64MB, 10GB]。 |
sink.buffer-flush.max-rows | 否 | 500000 | String | 数据写入 StarRocks 前,Buffer 可容纳的最大数据行数。 |
sink.buffer-flush.interval-ms | 否 | 300000 | String | Buffer 刷新时间间隔,单位为毫秒,取值范围 [64000, 5000000]。 |
sink.max-retries | 否 | 3 | String | 写入任务的最大重试次数,取值范围为 [0, 10]。 |
sink.connect.timeout-ms | 否 | 1000 | String | 连接 StarRocks 数据仓库的超时时长,单位毫秒,取值范围为 100~60000。 |
sink.parallelism | 否 | NULL | String | 指定并行度。 |
sink.properties.* | 否 | (none) | String | 结果表属性。
|
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
lookup.cache.ttl-ms | 否 | 5000 | Long | 维表查询的 cache 超时时间。 |
注意:StarRocks 维表场景建议都使用 JDBC 维表替换,效果是相同的,同时 JDBC 使用更广泛,更稳定,参考 JDBC Connector。
源表
StarRocks字段类型 | Flink字段类型 |
---|---|
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
结果表
Flink 字段类型 | StarRocks 字段类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY | ARRAY |
MAP<KT,VT> | JSON STRING |
ROW | JSON STRING |
源表
CREATE TABLE starrocks_source ( id INTEGER, name STRING, score INTEGER, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://172.28.*.*:9030', 'scan-url' = '172.28.*.*:8030', 'database-name' = 'doc_db', 'table-name' = 'table1', 'username' = 'docuser', 'password' = 'docpw' ); CREATE TABLE print_sink ( id INTEGER, name STRING, score INTEGER ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * FROM starrocks_source;
结果表
CREATE TABLE datagen_source( id INTEGER, name STRING, score INTEGER ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.name.length' = '9', 'fields.id.min' = '1', 'fields.id.max' = '1000', 'fields.score.min' = '1', 'fields.score.max' = '1000' ); CREATE TABLE starrocks_sink( id INTEGER, name STRING, score INTEGER, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://172.28.*.*:9030', 'load-url' = '172.28.*.*:8030', 'database-name' = 'doc_db', 'table-name' = 'table1', 'username' = 'docuser', 'password' = 'docpw', 'sink.parallelism' = '1' ); INSERT INTO starrocks_sink SELECT * FROM datagen_source;
维表(使用 JDBC 维表的方式读取 SR 表)
CREATE TABLE datagen_source ( id1 INTEGER, name1 STRING, score1 INTEGER, proc_time as PROCTIME () ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.name1.length' = '9', 'fields.id1.min' = '1', 'fields.id1.max' = '1000', 'fields.score1.min' = '1', 'fields.score1.max' = '1000' ); CREATE TABLE starrocks_lookup ( id INTEGER, name STRING, score INTEGER, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db', 'table-name' = '<yourtablename>', 'username' = 'admin', 'password' = 'MyPa$$w0rd', 'lookup.cache.max-rows' = '100', 'lookup.max-retries' = '3' ); CREATE TABLE print_sink ( id1 INTEGER, name1 STRING, score1 INTEGER, id INTEGER, name STRING, score INTEGER ) WITH ('connector' = 'print'); INSERT INTO print_sink SELECT datagen_source.id1, datagen_source.name1, datagen_source.score1, starrocks_lookup.id, starrocks_lookup.name, starrocks_lookup.score FROM datagen_source LEFT JOIN starrocks_lookup FOR SYSTEM_TIME AS OF proc_time ON datagen_source.id1 = starrocks_lookup.id;