This article describes how to ingest data into row-store tables.
StarRocks row-store tables support the Flink Sink, and its usage is identical to that for column-store tables. For details on column-store table usage, refer to the official documentation.
Note

StarRocks Sink is recommended for real-time writes.
Example

Create a table in StarRocks:
```sql
CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE test.usertable (
    `YCSB_KEY` String,
    `FIELD0` String,
    `FIELD1` String,
    `FIELD2` String,
    `FIELD3` String,
    `FIELD4` String,
    `FIELD5` String,
    `FIELD6` String,
    `FIELD7` String,
    `FIELD8` String,
    `FIELD9` String
) ENGINE=ROW_STORE
PRIMARY KEY(YCSB_KEY);
```
Perform the following steps in the Flink service:

1. Download the Flink StarRocks Connector JAR file.
Note

This JAR is intended for the Flink 1.16.x version provided in the Volcano Engine EMR environment. Depending on your Flink environment, you can also obtain the corresponding version of the Flink connector JAR file from the Maven Central Repository.
2. Copy the downloaded Flink StarRocks Connector file to the /usr/lib/emr/current/flink/lib directory on the Flink cluster:

   ```shell
   cp flink-* /usr/lib/emr/current/flink/lib
   ```

3. Start the Flink SQL client:

   ```shell
   cd /usr/lib/emr/current/flink/
   # Start the cluster
   ./bin/yarn-session.sh --detached
   # Start the SQL client
   ./bin/sql-client.sh embedded
   ```

4. Start the Flink job to load data into StarRocks:

   ```sql
   SET execution.checkpointing.interval = 10s;

   -- Define the source table; datagen is used here as a stand-in
   CREATE TABLE datagen (
       `YCSB_KEY` String,
       `FIELD0` String,
       `FIELD1` String,
       `FIELD2` String,
       `FIELD3` String,
       `FIELD4` String,
       `FIELD5` String,
       `FIELD6` String,
       `FIELD7` String,
       `FIELD8` String,
       `FIELD9` String
   ) WITH (
       'connector' = 'datagen',
       'rows-per-second' = '100'
   );

   -- Define the sink table
   CREATE TABLE UserTable (
       `YCSB_KEY` String,
       `FIELD0` String,
       `FIELD1` String,
       `FIELD2` String,
       `FIELD3` String,
       `FIELD4` String,
       `FIELD5` String,
       `FIELD6` String,
       `FIELD7` String,
       `FIELD8` String,
       `FIELD9` String,
       PRIMARY KEY (YCSB_KEY) NOT ENFORCED
   ) WITH (
       'connector' = 'starrocks',
       'load-url' = '{fe host}:8030',
       'jdbc-url' = 'jdbc:mysql://{fe host}:9030',
       'username' = '{user name}',
       'password' = '{password}',
       'database-name' = 'test',
       'table-name' = 'usertable',
       'sink.parallelism' = '2'
   );

   INSERT INTO UserTable
   SELECT YCSB_KEY, FIELD0, FIELD1, FIELD2, FIELD3, FIELD4, FIELD5, FIELD6, FIELD7, FIELD8, FIELD9
   FROM datagen;
   ```
Query the data in the test.usertable table on the StarRocks cluster:
```sql
SELECT * FROM test.usertable;
```
Note

When tuning column-store tables, the following flush parameters are often increased to improve load throughput:

- sink.buffer-flush.max-bytes
- sink.buffer-flush.max-rows
- sink.buffer-flush.interval-ms

Row-store tables work the opposite way: they achieve higher write QPS and better real-time latency, but their overall throughput is considerably lower than that of column-store tables. Therefore, when using row-store tables, there is no need to change the default values of these three parameters.
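For reference, these flush parameters belong in the sink table's WITH clause. A minimal sketch with illustrative values, applicable only when tuning a column-store sink (the table name usertable_colstore and all parameter values here are hypothetical):

```sql
-- Illustrative flush tuning for a COLUMN-store sink; for row-store
-- tables, leave these three parameters at their defaults.
CREATE TABLE ColumnStoreSink (
    `YCSB_KEY` String,
    `FIELD0` String,
    PRIMARY KEY (YCSB_KEY) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'load-url' = '{fe host}:8030',
    'jdbc-url' = 'jdbc:mysql://{fe host}:9030',
    'username' = '{user name}',
    'password' = '{password}',
    'database-name' = 'test',
    'table-name' = 'usertable_colstore',
    'sink.buffer-flush.max-bytes' = '157286400',  -- ~150 MB, illustrative
    'sink.buffer-flush.max-rows' = '500000',      -- illustrative
    'sink.buffer-flush.interval-ms' = '10000'     -- 10 s, illustrative
);
```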
Row-store tables support most DML syntax, so you can write to them directly with the Flink MySQL JDBC Sink.
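As a quick sketch of what this enables (assuming the test.usertable table created in the example below; the exact set of supported DML statements depends on your StarRocks version):

```sql
-- Hypothetical point update and delete sent over the MySQL protocol
UPDATE test.usertable SET FIELD0 = 'new-value' WHERE YCSB_KEY = 'key-1';
DELETE FROM test.usertable WHERE YCSB_KEY = 'key-2';
```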
Example

Create a table in StarRocks:
```sql
CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE test.usertable (
    `YCSB_KEY` String,
    `FIELD0` String,
    `FIELD1` String,
    `FIELD2` String,
    `FIELD3` String,
    `FIELD4` String,
    `FIELD5` String,
    `FIELD6` String,
    `FIELD7` String,
    `FIELD8` String,
    `FIELD9` String
) ENGINE=ROW_STORE
PRIMARY KEY(YCSB_KEY);
```
Perform the following steps in the Flink service:

Start the Flink SQL client:
```shell
cd /usr/lib/emr/current/flink/
# Copy the JDBC connector JAR
cp connectors/flink-sql-connector-jdbc-1.16.1.jar lib/
# Start the cluster
./bin/yarn-session.sh --detached
# Start the SQL client
./bin/sql-client.sh embedded
```
Start the Flink job to load data into StarRocks:
```sql
SET execution.checkpointing.interval = 10s;

-- Source table
CREATE TABLE datagen (
    `YCSB_KEY` String,
    `FIELD0` String,
    `FIELD1` String,
    `FIELD2` String,
    `FIELD3` String,
    `FIELD4` String,
    `FIELD5` String,
    `FIELD6` String,
    `FIELD7` String,
    `FIELD8` String,
    `FIELD9` String
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

CREATE TABLE UserTable (
    `YCSB_KEY` String,
    `FIELD0` String,
    `FIELD1` String,
    `FIELD2` String,
    `FIELD3` String,
    `FIELD4` String,
    `FIELD5` String,
    `FIELD6` String,
    `FIELD7` String,
    `FIELD8` String,
    `FIELD9` String
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://{fe host}:9030/test?useServerPrepStmts=true',
    'table-name' = 'usertable',
    'username' = '{user name}',
    'password' = '{password}',
    'sink.parallelism' = '80'
);

INSERT INTO UserTable
SELECT YCSB_KEY, FIELD0, FIELD1, FIELD2, FIELD3, FIELD4, FIELD5, FIELD6, FIELD7, FIELD8, FIELD9
FROM datagen;
```
Note

When defining the JDBC sink, avoid declaring a PRIMARY KEY on the Flink table; otherwise Flink switches to idempotent (upsert) writes, which degrades performance.
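For clarity, a sketch of the pattern to avoid; the only difference from the sink definition above is the PRIMARY KEY clause (the table name UserTableUpsert is hypothetical):

```sql
-- Anti-pattern: a PRIMARY KEY on a Flink JDBC sink table switches Flink
-- to upsert (idempotent) writes, which degrades write throughput.
CREATE TABLE UserTableUpsert (
    `YCSB_KEY` String,
    `FIELD0` String,
    PRIMARY KEY (YCSB_KEY) NOT ENFORCED  -- avoid this on the JDBC sink
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://{fe host}:9030/test?useServerPrepStmts=true',
    'table-name' = 'usertable',
    'username' = '{user name}',
    'password' = '{password}'
);
```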
Besides real-time writes through Flink, you can also write to row-store tables with INSERT statements. The following uses JDBC code as an example.
Example

Write JDBC code in a Java program to load data into a row-store table with INSERT:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

// Set the JDBC URL
String mysqlUrl = "jdbc:mysql://${FE地址}:9030/";
String mysqlUser = "${mysqlUser}";
String mysqlPassword = "${mysqlPassword}";

DriverManager.registerDriver((java.sql.Driver) Class.forName("com.mysql.cj.jdbc.Driver")
        .getDeclaredConstructor().newInstance());
Connection conn = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPassword);

Statement statement = conn.createStatement();
statement.execute("CREATE DATABASE IF NOT EXISTS demo");
statement.execute("DROP TABLE IF EXISTS demo.tbl_point_query");
statement.execute("CREATE TABLE IF NOT EXISTS demo.tbl_point_query( \n" +
        "    `k1` int(11),\n" +
        "    `v1` decimal(27, 9) NULL,\n" +
        "    `v2` varchar(30) NULL,\n" +
        "    `v3` varchar(30) NULL\n" +
        ")\n" +
        "ENGINE=ROW_STORE\n" +
        "PRIMARY KEY (k1);");
statement.execute("INSERT INTO demo.tbl_point_query VALUES(1, 1.1, 'a', 'a1'),\n" +
        "(2, 1.2, 'b', 'a2'),\n" +
        "(3, 1.3, 'c', 'a3'),\n" +
        "(4, 1.4, 'd', 'a4');");

// Point query through a prepared statement
String sql = "SELECT * FROM demo.tbl_point_query WHERE k1 = ? AND v3 = 'a1' AND v2 = ?";
PreparedStatement pstmt = conn.prepareStatement(sql);
pstmt.setInt(1, 1);
pstmt.setString(2, "a");
ResultSet rs = pstmt.executeQuery();
int num = 0;
while (rs.next()) {
    num += 1;
    System.out.println(rs.getInt(1));
    System.out.println(rs.getBigDecimal(2)); // v1 is DECIMAL, not INT
    System.out.println(rs.getString(3));     // v2 is VARCHAR
    System.out.println(rs.getString(4));     // v3 is VARCHAR
}
```
Note

In the example, ${FE地址} (the FE address), ${mysqlUser}, and ${mysqlPassword} must be replaced with values for your environment.
If you are using a Maven project, add the MySQL dependency to the pom.xml file:
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
```
Row-store tables support Stream Load, with the same usage as column-store tables. For detailed column-store usage, refer to the Stream Load chapter of the open-source StarRocks documentation.
Example
Create a data file named example1.csv with the following content:

```
1,Lily,23
2,Rose,23
3,Alice,24
4,Julia,25
```
Create the target table table1 in the database test_db:
```sql
CREATE DATABASE IF NOT EXISTS test_db;
USE test_db;

CREATE TABLE `table1` (
    `id` int(11) NOT NULL COMMENT "User ID",
    `name` varchar(65533) NULL COMMENT "User name",
    `score` int(11) NOT NULL COMMENT "User score"
) ENGINE=ROW_STORE
PRIMARY KEY(`id`);
```
Create the load job:
```shell
curl --location-trusted -u <username>:<password> -H "label:123" \
    -H "Expect:100-continue" \
    -H "column_separator:," \
    -H "columns: id, name, score" \
    -T example1.csv -XPUT \
    http://<fe_host>:8030/api/test_db/table1/_stream_load
```
Note

The column_separator parameter defines the column separator of the file. The example1.csv file contains three columns, which map one-to-one to the id, name, and score columns of table1 and are separated by commas (,).
After the load completes, query the data in table1:
```sql
SELECT * FROM table1;
```
For data volumes from tens of GB to hundreds of GB, Broker Load is recommended. Broker Load is an asynchronous load method: after submitting a load job, you can check its result with the SHOW LOAD statement or a curl command.
Broker Load usage for row-store tables is also the same as for column-store tables. For detailed column-store usage, refer to the "Load data from HDFS or external cloud storage systems" chapter of the StarRocks community documentation.
Example

Create a data file named file1.csv with the following test data, and upload the file to TOS:
```
1,Lily,23
2,Rose,23
3,Alice,24
4,Julia,25
```
Create the target StarRocks database and table on the StarRocks cluster.
Create a Primary Key table named table1 with three columns: id, name, and score.
```sql
CREATE DATABASE IF NOT EXISTS test_db;
USE test_db;

CREATE TABLE `table1` (
    `id` int(11) NOT NULL COMMENT "User ID",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "User name",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT "User score"
) ENGINE=ROW_STORE
PRIMARY KEY(`id`);
```
On the StarRocks cluster, load the TOS file file1.csv into table1:
```sql
TRUNCATE TABLE test_db.table1;

LOAD LABEL test_db.label_3
(
    DATA INFILE("s3a://{bucket_name}/input")
    INTO TABLE table1
    COLUMNS TERMINATED BY ","
    (id, name, score)
)
WITH BROKER
(
    "fs.s3a.access.key" = "xxx",
    "fs.s3a.secret.key" = "xxx",
    "fs.s3a.endpoint" = "xxx",
    "fs.s3a.path.style.access" = "false",
    "fs.s3a.paging.maximum" = "1000",
    "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "fs.s3a.connection.ssl.enabled" = "false"
)
PROPERTIES
(
    "timeout" = "36000"
);
```
Check the load job status:
```shell
curl --location-trusted -u <username>:<password> \
    'http://<fe_host>:8030/api/test_db/_load_info?label=label_3'
```
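Alternatively, as mentioned above, the status of the asynchronous job can be checked from a MySQL client with SHOW LOAD:

```sql
SHOW LOAD FROM test_db WHERE LABEL = "label_3";
```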
Row-store tables do not support multi-row load transactions; data consistency is guaranteed through idempotent writes, so you must explicitly define the primary key when creating a row-store table.
Row-store tables have lower write throughput than column-store tables, so control the load concurrency to prevent large-scale load failures.
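As one way to bound that concurrency, lower the sink parallelism in the Flink sink definition. A minimal sketch (the table name UserTableThrottled and the value 8 are illustrative; size the parallelism to your cluster):

```sql
-- Illustrative: cap concurrent write tasks against a row-store table
CREATE TABLE UserTableThrottled (
    `YCSB_KEY` String,
    `FIELD0` String
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://{fe host}:9030/test?useServerPrepStmts=true',
    'table-name' = 'usertable',
    'username' = '{user name}',
    'password' = '{password}',
    'sink.parallelism' = '8'  -- illustrative; size to your cluster
);
```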