You need to enable JavaScript to run this app.
导航
ByteHouse CDW
最近更新时间:2025.01.16 15:31:24首次发布时间:2023.12.18 17:54:13

在 Flink 控制台,bytehouse-cdw 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。

背景信息

ByteHouse 是一款云原生数据仓库,云数仓版(CDW)是一个支持实时导入和离线导入的自助数据分析平台,能够对海量数据进行高效分析。
如需了解 ByteHouse 云数仓版更多信息,请参见 ByteHouse 云数仓版简介

使用限制

  • ByteHouse CDW 连接器暂时仅支持在 Flink 1.16-volcano 以上引擎版本中使用。
  • 如果使用火山引擎私有网络,此时需要 ByteHouse CDW 和 Flink 处于相同 VPC,具体设置方法可以参考 ByteHouse CDW 网络设置文档
  • 如果您需要使用更新版本的 ByteHouse Flink Connector ,可以在 ByteHouse CDW Driver 进行下载。

DDL 定义

CREATE TABLE `bh_de_sink` (  
  `id` STRING NOT NULL,
  `event_time` STRING,
  `content` ARRAY<DECIMAL(20, 0)>
)
PARTITIONED BY (`id`)
WITH (
  'connector' = 'bytehouse-cdw',
  'jdbc.enable-gateway-connection' = 'true',
  -- 指定 ByteHouse Gateway 的地域。
  -- 示例VOLCANO_PRIVATE为火山引擎私有网络,此时需要ByteHouse CDW和Flink处于相同VPC。
  'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
  'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com',
  'bytehouse.gateway.port' = '19000',
  'bytehouse.gateway.api-token' = '<< your API token >>',
  'bytehouse.gateway.virtual-warehouse' = 'default_vw',
  'database' = 'demo_database',
  'table-name' = 'demo_table'
);

WITH 参数

参数

必选

默认值

数据类型

描述

connector

(none)

String

指定要使用的驱动,这里应该是bytehouse-cdw

database

(none)

String

需要连接的 ByteHouse 云数仓版数据库的名称。

table-name

(none)

String

需要连接的 ByteHouse 云数仓版表的名称。

username

(none)

String

用户名。

password

(none)

String

密码。

jdbc.query.timeout

10 minutes

Duration

通过 JDBC 执行查询的超时设置。

jdbc.enable-gateway-connection

false

Boolean

指定 JDBC 连接是否需要经过 ByteHouse网关。

bytehouse.gateway.region

(none)

String

ByteHouse 网关区域。支持的值包括:

  • VOLCANO_PRIVATE: 适用于与私有网关主机绑定的火山引擎云服务。

bytehouse.gateway.host

(none)

String

ByteHouse 网关的私有主机。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.port

19000

Integer

ByteHouse 网关的私有端口。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.virtual-warehouse

(none)

String

通过 ByteHouse Gateway 进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。

bytehouse.gateway.account

(none)

String

ByteHouse 网关账户 ID。

bytehouse.gateway.access-key-id

(none)

String

ByteHouse Gateway 访问密钥 ID。

bytehouse.gateway.secret-key

(none)

String

ByteHouse 网关密钥。

bytehouse.gateway.api-token

(none)

String

ByteHouse 网关API token令牌。

bytehouse.storage.dump-parallelism

1

Integer

存储转储的并行度(即每次插入执行的线程数)。

sink.group-by.key

(none)

String

预接收器分组的密钥。密钥可以由多个字段组成,以逗号分隔。

sink.group-by.expression

(none)

String

预接收器分组的表达式。如果设置了,所有涉及的字段名称也必须在 sink.group-by.key 中列出。

sink.group-by.number-of-groups

(none)

Integer

记录预接收器分组的组数。如果未指定,它将回退到 sink.parallelism(如果提供)。The ByteHouse Gateway secret key.

sink.buffer-flush.interval

1 second

Duration

两次批量刷新之间的最大间隔。最小为 200 毫秒(ms)。

sink.buffer-flush.max-rows

50,000

Integer

刷新前缓冲记录的最大大小。最小为 100。

sink.buffer-flush.max-batches

6

Integer

触发异步刷新过载预防的待处理批次数的阈值。最小为 1。

sink.max-retries

3

Integer

刷新数据失败时的最大尝试次数。将其设置为 -1 表示无限次重试。

sink.parallelism

(none)

Integer

刷新数据的最大并行度。默认情况下,并行度由框架使用与上游链式运算符相同的并行度来确定。

sink.proactive-validate

false

Boolean

指定是否启用主动数据验证(即,在添加到批次之前要验证的每个记录)。默认情况下,使用被动数据验证(即,仅在数据刷新尝试失败时触发数据验证)来减少运行时开销。

sink.mode

insert

String

选择要接收的数据记录。支持的值有:

  • insert:仅接受 INSERT 记录。
  • upsert:(底表为unique table时适用)接受 INSERT、UPDATE_AFTER 和 DELETE 记录
  • upsert-no-delete:(底表为unique table时适用)接受 INSERT 和 UPDATE_AFTER 记录。
  • upsert-all:(底表为unique table时适用)接受所有类型的记录。
  • upsert-all-no-delete:(底表为unique table时适用)接受除 DELETE 之外的所有类型的记录。
  • delete-only:仅接受 DELETE 记录。

sink.checkpoint.wait-for-buffer-drained

false

Boolean

允许检查点等待,直到所有缓冲批次都完全耗尽。如果设置为 false,则所有缓冲批次将在每个检查点期间写入检查点状态。如果已知数据刷新到表的速度很快,建议将其设置为 true。

metrics.update-interval

5 seconds

Duration

刷新指标的固定间隔。 该时间最少为 5 秒。

metrics.log-level

INFO

String

记录指标的日志级别。 这对应于 Log4J 内置的标准日志级别

timestamp-offset

(none)

Duration

时间戳数据的附加时间偏移量。

lookup.async.scale-factor

1

Integer

异步查找的比例因子。如果设置为小于 2,查找将以同步 (SYNC) 模式运行。如果设置为 n(n >= 2),查找将以异步 (ASYNC) 模式运行,每个查找实例的最大并发数等于 n。

lookup.cache.max-rows

0

Long

数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。

lookup.cache.ttl

(none)

Duration

ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。

lookup.max-retries

3

Integer

查找操作失败时允许的最大重试次数。

数据类型

如下表格是 Flink SQL 数据类型ByteHouse 数据类型的映射关系。如果需要了解。

Flink SQL 字段类型

ByteHouse 字段类型

TINYINT

Int8

SMALLINT

Int16

INT

Int32

BIGINT

Int64

DECIMAL(20, 0)

UInt64

FLOAT

Float32

DOUBLE

Float64

DECIMAL(p, s)

Decimal(P, S)

BOOLEAN

Int8

DATE

Date

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

DateTime

STRING

String

BYTES

ARRAY

ARRAY

示例代码

SQL 代码示例

CREATE TABLE `bh_de_source` (  -- source table
  `id` BIGINT NOT NULL,
  `time` TIMESTAMP(0),
  `content` ARRAY<DECIMAL(20, 0)>
)
PARTITIONED BY (`id`)
WITH (
  'connector' = 'datagen'
);

CREATE TABLE `bh_de_sink` (  -- sink table
  `id` STRING NOT NULL,
  `event_time` STRING,
  `content` ARRAY<DECIMAL(20, 0)>
)
PARTITIONED BY (`id`)
WITH (
  'connector' = 'bytehouse-cdw',
  'jdbc.enable-gateway-connection' = 'true',
  'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
  'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com',
  'bytehouse.gateway.port' = '19000',
  'bytehouse.gateway.api-token' = '<< your API token >>',
  'bytehouse.gateway.virtual-warehouse' = 'default_vw',
  'database' = 'demo_database',
  'table-name' = 'demo_table'
);

INSERT INTO `bh_de_sink` (  -- data loading
  `id`,
  `event_time`,
  `content`
)
SELECT
  IFNULL(CAST(`id` AS STRING), '43'),  -- type casting with default value
  CAST(`time` AS STRING),  -- type casting
  IFNULL(`content`, 'hello')  -- default value
FROM `bh_de_source`

Java DataStream 代码示例

Flink 连接器的 DataStream API 源数据类型为 RowData。通过 DataStream API 的使用主要流程是通过 CnchSinkFunctionBuilder 获取 CnchSinkFunction 实例,下面是一个演示基本用法的示例。详细参数配置可以参考 ByteHouse 官方文档

StreamExecutionEnvironment env = ...;

// Adding a source to the data stream
DataStream<RowData> dataStream =
    env.addSource(...).returns(TypeInformation.of(RowData.class));

// List of columns representing the table schema
List<Column> columns =
    Arrays.asList(
        Column.physical("year", DataTypes.INT()),
        Column.physical("npc", DataTypes.STRING()),
        Column.physical("offence", DataTypes.STRING()),
        Column.physical("case_no", DataTypes.INT()));

try (@SuppressWarnings("unchecked")
    CnchSinkFunction<RowData, ?> cnchSink =
        new CnchSinkFunctionBuilder.Insert(database, tableName)
            .withSchema(columns)
            .withGatewayConnection("VOLCANO_PRIVATE", "your_host", "your_port")
            .withGatewayApiToken("<< your API token >>")
            .withGatewayVirtualWarehouse("default_vw")
            .withFlushInterval(Duration.ofSeconds(1))
            .build()) {

    // Add the sink to the data stream
    dataStream.addSink(cnchSink);

    // Trigger the execution
    env.execute();
}

常见问题

导入 ByteHouse 后出现未来时间

问题描述

  1. 上游 Kafka 数据源中,有字段为字符串格式时间戳例如 {"t": "2024-10-17 10:00:00"}
  2. Flink 的 DDL 语句中定义 t字段为 TIMESTAMP 类型。
  3. ByteHouse 中下游数据类型为 DateTime 或 DateTime64 两种类型。
  4. 写入 ByteHouse 中时间戳查询结果为 2024-10-17 18:00:00

问题原因:源端(例如 Kafka 数据源)时间戳数据以不带时区的格式输出时(如 “2024-10-17 10:00:00” ),默认按 UTC 解析为 Epoch 时间戳;如果源端业务的时区并非 UTC,那么按上述方式输出时间戳值则会间接地引入 UTC 与源端业务时区之间的小时数偏差。

解决方案

  1. 推荐方案一:Kafka 数据源中时间戳建议使用 UTC 时间例如 "2024-10-17T10:00:00Z",同时 Flink 建表语句中定义字段类型为 TIMESTAMP_LTZ。
  2. 推荐方案二:在 Flink ByteHouse 建表语句 WITH OPTIONS 中增加 'timestamp-offset' = '-8h',这样子在单个 ByteHouse Sink 中生效。
  3. 推荐方案三:在 Flink 任务中统一设置变量,这个和方案二效果类似,但是对 Flink 任务中所有的 ByteHouse Sink 都生效containerized.taskmanager.env.FLINK_WRITE_TO_BYTEHOUSE_TIMESTAMP_OFFSET: -8h