StarRocks Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入 StarRocks。 本文档介绍如何设置 StarRocks Pipeline 连接器。
从 MySQL 读取数据同步到 StarRocks 的 Pipeline 可以定义如下:
source: type: mysql name: MySQL Source hostname: 127.0.0.1 port: 3306 username: admin password: pass tables: adb.\., bdb.user_table_[0-9]+, [app|web].order_\. server-id: 5401-5404 sink: type: starrocks name: StarRocks Sink jdbc-url: jdbc:mysql://127.0.0.1:9030 load-url: 127.0.0.1:8030 username: root password: pass pipeline: name: MySQL to StarRocks Pipeline parallelism: 2
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'starrocks'. |
name | optional | (none) | String | Sink 的名称. |
jdbc-url | required | (none) | String | 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式: |
load-url | required | (none) | String | 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式: |
username | required | (none) | String | StarRocks 集群的用户名。 |
password | required | (none) | String | StarRocks 集群的用户密码。 |
sink.label-prefix | optional | (none) | String | 指定 Stream Load 使用的 label 前缀。 |
sink.connect.timeout-ms | optional | 30000 | String | 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。 |
sink.wait-for-continue.timeout-ms | optional | 30000 | String | 等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。 |
sink.buffer-flush.max-bytes | optional | 157286400 | Long | 内存中缓冲的数据量大小,缓冲区由所有导入的表共享,达到阈值后将选择一个或多个表的数据写入到StarRocks。 达到阈值后取值范围:[64MB, 10GB]。 |
sink.buffer-flush.interval-ms | optional | 300000 | Long | 每个表缓冲数据发送的间隔,用于控制数据写入 StarRocks 的延迟。单位是毫秒,取值范围:[1000, 3600000]。 |
sink.scan-frequency.ms | optional | 50 | Long | 连接器会定期检查每个表是否到达发送间隔,该配置控制检查频率,单位为毫秒。 |
sink.io.thread-count | optional | 2 | Integer | 用来执行 Stream Load 的线程数,不同表之间的导入可以并发执行。 |
sink.at-least-once.use-transaction-stream-load | optional | TRUE | Boolean | at-least-once 下是否使用 transaction stream load。 |
sink.properties.* | optional | (none) | String | Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 |
table.create.num-buckets | optional | (none) | Integer | 自动创建 StarRocks 表时使用的桶数。对于 StarRocks 2.5 及之后的版本可以不设置,StarRocks 将会 自动设置分桶数量;对于 StarRocks 2.5 之前的版本必须设置。 |
table.create.properties.* | optional | (none) | String | 自动创建 StarRocks 表时使用的属性。比如: 如果使用 StarRocks 3.2 及之后的版本,'table.create.properties.fast_schema_evolution' = 'true' 将会打开 fast schema evolution 功能。 更多信息请参考 主键模型。 |
table.schema-change.timeout | optional | 30min | Duration | StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema change,从而导致作业失败。 |
table.create.num-buckets
控制。如果使用的 StarRocks 2.5 及之后的版本可以不设置,StarRocks 能够 自动设置分桶数量。对于 StarRocks 2.5 之前的版本必须设置,否则无法自动创建表。table.create.properties.fast_schema_evolution
为 true
来加速 StarRocks 执行变更。CDC type | StarRocks type | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
CHAR(n) where n <= 85 | CHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以只有当 CDC 中长度不超过85时,才将 CDC CHAR 映射到 StarRocks CHAR。 |
CHAR(n) where n > 85 | VARCHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以当 CDC 中长度超过85时,才将 CDC CHAR 映射到 StarRocks VARCHAR。 |
VARCHAR(n) | VARCHAR(n * 3) | CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks 中为 n * 3。 |