You need to enable JavaScript to run this app.
导航
StarRocks
最近更新时间:2024.10.23 19:54:09首次发布时间:2023.09.12 16:22:50

StarRocks 连接器提供了对 StarRocks 数据仓库的读写能力,支持做数据源表、结果表和维表。

使用限制

  • StarRocks 连接器目前仅支持在 Flink 1.16-volcano 引擎版本中使用 。
  • StarRocks 维表场景建议使用 JDBC 维表替换,效果是相同的,同时 JDBC 使用更广泛,更稳定,参考 JDBC Connector

DDL 定义

CREATE TABLE starrocks_table(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'doc_db',
 'table-name' = 'table1',
 'username' = 'flinkuser',
 'password' = 'flinkpw'
 );

WITH 参数

通用参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 starrocks 连接器。

jdbc-url

(none)

String

FE 节点的 IP 和 query 端口信息,如果有多个,需要用逗号(,)分隔。
格式为jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port..

database-name

(none)

String

StarRocks 数据库名称。

table-name

(none)

String

StarRocks 表名称。

username

(none)

String

StarRocks 用户名称。

password

(none)

String

StarRocks 用户密码。

源表参数

参数

是否必选

默认值

数据类型

描述

scan-url

(none)

String

FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。
格式为fe_ip1:http_port,fe_ip2:http_port..

scan.connect.timeout-ms

1000

String

连接 StarRocks 数据仓库的超时时长,单位毫秒。

scan.params.keep-alive-min

10

String

读取任务的保活时长,单位分钟。

scan.params.query-timeout-s

600

String

读取任务的最大超时时长,单位秒。

scan.params.mem-limit-byte

1073741824

String

BE 节点中单个查询的内存上限,单位为 bytes。默认值 1073741824,相当于 1GB。

scan.max-retries

1

String

读取任务失败后的最大重试次数

结果表参数

参数

是否必选

默认值

数据类型

描述

load-url

(none)

String

FE 节点的 IP 和 http 端口信息,如果有多个,需要用逗号(,)分隔。
格式为fe_ip1:http_port,fe_ip2:http_port..

sink.semantic

at-least-once

String

数据写入语义。

  • at-least-once:默认值,至少写入一次。
  • exactly-once:仅写入一次,不会出现重复写的情况。

说明

配置为exactly-once写入语义时,只在 checkpoint 时写数据。注意此时的 sink.buffer-flush.* 相关参数无效。

sink.version

AUTO

String

数据加载时使用的接口。

  • V1:使用 Stream Load 接口加载数据。
  • V2:使用 Transaction Stream Load 接口加载数据,要求 StarRocks 至少为 2.4 版本。
  • AUTO:判断 StarRocks 是否支持 Transaction Stream Load 接口,然后选择版本。支持则选择 V2,不支持则选择 V1。

sink.buffer-flush.max-bytes

94371840

String

数据写入 StarRocks 前,Buffer 可容纳的最大数据量,范围为[64MB, 10GB]。
默认值 94371840,相当于 90MB。

sink.buffer-flush.max-rows

500000

String

数据写入 StarRocks 前,Buffer 可容纳的最大数据行数。

sink.buffer-flush.interval-ms

300000

String

Buffer 刷新时间间隔,单位为毫秒,取值范围 [64000, 5000000]。

sink.max-retries

3

String

写入任务的最大重试次数,取值范围为 [0, 10]。

sink.connect.timeout-ms

1000

String

连接 StarRocks 数据仓库的超时时长,单位毫秒,取值范围为 100~60000。

sink.parallelism

NULL

String

指定并行度。
如果不指定并行度,则使用全局并行度。

sink.properties.*

(none)

String

结果表属性。
此处列举三个参数。如需了解更多,请参见STREAM LOAD

  • sink.properties.format:写入 StarRocks 的数据格式,支持 CSV 和 JSON,默认值为csv
  • sink.properties.column_separator:用于指定 CSV 格式的列分隔符,默认值为\t
  • sink.properties.row_delimiter:用于指定 CSV 格式的行分隔符,默认值为\n

维表参数

参数

是否必选

默认值

数据类型

描述

lookup.cache.ttl-ms

5000

Long

维表查询的 cache 超时时间。

注意:StarRocks 维表场景建议都使用 JDBC 维表替换,效果是相同的,同时 JDBC 使用更广泛,更稳定,参考 JDBC Connector

数据类型映射

  • 源表

    StarRocks字段类型

    Flink字段类型

    NULL

    NULL

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    BIGINT

    LARGEINT

    STRING

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DATE

    DATE

    DATETIME

    TIMESTAMP

    DECIMAL

    DECIMAL

    DECIMALV2

    DECIMAL

    DECIMAL32

    DECIMAL

    DECIMAL64

    DECIMAL

    DECIMAL128

    DECIMAL

    CHAR

    CHAR

    VARCHAR

    STRING

  • 结果表

    Flink 字段类型

    StarRocks 字段类型

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INTEGER

    INTEGER

    BIGINT

    BIGINT

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL

    DECIMAL

    BINARY

    INT

    CHAR

    STRING

    VARCHAR

    STRING

    STRING

    STRING

    DATE

    DATE

    TIMESTAMP_WITHOUT_TIME_ZONE(N)

    DATETIME

    TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)

    DATETIME

    ARRAY

    ARRAY

    MAP<KT,VT>

    JSON STRING

    ROW

    JSON STRING

示例代码

  • 源表

    CREATE TABLE starrocks_source (
        id INTEGER,
        name STRING,
        score INTEGER,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'starrocks',
      'jdbc-url' = 'jdbc:mysql://172.28.*.*:9030', 
      'scan-url' = '172.28.*.*:8030', 
      'database-name' = 'doc_db',
      'table-name' = 'table1',
      'username' = 'docuser',
      'password' = 'docpw'
    );
    
    CREATE TABLE print_sink (
        id INTEGER,
        name STRING,
        score INTEGER
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO print_sink
    SELECT * FROM starrocks_source;
    
  • 结果表

    CREATE TABLE datagen_source(
      id INTEGER,
      name STRING,
      score INTEGER
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.name.length' = '9',
      'fields.id.min' = '1',
      'fields.id.max' = '1000',
      'fields.score.min' = '1',
      'fields.score.max' = '1000'
    );
    
    CREATE TABLE starrocks_sink(
      id INTEGER,
      name STRING,
      score INTEGER,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'starrocks',
      'jdbc-url' = 'jdbc:mysql://172.28.*.*:9030', 
      'load-url' = '172.28.*.*:8030', 
      'database-name' = 'doc_db',
      'table-name' = 'table1',
      'username' = 'docuser',
      'password' = 'docpw',
      'sink.parallelism' = '1'
    );
    
    
    INSERT INTO starrocks_sink
    SELECT * FROM datagen_source;
    
  • 维表(使用 JDBC 维表的方式读取 SR 表)

    CREATE TABLE datagen_source (
        id1 INTEGER,
        name1 STRING,
        score1 INTEGER,
        proc_time as PROCTIME ()
      )
    WITH
      (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.name1.length' = '9',
        'fields.id1.min' = '1',
        'fields.id1.max' = '1000',
        'fields.score1.min' = '1',
        'fields.score1.max' = '1000'
      );
    
    
    CREATE TABLE starrocks_lookup (
        id INTEGER,
        name STRING,
        score INTEGER,
        PRIMARY KEY (id) NOT ENFORCED
      )
    WITH
      (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db',     
        'table-name' = '<yourtablename>',
        'username' = 'admin',          
        'password' = 'MyPa$$w0rd',      
        'lookup.cache.max-rows' = '100',
        'lookup.max-retries' = '3'
      );
    
    
    CREATE TABLE print_sink (
        id1 INTEGER,
        name1 STRING,
        score1 INTEGER,
        id INTEGER,
        name STRING,
        score INTEGER
      )
    WITH
      ('connector' = 'print');
    
    
    INSERT INTO
      print_sink
    SELECT
      datagen_source.id1,
      datagen_source.name1,
      datagen_source.score1,
      starrocks_lookup.id,
      starrocks_lookup.name,
      starrocks_lookup.score
    FROM
      datagen_source
      LEFT JOIN starrocks_lookup FOR SYSTEM_TIME AS OF proc_time ON datagen_source.id1 = starrocks_lookup.id;