You need to enable JavaScript to run this app.
导航
ByteHouse CDW
最近更新时间:2025.02.10 10:49:01首次发布时间:2023.12.18 17:54:13

在 Flink 控制台,bytehouse-cdw 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。

背景信息

ByteHouse 是一款云原生数据仓库,云数仓版(CDW)是一个支持实时导入和离线导入的自助数据分析平台,能够对海量数据进行高效分析。
如需了解 ByteHouse 云数仓版更多信息,请参见 ByteHouse 云数仓版简介

使用限制

  • ByteHouse CDW 连接器暂时仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。
  • 如果使用火山引擎私有网络,建议需要 ByteHouse CDW 和 Flink 处于相同 VPC,具体设置方法可以参考 ByteHouse CDW 网络设置文档
  • 如果您需要使用更新版本的 ByteHouse Flink Connector ,可以在 ByteHouse CDW Driver 进行下载。

DDL 定义

CREATE TABLE `bh_de_sink` ( -- sink table
 `id` STRING NOT NULL,
 `event_time` STRING,
 `content` ARRAY<DECIMAL(20, 0)>,
 PRIMARY KEY (`id`) NOT ENFORCED -- 主键字段对应 BH 唯一键表的 Unique Key,在 BH 入库的时候会进行去重
) WITH (
 'connector' = 'bytehouse-cdw',
 'jdbc.enable-gateway-connection' = 'true',
 'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
 'bytehouse.gateway.host' = 'tenant-xxxx-cn-beijing.bytehouse.ivolces.com',
 'bytehouse.gateway.port' = '19000',
 'bytehouse.gateway.api-token' = '<API_KEY>',
 'bytehouse.gateway.virtual-warehouse' = '<VIRTUAL_WAREHOUSE>',
 'database' = '<BYTEHOUSE_DB>',
 'table-name' = '<BYTEHOUSE_TABLE>'
);

WITH 参数

参数

必选

默认值

数据类型

描述

connector

(none)

String

指定要使用的驱动,这里应该是bytehouse-cdw

database

(none)

String

需要连接的 ByteHouse 云数仓版数据库的名称。

table-name

(none)

String

需要连接的 ByteHouse 云数仓版表的名称。

jdbc.query.timeout

10 minutes

Duration

通过 JDBC 执行查询的超时设置。

jdbc.enable-gateway-connection

false

Boolean

指定 JDBC 连接是否需要经过 ByteHouse网关。

bytehouse.gateway.region

(none)

String

ByteHouse 网关区域。支持的值包括:

  • VOLCANO_PRIVATE: 适用于与私有网关主机绑定的火山引擎云服务。

bytehouse.gateway.host

(none)

String

ByteHouse 网关的私有主机。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.port

19000

Integer

ByteHouse 网关的私有端口。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.virtual-warehouse

(none)

String

通过 ByteHouse Gateway 进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。

bytehouse.gateway.account

(none)

String

ByteHouse 网关账户 ID。

bytehouse.gateway.access-key-id

(none)

String

ByteHouse Gateway 访问密钥 ID。

bytehouse.gateway.secret-key

(none)

String

ByteHouse 网关密钥。

bytehouse.gateway.api-token

(none)

String

ByteHouse 网关API token令牌。

bytehouse.storage.dump-parallelism

1

Integer

存储转储的并行度(即每次插入执行的线程数)。

sink.group-by.key

(none)

String

预接收器分组的密钥。密钥可以由多个字段组成,以逗号分隔。

sink.group-by.expression

(none)

String

预接收器分组的表达式。如果设置了,所有涉及的字段名称也必须在 sink.group-by.key 中列出。

sink.group-by.number-of-groups

(none)

Integer

记录预接收器分组的组数。如果未指定,它将回退到 sink.parallelism(如果提供)。The ByteHouse Gateway secret key.

sink.buffer-flush.interval

1 second

Duration

两次批量刷新之间的最大间隔。最小为 200 毫秒(ms)。

sink.buffer-flush.max-rows

50,000

Integer

刷新前缓冲记录的最大大小。最小为 100。

sink.buffer-flush.max-batches

6

Integer

触发异步刷新过载预防的待处理批次数的阈值。最小为 1。

sink.max-retries

3

Integer

刷新数据失败时的最大尝试次数。将其设置为 -1 表示无限次重试。

sink.parallelism

(none)

Integer

刷新数据的最大并行度。默认情况下,并行度由框架使用与上游链式运算符相同的并行度来确定。

sink.proactive-validate

false

Boolean

指定是否启用主动数据验证(即,在添加到批次之前要验证的每个记录)。默认情况下,使用被动数据验证(即,仅在数据刷新尝试失败时触发数据验证)来减少运行时开销。

sink.mode

insert

String

选择要接收的数据记录。支持的值有:

  • insert:仅接受 INSERT 记录。
  • upsert:(底表为unique table时适用)接受 INSERT、UPDATE_AFTER 和 DELETE 记录
  • upsert-no-delete:(底表为unique table时适用)接受 INSERT 和 UPDATE_AFTER 记录。
  • upsert-all:(底表为unique table时适用)接受所有类型的记录。
  • upsert-all-no-delete:(底表为unique table时适用)接受除 DELETE 之外的所有类型的记录。
  • delete-only:仅接受 DELETE 记录。

sink.checkpoint.wait-for-buffer-drained

false

Boolean

允许检查点等待,直到所有缓冲批次都完全耗尽。如果设置为 false,则所有缓冲批次将在每个检查点期间写入检查点状态。如果已知数据刷新到表的速度很快,建议将其设置为 true。

metrics.update-interval

5 seconds

Duration

刷新指标的固定间隔。 该时间最少为 5 秒。

metrics.log-level

INFO

String

记录指标的日志级别。 这对应于 Log4J 内置的标准日志级别

timestamp-offset

(none)

Duration

时间戳数据的附加时间偏移量。

lookup.async.scale-factor

1

Integer

异步查找的比例因子。如果设置为小于 2,查找将以同步 (SYNC) 模式运行。如果设置为 n(n >= 2),查找将以异步 (ASYNC) 模式运行,每个查找实例的最大并发数等于 n。

lookup.cache.max-rows

0

Long

数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。

lookup.cache.ttl

(none)

Duration

ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。

lookup.max-retries

3

Integer

查找操作失败时允许的最大重试次数。

数据类型

如下表格是 Flink SQL 数据类型ByteHouse 数据类型的映射关系。如果需要了解。

Flink SQL 字段类型

ByteHouse 字段类型

TINYINT

Int8

SMALLINT

Int16

INT

Int32

BIGINT

Int64

DECIMAL(20, 0)

UInt64

FLOAT

Float32

DOUBLE

Float64

DECIMAL(p, s)

Decimal(P, S)

BOOLEAN

Int8

DATE

Date

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

DateTime

STRING

String

BYTES

ARRAY

ARRAY

示例代码

SQL 代码示例

CREATE TABLE `bh_de_source` ( -- source table
 `id` BIGINT NOT NULL,
 `time` TIMESTAMP(0),
 `content` ARRAY<DECIMAL(20, 0)>
) WITH (
 'connector' = 'datagen'
);

CREATE TABLE `bh_de_sink` ( -- sink table
 `id` STRING NOT NULL,
 `event_time` STRING,
 `content` ARRAY<DECIMAL(20, 0)>,
 PRIMARY KEY (`id`) NOT ENFORCED -- 主键字段对应 BH 唯一键表的 Unique Key
) WITH (
 'connector' = 'bytehouse-cdw',
 'jdbc.enable-gateway-connection' = 'true',
 'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
 'bytehouse.gateway.host' = 'tenant-xxxx-cn-beijing.bytehouse.ivolces.com',
 'bytehouse.gateway.port' = '19000',
 'bytehouse.gateway.api-token' = '<API_KEY>',
 'bytehouse.gateway.virtual-warehouse' = '<VIRTUAL_WAREHOUSE>',
 'database' = '<BYTEHOUSE_DB>',
 'table-name' = '<BYTEHOUSE_TABLE>'
);

INSERT INTO `bh_de_sink` ( -- data loading
 `id`,
 `event_time`,
 `content`
)
SELECT
 CAST(`id` AS STRING), 
 CAST(`time` AS STRING), 
 `content` 
FROM `bh_de_source`

Java DataStream 代码示例

Flink 连接器的 DataStream API 源数据类型为 RowData。通过 DataStream API 的使用主要流程是通过 CnchSinkFunctionBuilder 获取 CnchSinkFunction 实例,下面是一个演示基本用法的示例。详细参数配置可以参考 ByteHouse 官方文档

StreamExecutionEnvironment env = ...;

// Adding a source to the data stream
DataStream<RowData> dataStream =
    env.addSource(...).returns(TypeInformation.of(RowData.class));

// List of columns representing the table schema
List<Column> columns =
    Arrays.asList(
        Column.physical("year", DataTypes.INT()),
        Column.physical("npc", DataTypes.STRING()),
        Column.physical("offence", DataTypes.STRING()),
        Column.physical("case_no", DataTypes.INT()));

try (@SuppressWarnings("unchecked")
    CnchSinkFunction<RowData, ?> cnchSink =
        new CnchSinkFunctionBuilder.Insert(database, tableName)
            .withSchema(columns)
            .withGatewayConnection("VOLCANO_PRIVATE", "your_host", "your_port")
            .withGatewayApiToken("<< your API token >>")
            .withGatewayVirtualWarehouse("default_vw")
            .withFlushInterval(Duration.ofSeconds(1))
            .build()) {

    // Add the sink to the data stream
    dataStream.addSink(cnchSink);

    // Trigger the execution
    env.execute();
}

常见问题

写入 ByteHouse 任务因网络不通无法验证 SQL 及启动任务

问题描述:在使用 SQL 验证功能的时候会出现长时间卡住,最终出现 timeout 异常。导致无法通过 SQL 验证。如果对于线上任务,可能会出现长时间无法启动,最终失败的问题。

问题原因:在 SQL 验证和语法解析的过程中,会去访问 ByteHouse 服务端进行元数据获取。此时如果网络无法访问,则会出现长时间卡住的现象。

解决方案
Image

  1. 在 ByteHouse·CDW 产品界面 租户管理 - 基本信息 - 网络信息,检查 Flink 任务运行的资源池的 VPC 和子网是否与 ByteHouse VPC 和子网相同:
    1. 如果处于不同 VPC 下,默认网络是不联通的,建议参考 创建资源池,创建一个与 ByteHouse VPC 相同的 Flink 资源池(建议)。或者采用 设置网络信息 方案设置公网访问域名,Flink 通过公网进行访问(不建议)。
    2. 如果处于相同 VPC 下
      1. 请检查安全组是否相同,如果不同请确认是否安全组的安全规则限制了 ByteHouse 的访问,如果有相关端口或者 IP 限制,请酌情放开限制。
      2. 请检查 Flink 与 ByteHouse 的子网是否相同,如果不同,请在私有网络 - 网络ACL 确认子网之间是否网络访问限制。如果有,请禁用相关限制。

导入 ByteHouse 后出现未来时间

问题描述

  1. 上游 Kafka 数据源中,有字段为字符串格式时间戳例如 {"t": "2024-10-17 10:00:00"}
  2. Flink 的 DDL 语句中定义 t字段为 TIMESTAMP 类型。
  3. ByteHouse 中下游数据类型为 DateTime 或 DateTime64 两种类型。
  4. 写入 ByteHouse 中时间戳查询结果为 2024-10-17 18:00:00

问题原因:源端(例如 Kafka 数据源)时间戳数据以不带时区的格式输出时(如 “2024-10-17 10:00:00” ),默认按 UTC 解析为 Epoch 时间戳;如果源端业务的时区并非 UTC,那么按上述方式输出时间戳值则会间接地引入 UTC 与源端业务时区之间的小时数偏差。

解决方案

  1. 推荐方案一:Kafka 数据源中时间戳建议使用 UTC 时间例如 "2024-10-17T10:00:00Z",同时 Flink 建表语句中定义字段类型为 TIMESTAMP_LTZ。
  2. 推荐方案二:在 Flink ByteHouse 建表语句 WITH OPTIONS 中增加 'timestamp-offset' = '-8h',这样子在单个 ByteHouse Sink 中生效。
  3. 推荐方案三:在 Flink 任务中统一设置变量,这个和方案二效果类似,但是对 Flink 任务中所有的 ByteHouse Sink 都生效containerized.taskmanager.env.FLINK_WRITE_TO_BYTEHOUSE_TIMESTAMP_OFFSET: -8h