You need to enable JavaScript to run this app.
导航
ByteHouse CE
最近更新时间:2025.01.16 15:31:24首次发布时间:2023.12.18 17:54:22

在 Flink 控制台,bytehouse-ce 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。

背景信息

ByteHouse 是一款云原生数据仓库,是火山引擎基于开源 ClickHouse 进行深度优化和改造的版本,提供海量数据上更强的查询服务和数据写入性能。
ByteHouse 企业版(CE)基于火山内部的丰富场景,以及 ClickHouse 开源版的痛点进行了深度定制,包括多场景表引擎、扩展数据类型、多级存储等功能。如需了解 ByteHouse 企业版更多信息,请参见ByteHouse 企业版简介

使用限制

ByteHouse CE 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。

DDL 定义

CREATE TABLE bh_ce_sink (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 VARCHAR
) WITH (
  'connector' = 'bytehouse-ce',
  'clickhouse.cluster' = 'bytehouse_cluster_***',
  'clickhouse.shard-discovery.service.host' = '7249621***.bytehouse-ce.ivolces.com',
  'username' = 'user_a',
  'password' = 'pa***45',
  'database' = 'default',
  'table-name' = 'doc_test',
  'sink.buffer-flush.interval' = '10 second',
  'sink.buffer-flush.max-rows' = '5000',
  'clickhouse.shard-discovery.address-mapping' = '192.18.*.*|8123:192.168.*.*|8123'
);

数据类型映射

如下表格是 Flink SQL 数据类型ByteHouse 数据类型的映射关系。如果需要了解

Flink SQL 字段类型

ByteHouse 字段类型

TINYINT

Int8

SMALLINT

Int16

INT

Int32

BIGINT

Int64

DECIMAL(20, 0)

UInt64

FLOAT

Float32

DOUBLE

Float64

DECIMAL(p, s)

Decimal(P, S)

BOOLEAN

Int8

DATE

Date

TIME [(p)] [WITHOUT TIMEZONE]

暂不支持,请使用 String 类型替代

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

DateTime

STRING

String

BYTES

ARRAY

ARRAY

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是bytehouse-ceclickhouse连接器 。

database

(none)

String

数据库名称。需要在 ByteHouse CE 控制台提前创建数据库,请参见创建库表

table-name

(none)

String

表格名称。需要在 ByteHouse CE 控制台提前创建表,请参见创建库表

clickhouse.cluster

(none)

String

ByteHouse CE 集群名称。需要在 ByteHouse CE 控制台提前创建集群,请参见创建集群

clickhouse.shard-discovery.kind

SYSTEM_CLUSTERS

String

分片发现类型。

  • SYSTEM_CLUSTERS:通过查询system.clusters 表来获取分片的 IP。
  • CONSUL:通过 Consul 服务来获取分片的 IP。
  • API_CLUSTERS:通过 ByteHouse CE 特定 OpenAPI 获取分片的 IP。
  • CE_GATEWAY:通过 ByteHouse CE Gateway 来获取分片的 IP。

clickhouse.shard-discovery.service.host

127.0.0.1

String

分片发现服务的主机地址。

clickhouse.shard-discovery.service.port

8123

Integer

分片发现服务的端口号。

clickhouse.shard-discovery.address-mapping

(none)

Map

分片发现地址映射,将实际的分片地址映射到可供访问的地址。
示例:192.18.*.*|8123:192.168.*.*|8123

bytehouse-ce.api.get-consul-info

(built-in)

String

用于获取 Consul 服务信息的 ByteHouse CE API。

bytehouse-ce.api.get-shard-info

(built-in)

String

用于获取分片信息的 ByteHouse CE API。

bytehouse-ce.auth.api

(built-in)

String

用于进行身份验证的 ByteHouse CE API。

username

(none)

String

JDBC 用户名。
设置 username,需要同时设置 password。

password

(none)

String

JDBC 用户密码。

sharding-strategy

NONE

String

分区策略。

  • NONE:没有分区。
  • RANDOM: 随机分区。
  • ROUND_ROBIN:基于数据到达顺序进行分区。
  • HASH:基于数据内容进行分区。

sharding-key

(none)

String

哈希分区键,用于确定数据分布到不同分片。可以由一个或多个字段组成,多个字段之间使用逗号进行分隔。

sharding-expression

(none)

String

哈希分区表达式,用于确定数据分布到不同分片。如果设置了哈希分区表达式,则所有相关字段的名称也必须在分区键中列出。

sink.buffer-flush.interval

1 second

Duration

刷新时间间隔,最小值为200 ms

sink.buffer-flush.max-rows

50,000

Integer

缓冲记录大小,最小值为100

sink.buffer-flush.max-batches

6

Integer

数据写入到 Sink 的缓冲区时的最大批次数,最小值为1

sink.max-retries

-1

Integer

刷新数据失败时的最大尝试次数。设置为-1,表示无限重试。

sink.parallelism

(none)

Integer

刷新数据的并行度。默认情况下,与上游算子并行度保持一致。

sink.proactive-validate

false

Boolean

是否主动验证数据。

  • true:主动验证数据。在批处理前验证每条记录。
  • false:默认值,被动验证数据。在数据刷新尝试失败时触发验证数据。

sink.enable-upsert

false

Boolean

是否启用 Upsert 操作(插入或更新)到 Sink。
sink.enable-upsert设置为true时,将允许执行 Upsert 操作,即在写入数据到 Sink 时,如果数据已经存在则进行更新,否则进行插入。
由于不是所有的 Sink 都支持 Upsert 操作,在启用sink.enable-upsert之前,请确保您的 Sink 支持该操作。否则可能会导致错误或不可预测的行为。

metrics.update-interval

5 seconds

Duration

刷新指标的时间间隔,最小设置为 5 seconds。

metrics.log-level

INFO

String

日志级别。
如需了解更多信息,请参见Log4j 内置日志级别

示例代码

CREATE TABLE random_source (
   f0 VARCHAR,
   f1 VARCHAR,
   f2 VARCHAR
 ) WITH (
   'connector' = 'datagen',
   'rows-per-second' = '1'
 );

CREATE TABLE bh_ce_sink (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 VARCHAR
) WITH (
  'connector' = 'bytehouse-ce',
  'clickhouse.cluster' = 'bytehouse_cluster_***',
  'clickhouse.shard-discovery.service.host' = '7249621***.bytehouse-ce.ivolces.com',
  'username' = 'user_a',
  'password' = 'pa***45',
  'database' = 'default',
  'table-name' = 'doc_test',
  'sink.buffer-flush.interval' = '10 second',
  'sink.buffer-flush.max-rows' = '5000',
  'clickhouse.shard-discovery.address-mapping' = '192.18.*.*|8123:192.168.*.*|8123'
);

INSERT INTO bh_ce_sink 
SELECT f0, f1, f2 FROM random_source;

常见问题

导入 ByteHouse 后出现未来时间

问题描述

  1. 上游 Kafka 数据源中,有字段为字符串格式时间戳例如 {"t": "2024-10-17 10:00:00"}
  2. Flink 的 DDL 语句中定义 t字段为 TIMESTAMP 类型。
  3. ByteHouse 中下游数据类型为 DateTime 或 DateTime64 两种类型。
  4. 写入 ByteHouse 中时间戳查询结果为 2024-10-17 18:00:00

问题原因:源端(例如 Kafka 数据源)时间戳数据以不带时区的格式输出时(如 “2024-10-17 10:00:00” ),默认按 UTC 解析为 Epoch 时间戳;如果源端业务的时区并非 UTC,那么按上述方式输出时间戳值则会间接地引入 UTC 与源端业务时区之间的小时数偏差。

解决方案

  1. 推荐方案一:Kafka 数据源中时间戳建议使用 UTC 时间例如 "2024-10-17T10:00:00Z",同时 Flink 建表语句中定义字段类型为 TIMESTAMP_LTZ。
  2. 推荐方案二:在 Flink ByteHouse 建表语句 WITH OPTIONS 中增加 'timestamp-offset' = '-8h',这样子在单个 ByteHouse Sink 中生效。
  3. 推荐方案三:在 Flink 任务中统一设置变量,这个和方案二效果类似,但是对 Flink 任务中所有的 ByteHouse Sink 都生效containerized.taskmanager.env.FLINK_WRITE_TO_BYTEHOUSE_TIMESTAMP_OFFSET: -8h