Flink Connector Driver for ByteHouse 云数仓版连接器专门用于通过 Flink 将数据加载到 ByteHouse云数仓版。
本文将介绍通过 Table API&SQL 和 Flink 的 DataStreamAPI 两种方式连接ByteHouse并处理数据。
根据您安装的 Flink 版本,下载匹配的 Flink SQL 或 Flink DataStream API 驱动。
Flink 版本 | 驱动程序 | 发布日期 |
---|---|---|
1.18 | 2025-01-13 | |
1.17 | 2025-01-13 | |
1.16 | 2025-01-13 | |
1.15 | 2025-01-13 | |
1.11 | 2024-10-23 |
请运行以下命令,将下载的flink-sql-connector-bytehouse-cdw-${flink-sql-connector-bytehouse-cdw.version}.jar
文件安装到本地 Maven 存储库:
mvn install:install-file \ -Dfile=${your_path}/flink-sql-connector-bytehouse-cdw-${flink-sql-connector-bytehouse-cdw.version}.jar \ -DgroupId=com.bytedance.bytehouse \ -DartifactId=flink-sql-connector-bytehouse-cdw \ -Dversion=${flink-sql-connector-bytehouse-cdw.version} \ -Dpackaging=jar
然后,您可以使用 IDE(例如 IntelliJ)将其添加为项目中的依赖项。
对于要使用 Flink connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。scala.version
需要是 2.11 或 2.12,与 Flink 发行版的关联 Scala 版本相对应。${flink-sql-connector-bytehouse-cdw.version}
可以设置为所需的 Flink connector 连接器版本。
<dependency> <groupId>com.bytedance.bytehouse</groupId> <artifactId>flink-sql-connector-bytehouse-cdw_${scala.version}</artifactId> <version>${flink-sql-connector-bytehouse-cdw.version}</version> </dependency> <dependency> <groupId>com.bytedance.bytehouse</groupId> <artifactId>flink-sql-connector-bytehouse-cdw_2.12</artifactId> <version>1.27.100_snapshot7</version> </dependency>
然后,将以下存储库添加到 pom.xml
文件:
<repository> <id>bytedance</id> <name>ByteDance Public Repository</name> <url>https://artifact.bytedance.com/repository/releases</url> </repository>
下面是通过 FlinkSQL 将数据表单加载到 ByteHouse 云数仓版数据表中的示例。
说明
CREATE TABLE `bh_de_source` ( -- source table `id` BIGINT NOT NULL, `time` TIMESTAMP(0), `content` ARRAY<DECIMAL(20, 0)> ) PARTITIONED BY (`id`) WITH ( 'connector' = 'kinesis', 'stream' = 'demo_stream', 'format' = 'json', 'aws.region' = 'cn-north-1', 'aws.credentials.provider' = 'BASIC', 'aws.credentials.basic.accesskeyid' = '???', 'aws.credentials.basic.secretkey' = '???', 'scan.shard.getrecords.maxretries' = '7' ); CREATE TABLE `bh_de_sink` ( -- sink table `id` STRING NOT NULL, `event_time` STRING, `content` ARRAY<DECIMAL(20, 0)> ) PARTITIONED BY (`id`) WITH ( 'connector' = 'bytehouse-cdw', 'jdbc.enable-gateway-connection' = 'true', 'bytehouse.gateway.region' = 'VOLCANO_PRIVATE', 'bytehouse.gateway.host' = 'tenant-xxxx.bytehouse.ivolces.com', 'bytehouse.gateway.port' = '19000', 'bytehouse.gateway.api-token' = '<< your API token >>', 'bytehouse.gateway.virtual-warehouse' = 'default_vw', 'database' = 'demo_database', 'table-name' = 'demo_table' ); INSERT INTO `bh_de_sink` ( -- data loading `id`, `event_time`, `content` ) SELECT IFNULL(CAST(`id` AS STRING), '43'), -- type casting with default value CAST(`time` AS STRING), -- type casting IFNULL(`content`, 'hello') -- default value FROM `bh_de_source`
Flink 连接器的 DataStream API 将源数据类型限制为 RowData
。
DataStream API 的使用主要是通过 CnchSinkFunctionBuilder
获取 CnchSinkFunction
实例,这有助于连接器的各种配置。下面是一个演示基本用法的示例。
import com.bytedance.bytehouse.flink.connector.cnch.CnchSinkFunction; import com.bytedance.bytehouse.flink.connector.cnch.api.java.CnchSinkFunctionBuilder; import com.bytedance.bytehouse.flink.table.data.RowDataConstructor; import java.sql.Date; import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Random; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; StreamExecutionEnvironment env = ...; // Adding a source to the data stream DataStream<RowData> dataStream = env.addSource(...).returns(TypeInformation.of(RowData.class)); // List of columns representing the table schema List<Column> columns = Arrays.asList( Column.physical("year", DataTypes.INT()), Column.physical("npc", DataTypes.STRING()), Column.physical("offence", DataTypes.STRING()), Column.physical("case_no", DataTypes.INT())); try (@SuppressWarnings("unchecked") CnchSinkFunction<RowData, ?> cnchSink = new CnchSinkFunctionBuilder.Insert(database, tableName) .withSchema(columns) .withGatewayConnection("VOLCANO_PRIVATE", "your_host", "your_port") .withGatewayApiToken("<< your API token >>") .withGatewayVirtualWarehouse("default_vw") .withFlushInterval(Duration.ofSeconds(1)) .build()) { // Add the sink to the data stream dataStream.addSink(cnchSink); // Trigger the execution env.execute(); }
参数 | 必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
| 是 | (none) | String | 指定要使用的驱动,这里应该是 |
| 是 | (none) | String | 需要连接的 ByteHouse 云数仓版数据库的名称。 |
| 是 | (none) | String | 需要连接的 ByteHouse 云数仓版表的名称。 |
| 否 | (none) | String | 用户名。 |
| 否 | (none) | String | 密码。 |
| 否 |
| Duration | 通过 JDBC 执行查询的超时设置。 |
| 否 |
| Boolean | 指定 JDBC 连接是否需要经过 ByteHouse网关。 |
| 否 | (none) | String | ByteHouse 网关区域。支持的值包括:
|
| 否 | (none) | String | ByteHouse 网关的私有主机。前提是将 bytehouse.gateway.region 设置为 |
| 否 |
| Integer | ByteHouse 网关的私有端口。前提是将 bytehouse.gateway.region 设置为 |
| 否 | (none) | String | 通过 ByteHouse Gateway 进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。 |
| 否 | (none) | String | ByteHouse 网关账户 ID。 |
| 否 | (none) | String | ByteHouse Gateway 访问密钥 ID。 |
| 否 | (none) | String | ByteHouse 网关密钥。 |
| 否 | (none) | String | ByteHouse 网关API token令牌。 |
| 否 | 1 | Integer | 存储转储的并行度(即每次插入执行的线程数)。 |
| 否 | (none) | String | 预接收器分组的密钥。密钥可以由多个字段组成,以逗号分隔。 |
| 否 | (none) | String | 预接收器分组的表达式。如果设置了,所有涉及的字段名称也必须在 sink.group-by.key 中列出。 |
| 否 | (none) | Integer | 记录预接收器分组的组数。如果未指定,它将回退到 sink.parallelism(如果提供)。The ByteHouse Gateway secret key. |
| 否 |
| Duration | 两次批量刷新之间的最大间隔。最小为 200 毫秒(ms)。 |
| 否 |
| Integer | 刷新前缓冲记录的最大大小。最小为 100。 |
| 否 |
| Integer | 触发异步刷新过载预防的待处理批次数的阈值。最小为 1。 |
| 否 |
| Integer | 刷新数据失败时的最大尝试次数。将其设置为 -1 表示无限次重试。 |
| 否 | (none) | Integer | 刷新数据的最大并行度。默认情况下,并行度由框架使用与上游链式运算符相同的并行度来确定。 |
| 否 |
| Boolean | 指定是否启用主动数据验证(即,在添加到批次之前要验证的每个记录)。默认情况下,使用被动数据验证(即,仅在数据刷新尝试失败时触发数据验证)来减少运行时开销。 |
| 否 |
| String | 选择要接收的数据记录。支持的值有:
|
| 否 |
| Boolean | 允许检查点等待,直到所有缓冲批次都完全耗尽。如果设置为 false,则所有缓冲批次将在每个检查点期间写入检查点状态。如果已知数据刷新到表的速度很快,建议将其设置为 true。 |
| 否 |
| Duration | 刷新指标的固定间隔。 该时间最少为 |
| 否 |
| String | 记录指标的日志级别。 这对应于 Log4J 内置的标准日志级别。 |
| 否 | (none) | Duration | 时间戳数据的附加时间偏移量。 |
| 否 |
| Integer | 异步查找的比例因子。如果设置为小于 2,查找将以同步 (SYNC) 模式运行。如果设置为 n(n >= 2),查找将以异步 (ASYNC) 模式运行,每个查找实例的最大并发数等于 n。 |
| 否 |
| Long | 数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。 |
| 否 | (none) | Duration | ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。 |
| 否 |
| Integer | 查找操作失败时允许的最大重试次数。 |