StarRocks 支持通过 Flink 读取或写入数据。您可以使用 Flink Connector 连接 Flink 与 StarRocks 实现数据导入,其原理是在内存中对数据进行攒批,按批次使用 Stream Load 将数据导入 StarRocks。Flink Connector 支持 DataStream API、Table API & SQL,以及 Python API,并且相对于 Flink 官方提供的 JDBC Connector 具备更好的性能和稳定性。
您可以从 Maven 中央仓库 中下载与您 Flink 版本匹配的最新的 flink-connector-starrocks.jar
文件,也可以使用由 EMR 团队提供的 Flink Connector 版本。
EMR 团队提供的 Flink Connector Jar 文件随 Flink 安装包一同附送,您可以在支持部署 Flink 组件的 EMR 集群 /usr/lib/emr/current/flink/connectors
路径下找到对应的 jar 文件。
相对于开源版本的 Flink Connector,我们更加推荐您使用 EMR 团队提供的 Flink Connector 版本,相对而言优势如下:
能够与 EMR 集群,及其周边生态更好的集成。
增加一些 EMR 团队定制开发的竞争力特性。
更加及时修复一些已知的 bug。
不过兼容开源是 EMR 作为开源大数据平台的基本原则,您仍然可以坚持使用开源 Flink Connector 版本。
本小节以导入数据到 StarRocks 明细表 examples.tb_duplicate_key
为例,该表的建表语句如下:
CREATE TABLE IF NOT EXISTS tb_duplicate_key ( event_time BIGINT NOT NULL COMMENT 'timestamp of event', event_type INT NOT NULL COMMENT 'type of event', user_id INT NOT NULL COMMENT 'id of user', device VARCHAR(128) NULL COMMENT 'device' ) ENGINE = OLAP DUPLICATE KEY(event_time, event_type) DISTRIBUTED BY HASH(user_id) PROPERTIES ( 'replication_num' = '3' );
您可以直接通过 Flink SQL 形式将数据写入 StarRocks 对应数据表中,步骤如下:
进入 Flink SQL 交互终端,参考 Flink SQL Client 使用方式 进入 Flink SQL 交互终端,这里以 YARN Session 模式为例。
通过 CREATE TABLE
创建一张 StarRocks tb_duplicate_key
表的映射表,不要求同名:
CREATE TABLE IF NOT EXISTS tb_duplicate_key ( event_time BIGINT NOT NULL COMMENT 'timestamp of event', event_type INT NOT NULL COMMENT 'type of event', user_id INT NOT NULL COMMENT 'id of user', device VARCHAR(128) NULL COMMENT 'device' ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://{fe_ip}:9030', 'load-url' = '{fe_ip}:8030', 'database-name' = 'examples', 'table-name' = 'tb_duplicate_key', 'username' = 'system_query_user', 'password' = '***' );
INSERT INTO
操作将数据插入映射表:INSERT INTO tb_duplicate_key VALUES (1703128450, 1, 1001, 'PHONE'), (1703128451, 0, 1002, 'PAD'), (1703128452, 1, 1003, 'TV');
正常情况下,您可以在 StarRocks 中查询到刚刚由 Flink 侧写入的数据。
您也可以按照输入的数据类型编写对应的 Flink DataStream 作业,将数据 Sink 到指定的 StarRocks 表,支持的数据格式包括 CSV、JSON,以及自定义 Java 对象。
本小节演示将内存中构造的 CSV 数据通过 Flink DataStream 方式导入 StarRocks 的 tb_duplicate_key
表,示例代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 模拟 CSV 格式数据 String[] records = new String[]{ "1703128450, 1, 1001, PHONE", "1703128451, 0, 1002, PAD", "1703128452, 1, 1003, TV" }; DataStream<String> source = env.fromElements(records); // 配置 Sink 选项 StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://{fe_ip}:9030") .withProperty("load-url", "{fe_ip}:8030") .withProperty("database-name", "examples") .withProperty("table-name", "tb_duplicate_key") .withProperty("username", "system_query_user") .withProperty("password", "******") .withProperty("sink.properties.format", "csv") .withProperty("sink.properties.column_separator", ",") .build(); // 创建并注册 Sink source.addSink(StarRocksSink.sink(options)); env.execute("load_data_example");
关于如何提交 Flink 任务可以参考 Flink 使用文档。
本小节演示将内存中构造的 JSON 数据通过 Flink DataStream 方式导入 StarRocks 的 tb_duplicate_key
表,示例代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 模拟 JSON 格式数据 String[] records = new String[]{ "{\"event_time\":1703128450,\"event_type\":1,\"user_id\":1001,\"device\":\"PHONE\"}", "{\"event_time\":1703128451,\"event_type\":0,\"user_id\":1002,\"device\":\"PAD\"}", "{\"event_time\":1703128452,\"event_type\":1,\"user_id\":1003,\"device\":\"TV\"}" }; DataStream<String> source = env.fromElements(records); // 配置 Sink 选项 StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://{fe_ip}:9030") .withProperty("load-url", "{fe_ip}:8030") .withProperty("database-name", "examples") .withProperty("table-name", "tb_duplicate_key") .withProperty("username", "system_query_user") .withProperty("password", "******") .withProperty("sink.properties.format", "json") .build(); // 创建并注册 Sink source.addSink(StarRocksSink.sink(options)); env.execute("load_data_example");
关于如何提交 Flink 任务可以参考 Flink 使用文档。
本小节演示将内存中构造的 Java 数据对象通过 Flink DataStream 方式导入 StarRocks 的 tb_duplicate_key
表。假设该表单行数据的数据结构 Record 定义如下:
public static final class Record { private final Long eventTime; private final Integer eventType; private final Integer userId; private final String device; public Record(Long eventTime, Integer eventType, Integer userId, String device) { this.eventTime = eventTime; this.eventType = eventType; this.userId = userId; this.device = device; } // ... getter }
我们需要实现 StarRocksSinkRowBuilder 接口定义 Record 对象到行数组的转换方式:
private static class RecordTransformer implements StarRocksSinkRowBuilder<Record> { @Override public void accept(Object[] row, Record record) { row[0] = record.getEventTime(); row[1] = record.getEventType(); row[2] = record.getUserId(); row[3] = record.getDevice(); // 对于主键表而言,还需要进一步指定最后一个元素,标识当前操作是 UPSERT 或 DELETE // row[row.length - 1] = StarRocksSinkOP.UPSERT.ordinal(); } }
最后模拟在内存中构造 Java 数据对象并实现导入,示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 模拟 JSON 格式数据 Record[] records = new Record[]{ new Record(1703128450L, 1, 1001, "PHONE"), new Record(1703128451L, 0, 1002, "PAD"), new Record(1703128452L, 1, 1003, "TV") }; DataStream<Record> source = env.fromElements(records); // 定义表 Schema TableSchema schema = TableSchema.builder() .field("event_time", DataTypes.BIGINT().notNull()) .field("event_type", DataTypes.INT().notNull()) .field("user_id", DataTypes.INT().notNull()) .field("device", DataTypes.VARCHAR(128)) .build(); // 配置 Sink 选项 StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://{fe_ip}:9030") .withProperty("load-url", "{fe_ip}:8030") .withProperty("database-name", "examples") .withProperty("table-name", "tb_duplicate_key") .withProperty("username", "system_query_user") .withProperty("password", "******") .build(); // 创建并注册 Sink source.addSink(StarRocksSink.sink(schema, options, new RecordTransformer())); env.execute("load_data_example");
关于如何提交 Flink 任务可以参考 Flink 使用文档。
参数 | 必须 | 参数值 | 说明 |
---|---|---|---|
connector | 是 | 固定为 starrocks。 | |
jdbc-url | 是 | 配置 FE 节点 MySQL 服务器地址,格式为 jdbc:mysql://<fe_host>:<fe_query_port> ,多个以英文逗号 , 分隔。 | |
load-url | 是 | 配置 FE 节点 HTTP 服务器,格式为 <fe_host>:<fe_http_port> ,多个以英文逗号 , 分隔。 | |
database-name | 是 | 目标导入的 StarRocks 数据库名。 | |
table-name | 是 | 目标导入的 StarRocks 数据表名。 | |
username | 是 | StarRocks 访问用户名称。 | |
password | 是 | StarRocks 访问用户密码。 | |
sink.semantic | 否 |
| Sink 一致性语义,支持 at-least-once 和 exactly-once。 |
sink.version | 否 |
| Flink Connector 底层基于 Stream Load 实现,该参数用户配置使用的 Stream Load 接口版本,支持:
|
sink.label-prefix | 否 | 用于配置 Stream Load 导入任务 label 的前缀,推荐为作业依据具体的业务场景配置 label 前缀。 | |
sink.buffer-flush.max-bytes | 否 |
| 用于配置缓存在内存中的数据量,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。 该参数仅在 |
sink.buffer-flush.max-rows | 否 |
| 用于配置缓存在内存中的数据条数,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。 该参数仅在 |
sink.buffer-flush.interval-ms | 否 |
| 用于配置数据发送的时间间隔。 该参数仅在 |
sink.max-retries | 否 |
| 用于配置最大失败重试次数。 该参数仅在 |
sink.connect.timeout-ms | 否 |
| 用于配置建立连接的超时时间。 该参数默认值自 v1.2.9 版本开始调整为 30000 毫秒,此前为 1000 毫秒。 |
sink.socket.timeout-ms | 否 |
| 用于配置 Socket 连接超时时间,建议参数值要大于 Stream Load |
sink.wait-for-continue.timeout-ms | 否 |
| 用于配置等待 FE HTTP 100-continue 应答的超时时间。 |
sink.ignore.update-before | 否 |
| 用于配置将数据导入到主键表时是否忽略来自 Flink 的 UPDATE_BEFORE 记录,如果设置为 false,则在主键表中视为 DELETE 操作。 |
sink.parallelism | 否 | 用于配置写入并行度,默认由 Flink Planner 决定。
|
您可以访问 StarRocks 官方文档 了解关于使用 Flink Connector 向 StarRocks 导入数据的更多介绍,以及如何使用 Flink Connector 读取 StarRocks 中的数据。