在 Flink 控制台,bytehouse-cdw 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。
ByteHouse 是一款云原生数据仓库,云数仓版(CDW)是一个支持实时导入和离线导入的自助数据分析平台,能够对海量数据进行高效分析。
如需了解 ByteHouse 云数仓版更多信息,请参见 ByteHouse 云数仓版简介。
CREATE TABLE bh_cdw ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR) WITH ( 'connector' = 'bytehouse-cdw', 'database' = 'doc_db', 'table-name' = 'doc_table_2', 'username' = 'user-a', 'password' = 'qa***6', -- 指定 ByteHouse Gateway 的地域。 -- 示例VOLCANO_PRIVATE为火山引擎私有网络,此时需要ByteHouse CDW和Flink处于相同VPC。 'bytehouse.gateway.region' = 'VOLCANO_PRIVATE', 'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com', 'bytehouse.gateway.port' = '19000', -- 用来对数据进行分组和管理的虚拟仓库。 'bytehouse.gateway.virtual-warehouse' = 'test', 'jdbc.enable-gateway-connection' = 'true', 'bytehouse.gateway.account' = '210***34', 'bytehouse.gateway.access-key-id' = '<your-access-key>', 'bytehouse.gateway.secret-key' = '<your-secret-key>', 'sink.buffer-flush.interval' = '5 second', 'sink.buffer-flush.max-rows' = '2000' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 bytehouse-cdw 连接器。 |
database | 是 | (none) | String | 数据库名称。需要在 ByteHouse CDW 控制台提前创建数据库,请参见创建库表。 |
table-name | 是 | (none) | String | 表格名称。需要在 ByteHouse CDW 控制台提前创建表,请参见创建库表。 |
username | 否 | (none) | String | JDBC 帐户名。设置 username,需要同时设置 password。 |
password | 否 | (none) | String | JDBC 帐户密码。 |
jdbc.enable-gateway-connection | 否 | true | Boolean | JDBC 连接是否通过 ByteHouse Gateway。
|
bytehouse.gateway.region | 否 | (none) | String | 指定 ByteHouse Gateway 的地域。
|
bytehouse.gateway.host | 否 | (none) | String | ByteHouse 网关的私有主机。前提是将 bytehouse.gateway.region 设置为 |
bytehouse.gateway.port | 否 | 19000 | Integer | ByteHouse 网关的私有端口。前提是将 bytehouse.gateway.region 设置为 |
bytehouse.gateway.virtual-warehouse | 否 | (none) | String | 用于指定虚拟仓库。 |
bytehouse.gateway.account | 否 | (none) | String | 指定连接器的帐户 ID,用于认证和授权。 |
bytehouse.gateway.access-key-id | 否 | (none) | String | 连接器帐户的 Access Key。 |
bytehouse.gateway.secret-key | 否 | (none) | String | 连接器帐户的 Secret Key。 |
bytehouse.gateway.api-token | 否 | (none) | String | 连接器帐户的 API Token。 |
sink.buffer-flush.interval | 否 | 1 second | Duration | 刷新时间间隔,最小值为 |
sink.buffer-flush.max-rows | 否 | 100,000 | Integer | 缓冲记录大小,最小值为 |
sink.buffer-flush.max-batches | 否 | 32 | Integer | 数据写入到 Sink 的缓冲区时的最大批次数,最小值为 |
sink.max-retries | 否 | 3 | Integer | 刷新数据失败时的最大尝试次数。 |
sink.parallelism | 否 | (none) | Integer | 刷新数据的并行度。默认情况下,与上游算子并行度保持一致。 |
metrics.update-interval | 否 | 5 seconds | Duration | 刷新指标的时间间隔,最小设置为 5 seconds。 |
metrics.log-level | 否 | INFO | String | 日志级别。 |
如下表格是 Flink SQL 数据类型和 ByteHouse 数据类型的映射关系。如果需要了解。
Flink SQL 字段类型 | ByteHouse 字段类型 |
---|---|
TINYINT | Int8 |
SMALLINT | Int16 |
INT | Int32 |
BIGINT | Int64 |
DECIMAL(20, 0) | UInt64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL(p, s) | Decimal(P, S) |
BOOLEAN | Int8 |
DATE | Date |
TIME [(p)] [WITHOUT TIMEZONE] | 暂不支持,请使用 String 类型替代 |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | DateTime |
STRING | String |
BYTES | |
ARRAY | ARRAY |
CREATE TABLE random_source ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR) WITH ( 'connector' = 'datagen', 'rows-per-second'='1' ); CREATE TABLE bh_cdw ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR) WITH ( 'connector' = 'bytehouse-cdw', 'database' = 'doc_db', 'table-name' = 'doc_table_2', 'username' = 'user-a', 'password' = 'qa***6', -- 指定 ByteHouse Gateway 的地域。 -- 示例VOLCANO_PRIVATE为火山引擎私有网络,此时需要ByteHouse CDW和Flink处于相同VPC。 'bytehouse.gateway.region' = 'VOLCANO_PRIVATE', 'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com', 'bytehouse.gateway.port' = '19000', -- 用来对数据进行分组和管理的虚拟仓库。 'bytehouse.gateway.virtual-warehouse' = 'test', 'jdbc.enable-gateway-connection' = 'true', 'bytehouse.gateway.account' = '210***34', 'bytehouse.gateway.access-key-id' = '<your-access-key>', 'bytehouse.gateway.secret-key' = '<your-secret-key>', 'sink.buffer-flush.interval' = '5 second', 'sink.buffer-flush.max-rows' = '2000' ); INSERT INTO bh_cdw SELECT f0, f1, f2 FROM random_source;
Flink 连接器的 DataStream API 源数据类型为 RowData
。通过 DataStream API 的使用主要流程是通过 CnchSinkFunctionBuilder
获取 CnchSinkFunction
实例,下面是一个演示基本用法的示例。详细参数配置可以参考 ByteHouse 官方文档。
StreamExecutionEnvironment env = ...; // Adding a source to the data stream DataStream<RowData> dataStream = env.addSource(...).returns(TypeInformation.of(RowData.class)); // List of columns representing the table schema List<Column> columns = Arrays.asList( Column.physical("year", DataTypes.INT()), Column.physical("npc", DataTypes.STRING()), Column.physical("offence", DataTypes.STRING()), Column.physical("case_no", DataTypes.INT())); try (@SuppressWarnings("unchecked") CnchSinkFunction<RowData, ?> cnchSink = new CnchSinkFunctionBuilder.Insert(database, tableName) .withSchema(columns) .withGatewayConnection("VOLCANO_PRIVATE", "your_host", "your_port") .withGatewayApiToken("<< your API token >>") .withGatewayVirtualWarehouse("default_vw") .withFlushInterval(Duration.ofSeconds(1)) .build()) { // Add the sink to the data stream dataStream.addSink(cnchSink); // Trigger the execution env.execute(); }
问题描述:
{"t": "2024-10-17 10:00:00"}
t
字段为 TIMESTAMP 类型。2024-10-17 18:00:00
问题原因:源端(例如 Kafka 数据源)时间戳数据以不带时区的格式输出时(如 “2024-10-17 10:00:00” ),默认按 UTC 解析为 Epoch 时间戳;如果源端业务的时区并非 UTC,那么按上述方式输出时间戳值则会间接地引入 UTC 与源端业务时区之间的小时数偏差。
解决方案:
'timestmap-offset' = '-8h'
,这样子在单个 ByteHouse Sink 中生效。containerized.taskmanager.env.FLINK_WRITE_TO_BYTEHOUSE_TIMESTAMP_OFFSET: -8h