在 Flink 控制台,bytehouse-ce 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。
ByteHouse 是一款云原生数据仓库,是火山引擎基于开源 ClickHouse 进行深度优化和改造的版本,提供海量数据上更强的查询服务和数据写入性能。
ByteHouse 企业版(CE)基于火山内部的丰富场景,以及 ClickHouse 开源版的痛点进行了深度定制,包括多场景表引擎、扩展数据类型、多级存储等功能。如需了解 ByteHouse 企业版更多信息,请参见ByteHouse 企业版简介。
ByteHouse CE 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
CREATE TABLE bh_ce_sink ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR ) WITH ( 'connector' = 'bytehouse-ce', 'clickhouse.cluster' = 'bytehouse_cluster_***', 'clickhouse.shard-discovery.service.host' = '7249621***.bytehouse-ce.ivolces.com', 'username' = 'user_a', 'password' = 'pa***45', 'database' = 'default', 'table-name' = 'doc_test', 'sink.buffer-flush.interval' = '10 second', 'sink.buffer-flush.max-rows' = '5000', 'clickhouse.shard-discovery.address-mapping' = '192.18.*.*|8123:192.168.*.*|8123' );
如下表格是 Flink SQL 数据类型和 ByteHouse 数据类型的映射关系。如果需要了解
Flink SQL 字段类型 | ByteHouse 字段类型 |
---|---|
TINYINT | Int8 |
SMALLINT | Int16 |
INT | Int32 |
BIGINT | Int64 |
DECIMAL(20, 0) | UInt64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL(p, s) | Decimal(P, S) |
BOOLEAN | Int8 |
DATE | Date |
TIME [(p)] [WITHOUT TIMEZONE] | 暂不支持,请使用 String 类型替代 |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | DateTime |
STRING | String |
BYTES | |
ARRAY | ARRAY |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 |
database | 是 | (none) | String | 数据库名称。需要在 ByteHouse CE 控制台提前创建数据库,请参见创建库表。 |
table-name | 是 | (none) | String | 表格名称。需要在 ByteHouse CE 控制台提前创建表,请参见创建库表。 |
clickhouse.cluster | 是 | (none) | String | ByteHouse CE 集群名称。需要在 ByteHouse CE 控制台提前创建集群,请参见创建集群。 |
clickhouse.shard-discovery.kind | 否 | SYSTEM_CLUSTERS | String | 分片发现类型。
|
clickhouse.shard-discovery.service.host | 否 | 127.0.0.1 | String | 分片发现服务的主机地址。 |
clickhouse.shard-discovery.service.port | 否 | 8123 | Integer | 分片发现服务的端口号。 |
clickhouse.shard-discovery.address-mapping | 否 | (none) | Map | 分片发现地址映射,将实际的分片地址映射到可供访问的地址。 |
bytehouse-ce.api.get-consul-info | 否 | (built-in) | String | 用于获取 Consul 服务信息的 ByteHouse CE API。 |
bytehouse-ce.api.get-shard-info | 否 | (built-in) | String | 用于获取分片信息的 ByteHouse CE API。 |
bytehouse-ce.auth.api | 否 | (built-in) | String | 用于进行身份验证的 ByteHouse CE API。 |
username | 否 | (none) | String | JDBC 用户名。 |
password | 否 | (none) | String | JDBC 用户密码。 |
sharding-strategy | 否 | NONE | String | 分区策略。
|
sharding-key | 否 | (none) | String | 哈希分区键,用于确定数据分布到不同分片。可以由一个或多个字段组成,多个字段之间使用逗号进行分隔。 |
sharding-expression | 否 | (none) | String | 哈希分区表达式,用于确定数据分布到不同分片。如果设置了哈希分区表达式,则所有相关字段的名称也必须在分区键中列出。 |
sink.buffer-flush.interval | 否 | 1 second | Duration | 刷新时间间隔,最小值为 |
sink.buffer-flush.max-rows | 否 | 50,000 | Integer | 缓冲记录大小,最小值为 |
sink.buffer-flush.max-batches | 否 | 6 | Integer | 数据写入到 Sink 的缓冲区时的最大批次数,最小值为 |
sink.max-retries | 否 | -1 | Integer | 刷新数据失败时的最大尝试次数。设置为 |
sink.parallelism | 否 | (none) | Integer | 刷新数据的并行度。默认情况下,与上游算子并行度保持一致。 |
sink.proactive-validate | 否 | false | Boolean | 是否主动验证数据。
|
sink.enable-upsert | 否 | false | Boolean | 是否启用 Upsert 操作(插入或更新)到 Sink。 |
metrics.update-interval | 否 | 5 seconds | Duration | 刷新指标的时间间隔,最小设置为 5 seconds。 |
metrics.log-level | 否 | INFO | String | 日志级别。 |
CREATE TABLE random_source ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); CREATE TABLE bh_ce_sink ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR ) WITH ( 'connector' = 'bytehouse-ce', 'clickhouse.cluster' = 'bytehouse_cluster_***', 'clickhouse.shard-discovery.service.host' = '7249621***.bytehouse-ce.ivolces.com', 'username' = 'user_a', 'password' = 'pa***45', 'database' = 'default', 'table-name' = 'doc_test', 'sink.buffer-flush.interval' = '10 second', 'sink.buffer-flush.max-rows' = '5000', 'clickhouse.shard-discovery.address-mapping' = '192.18.*.*|8123:192.168.*.*|8123' ); INSERT INTO bh_ce_sink SELECT f0, f1, f2 FROM random_source;
问题描述:
{"t": "2024-10-17 10:00:00"}
t
字段为 TIMESTAMP 类型。2024-10-17 18:00:00
问题原因:源端(例如 Kafka 数据源)时间戳数据以不带时区的格式输出时(如 “2024-10-17 10:00:00” ),默认按 UTC 解析为 Epoch 时间戳;如果源端业务的时区并非 UTC,那么按上述方式输出时间戳值则会间接地引入 UTC 与源端业务时区之间的小时数偏差。
解决方案:
'timestamp-offset' = '-8h'
,这样子在单个 ByteHouse Sink 中生效。containerized.taskmanager.env.FLINK_WRITE_TO_BYTEHOUSE_TIMESTAMP_OFFSET: -8h