You need to enable JavaScript to run this app.
导航
ByteHouse CDW
最近更新时间:2024.10.23 20:43:55首次发布时间:2023.12.18 17:54:13

在 Flink 控制台,bytehouse-cdw 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。

背景信息

ByteHouse 是一款云原生数据仓库,云数仓版(CDW)是一个支持实时导入和离线导入的自助数据分析平台,能够对海量数据进行高效分析。
如需了解 ByteHouse 云数仓版更多信息,请参见 ByteHouse 云数仓版简介

使用限制

  • ByteHouse CDW 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • 如果使用火山引擎私有网络,此时需要 ByteHouse CDW 和 Flink 处于相同 VPC,具体设置方法可以参考 ByteHouse CDW 网络设置文档

DDL 定义

CREATE TABLE bh_cdw (
    f0 VARCHAR, 
    f1 VARCHAR, 
    f2 VARCHAR) WITH (
    'connector' = 'bytehouse-cdw',
    'database' = 'doc_db',
    'table-name' = 'doc_table_2',
    'username' = 'user-a',
    'password' = 'qa***6',
    -- 指定 ByteHouse Gateway 的地域。
    -- 示例VOLCANO_PRIVATE为火山引擎私有网络,此时需要ByteHouse CDW和Flink处于相同VPC。
    'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
    'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com',
    'bytehouse.gateway.port' = '19000',
    -- 用来对数据进行分组和管理的虚拟仓库。
    'bytehouse.gateway.virtual-warehouse' = 'test',
    'jdbc.enable-gateway-connection' = 'true',
    'bytehouse.gateway.account' = '210***34',
    'bytehouse.gateway.access-key-id' = '<your-access-key>',
    'bytehouse.gateway.secret-key' = '<your-secret-key>',
    'sink.buffer-flush.interval' = '5 second',
    'sink.buffer-flush.max-rows' = '2000'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 bytehouse-cdw 连接器。

database

(none)

String

数据库名称。需要在 ByteHouse CDW 控制台提前创建数据库,请参见创建库表

table-name

(none)

String

表格名称。需要在 ByteHouse CDW 控制台提前创建表,请参见创建库表

username

(none)

String

JDBC 帐户名。设置 username,需要同时设置 password。

password

(none)

String

JDBC 帐户密码。

jdbc.enable-gateway-connection

true

Boolean

JDBC 连接是否通过 ByteHouse Gateway。

  • true:默认值,通过。
  • false:不通过。

bytehouse.gateway.region

(none)

String

指定 ByteHouse Gateway 的地域。

  • VOLCANO_PRIVATE:火山引擎私有网络,此时需要 ByteHouse CDW 和 Flink 处于相同 VPC。以及填写私有域名地址和端口。

bytehouse.gateway.host

(none)

String

ByteHouse 网关的私有主机。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.port

19000

Integer

ByteHouse 网关的私有端口。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.virtual-warehouse

(none)

String

用于指定虚拟仓库。
虚拟仓库是一个逻辑概念。通过创建虚拟仓库,您可以对数据进行分组和管理,以便更好地控制和优化数据的存储和查询操作。

bytehouse.gateway.account

(none)

String

指定连接器的帐户 ID,用于认证和授权。
通过usernamepassword设置帐户名称和密码。

bytehouse.gateway.access-key-id

(none)

String

连接器帐户的 Access Key。
说明
如果设置帐户的 Access Key,则必须同时设置 Secret Key。

bytehouse.gateway.secret-key

(none)

String

连接器帐户的 Secret Key。

bytehouse.gateway.api-token

(none)

String

连接器帐户的 API Token。

sink.buffer-flush.interval

1 second

Duration

刷新时间间隔,最小值为200 ms

sink.buffer-flush.max-rows

100,000

Integer

缓冲记录大小,最小值为2000

sink.buffer-flush.max-batches

32

Integer

数据写入到 Sink 的缓冲区时的最大批次数,最小值为1

sink.max-retries

3

Integer

刷新数据失败时的最大尝试次数。

sink.parallelism

(none)

Integer

刷新数据的并行度。默认情况下,与上游算子并行度保持一致。

metrics.update-interval

5 seconds

Duration

刷新指标的时间间隔,最小设置为 5 seconds。

metrics.log-level

INFO

String

日志级别。
如需了解更多信息,请参见 Log4j 内置日志级别

数据类型映射

如下表格是 Flink SQL 数据类型ByteHouse 数据类型的映射关系。如果需要了解。

Flink SQL 字段类型

ByteHouse 字段类型

TINYINT

Int8

SMALLINT

Int16

INT

Int32

BIGINT

Int64

DECIMAL(20, 0)

UInt64

FLOAT

Float32

DOUBLE

Float64

DECIMAL(p, s)

Decimal(P, S)

BOOLEAN

Int8

DATE

Date

TIME [(p)] [WITHOUT TIMEZONE]

暂不支持,请使用 String 类型替代

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

DateTime

STRING

String

BYTES

ARRAY

ARRAY

示例代码

SQL 代码示例

CREATE TABLE random_source (
    f0 VARCHAR, 
    f1 VARCHAR, 
    f2 VARCHAR) WITH (
    'connector' = 'datagen', 
    'rows-per-second'='1'
);


CREATE TABLE bh_cdw (
    f0 VARCHAR, 
    f1 VARCHAR, 
    f2 VARCHAR) WITH (
    'connector' = 'bytehouse-cdw',
    'database' = 'doc_db',
    'table-name' = 'doc_table_2',
    'username' = 'user-a',
    'password' = 'qa***6',
    -- 指定 ByteHouse Gateway 的地域。
    -- 示例VOLCANO_PRIVATE为火山引擎私有网络,此时需要ByteHouse CDW和Flink处于相同VPC。
    'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
    'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com',
    'bytehouse.gateway.port' = '19000',
    -- 用来对数据进行分组和管理的虚拟仓库。
    'bytehouse.gateway.virtual-warehouse' = 'test',
    'jdbc.enable-gateway-connection' = 'true',
    'bytehouse.gateway.account' = '210***34',
    'bytehouse.gateway.access-key-id' = '<your-access-key>',
    'bytehouse.gateway.secret-key' = '<your-secret-key>',
    'sink.buffer-flush.interval' = '5 second',
    'sink.buffer-flush.max-rows' = '2000'
);


INSERT INTO bh_cdw 
SELECT f0, f1, f2 FROM random_source;

DataStream 代码示例

Flink 连接器的 DataStream API 源数据类型为 RowData。通过 DataStream API 的使用主要流程是通过 CnchSinkFunctionBuilder 获取 CnchSinkFunction 实例,下面是一个演示基本用法的示例。详细参数配置可以参考 ByteHouse 官方文档

StreamExecutionEnvironment env = ...;

// Adding a source to the data stream
DataStream<RowData> dataStream =
    env.addSource(...).returns(TypeInformation.of(RowData.class));

// List of columns representing the table schema
List<Column> columns =
    Arrays.asList(
        Column.physical("year", DataTypes.INT()),
        Column.physical("npc", DataTypes.STRING()),
        Column.physical("offence", DataTypes.STRING()),
        Column.physical("case_no", DataTypes.INT()));

try (@SuppressWarnings("unchecked")
    CnchSinkFunction<RowData, ?> cnchSink =
        new CnchSinkFunctionBuilder.Insert(database, tableName)
            .withSchema(columns)
            .withGatewayConnection("VOLCANO_PRIVATE", "your_host", "your_port")
            .withGatewayApiToken("<< your API token >>")
            .withGatewayVirtualWarehouse("default_vw")
            .withFlushInterval(Duration.ofSeconds(1))
            .build()) {

    // Add the sink to the data stream
    dataStream.addSink(cnchSink);

    // Trigger the execution
    env.execute();
}

常见问题

导入 ByteHouse 后出现未来时间

问题描述

  1. 上游 Kafka 数据源中,有字段为字符串格式时间戳例如 {"t": "2024-10-17 10:00:00"}
  2. Flink 的 DDL 语句中定义 t字段为 TIMESTAMP 类型。
  3. ByteHouse 中下游数据类型为 DateTime 或 DateTime64 两种类型。
  4. 写入 ByteHouse 中时间戳查询结果为 2024-10-17 18:00:00

问题原因:源端(例如 Kafka 数据源)时间戳数据以不带时区的格式输出时(如 “2024-10-17 10:00:00” ),默认按 UTC 解析为 Epoch 时间戳;如果源端业务的时区并非 UTC,那么按上述方式输出时间戳值则会间接地引入 UTC 与源端业务时区之间的小时数偏差。

解决方案

  1. 推荐方案一:Kafka 数据源中时间戳建议使用 UTC 时间例如 "2024-10-17T10:00:00Z",同时 Flink 建表语句中定义字段类型为 TIMESTAMP_LTZ。
  2. 推荐方案二:在 Flink ByteHouse 建表语句 WITH OPTIONS 中增加 'timestmap-offset' = '-8h',这样子在单个 ByteHouse Sink 中生效。
  3. 推荐方案三:在 Flink 任务中统一设置变量,这个和方案二效果类似,但是对 Flink 任务中所有的 ByteHouse Sink 都生效containerized.taskmanager.env.FLINK_WRITE_TO_BYTEHOUSE_TIMESTAMP_OFFSET: -8h