You need to enable JavaScript to run this app.
导航
Flink Connector Driver
最近更新时间:2024.09.03 16:32:00首次发布时间:2024.06.14 16:36:54

Flink Connector Driver for ByteHouse 云数仓版连接器专门用于通过 Flink 将数据加载到 ByteHouse云数仓版。
本文将介绍通过 Table API&SQL 和 Flink 的 DataStreamAPI 两种方式连接ByteHouse并处理数据。

准备工作

根据您安装的 Flink 版本,下载匹配的 Flink SQL 或 Flink DataStream API 驱动。

Flink 版本

驱动程序

发布日期

1.18

flink-sql-connector-bytehouse-cdw-1.27.96-1.18.jar
未知大小

2024-07-22

1.17

flink-sql-connector-bytehouse-cdw_2.12-1.27.100_snapshot7-1.17.jar
未知大小

2024-09-02

1.16

flink-sql-connector-bytehouse-cdw_2.12-1.27.100_snapshot7-1.16.jar
未知大小

2024-09-02

1.15

flink-sql-connector-bytehouse-cdw-1.27.93-1.15.jar
未知大小

2024-07-10

本地安装

请运行以下命令,将下载的flink-sql-connector-bytehouse-cdw-${flink-sql-connector-bytehouse-cdw.version}.jar文件安装到本地 Maven 存储库:

mvn install:install-file \
    -Dfile=${your_path}/flink-sql-connector-bytehouse-cdw-${flink-sql-connector-bytehouse-cdw.version}.jar \
    -DgroupId=com.bytedance.bytehouse \
    -DartifactId=flink-sql-connector-bytehouse-cdw \
    -Dversion=${flink-sql-connector-bytehouse-cdw.version} \
    -Dpackaging=jar

然后,您可以使用 IDE(例如 IntelliJ)将其添加为项目中的依赖项。

Maven 依赖

对于要使用 Flink connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。scala.version 需要是 2.11 或 2.12,与 Flink 发行版的关联 Scala 版本相对应。${flink-connector-bytehouse-cdw.version} 可以设置为所需的 Flink connector 连接器版本。

<dependency>
    <groupId>com.bytedance.bytehouse</groupId>
    <artifactId>flink-connector-bytehouse-cdw_${scala.version}</artifactId>
    <version>${flink-connector-bytehouse-cdw.version}</version>
</dependency>

然后,将以下存储库添加到 pom.xml文件:

<repository>
    <id>bytedance</id>
    <name>ByteDance Public Repository</name>
    <url>https://artifact.bytedance.com/repository/releases</url>
</repository>

使用示例

下面是通过 FlinkSQL 将数据表单加载到 ByteHouse 云数仓版数据表中的示例。

说明

CREATE TABLE `bh_de_source` (  -- source table
  `id` BIGINT NOT NULL,
  `time` TIMESTAMP(0),
  `content` ARRAY<DECIMAL(20, 0)>
)
PARTITIONED BY (`id`)
WITH (
  'connector' = 'kinesis',
  'stream' = 'demo_stream',
  'format' = 'json',
  'aws.region' = 'cn-north-1',
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = '???',
  'aws.credentials.basic.secretkey' = '???',
  'scan.shard.getrecords.maxretries' = '7'
);

CREATE TABLE `bh_de_sink` (  -- sink table
  `id` STRING NOT NULL,
  `event_time` STRING,
  `content` ARRAY<DECIMAL(20, 0)>
)
PARTITIONED BY (`id`)
WITH (
  'connector' = 'bytehouse-cdw',
  'jdbc.enable-gateway-connection' = 'true',
  'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
  'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com',
  'bytehouse.gateway.port' = '19000',
  'bytehouse.gateway.api-token' = '<< your API token >>',
  'bytehouse.gateway.virtual-warehouse' = 'default_vw',
  'database' = 'demo_database',
  'table-name' = 'demo_table'
);

INSERT INTO `bh_de_sink` (  -- data loading
  `id`,
  `event_time`,
  `content`
)
SELECT
  IFNULL(CAST(`id` AS STRING), '43'),  -- type casting with default value
  CAST(`time` AS STRING),  -- type casting
  IFNULL(`content`, 'hello')  -- default value
FROM `bh_de_source`

Flink 连接器的 DataStream API 将源数据类型限制为 RowData
DataStream API 的使用主要是通过 CnchSinkFunctionBuilder 获取 CnchSinkFunction 实例,这有助于连接器的各种配置。下面是一个演示基本用法的示例。

StreamExecutionEnvironment env = ...;

// Adding a source to the data stream
DataStream<RowData> dataStream =
    env.addSource(...).returns(TypeInformation.of(RowData.class));

// List of columns representing the table schema
List<Column> columns =
    Arrays.asList(
        Column.physical("year", DataTypes.INT()),
        Column.physical("npc", DataTypes.STRING()),
        Column.physical("offence", DataTypes.STRING()),
        Column.physical("case_no", DataTypes.INT()));

try (@SuppressWarnings("unchecked")
    CnchSinkFunction<RowData, ?> cnchSink =
        new CnchSinkFunctionBuilder.Insert(database, tableName)
            .withSchema(columns)
            .withGatewayConnection("VOLCANO_PRIVATE", "your_host", "your_port")
            .withGatewayApiToken("<< your API token >>")
            .withGatewayVirtualWarehouse("default_vw")
            .withFlushInterval(Duration.ofSeconds(1))
            .build()) {

    // Add the sink to the data stream
    dataStream.addSink(cnchSink);

    // Trigger the execution
    env.execute();
}

参数说明

参数

必选

默认值

数据类型

描述

connector

(none)

String

指定要使用的驱动,这里应该是bytehouse-cdw

database

(none)

String

需要连接的 ByteHouse 云数仓版数据库的名称。

table-name

(none)

String

需要连接的 ByteHouse 云数仓版表的名称。

username

(none)

String

用户名。

password

(none)

String

密码。

jdbc.query.timeout

10 minutes

Duration

通过 JDBC 执行查询的超时设置。

jdbc.enable-gateway-connection

false

Boolean

指定 JDBC 连接是否需要经过 ByteHouse网关。

bytehouse.gateway.region

(none)

String

ByteHouse 网关区域。支持的值包括:

  • VOLCANO_PRIVATE: 适用于与私有网关主机绑定的火山引擎云服务。

bytehouse.gateway.host

(none)

String

ByteHouse 网关的私有主机。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.port

19000

Integer

ByteHouse 网关的私有端口。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.virtual-warehouse

(none)

String

通过 ByteHouse Gateway 进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。

bytehouse.gateway.account

(none)

String

ByteHouse 网关账户 ID。

bytehouse.gateway.access-key-id

(none)

String

ByteHouse Gateway 访问密钥 ID。

bytehouse.gateway.secret-key

(none)

String

ByteHouse 网关密钥。

bytehouse.gateway.api-token

(none)

String

ByteHouse 网关API token令牌。

bytehouse.storage.dump-parallelism

1

Integer

存储转储的并行度(即每次插入执行的线程数)。

sink.group-by.key

(none)

String

预接收器分组的密钥。密钥可以由多个字段组成,以逗号分隔。

sink.group-by.expression

(none)

String

预接收器分组的表达式。如果设置了,所有涉及的字段名称也必须在 sink.group-by.key 中列出。

sink.group-by.number-of-groups

(none)

Integer

记录预接收器分组的组数。如果未指定,它将回退到 sink.parallelism(如果提供)。The ByteHouse Gateway secret key.

sink.buffer-flush.interval

1 second

Duration

两次批量刷新之间的最大间隔。最小为 200 毫秒(ms)。

sink.buffer-flush.max-rows

50,000

Integer

刷新前缓冲记录的最大大小。最小为 100。

sink.buffer-flush.max-batches

6

Integer

触发异步刷新过载预防的待处理批次数的阈值。最小为 1。

sink.max-retries

3

Integer

刷新数据失败时的最大尝试次数。将其设置为 -1 表示无限次重试。

sink.parallelism

(none)

Integer

刷新数据的最大并行度。默认情况下,并行度由框架使用与上游链式运算符相同的并行度来确定。

sink.proactive-validate

false

Boolean

指定是否启用主动数据验证(即,在添加到批次之前要验证的每个记录)。默认情况下,使用被动数据验证(即,仅在数据刷新尝试失败时触发数据验证)来减少运行时开销。

sink.mode

insert

String

选择要接收的数据记录。支持的值有:

  • insert:仅接受 INSERT 记录。
  • upsert:(底表为unique table时适用)接受 INSERT、UPDATE_AFTER 和 DELETE 记录
  • upsert-no-delete:(底表为unique table时适用)接受 INSERT 和 UPDATE_AFTER 记录。
  • upsert-all:(底表为unique table时适用)接受所有类型的记录。
  • upsert-all-no-delete:(底表为unique table时适用)接受除 DELETE 之外的所有类型的记录。
  • delete-only:仅接受 DELETE 记录。

sink.checkpoint.wait-for-buffer-drained

false

Boolean

允许检查点等待,直到所有缓冲批次都完全耗尽。如果设置为 false,则所有缓冲批次将在每个检查点期间写入检查点状态。如果已知数据刷新到表的速度很快,建议将其设置为 true。

metrics.update-interval

5 seconds

Duration

刷新指标的固定间隔。 该时间最少为 5 秒。

metrics.log-level

INFO

String

记录指标的日志级别。 这对应于 Log4J 内置的标准日志级别

timestamp-offset

(none)

Duration

时间戳数据的附加时间偏移量。

lookup.async.scale-factor

1

Integer

异步查找的比例因子。如果设置为小于 2,查找将以同步 (SYNC) 模式运行。如果设置为 n(n >= 2),查找将以异步 (ASYNC) 模式运行,每个查找实例的最大并发数等于 n。

lookup.cache.max-rows

0

Long

数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。

lookup.cache.ttl

(none)

Duration

ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。

lookup.max-retries

3

Integer

查找操作失败时允许的最大重试次数。