Flink Connector for ByteHouse 连接器专门用于通过 Flink 将数据加载到 ByteHouse。本文为您介绍如何使用Flink Connector连接ByteHouse,并为您提供了典型场景下数据写入ByteHouse的使用示例。
Flink Connector 可以通过 Table API&SQL 和 Flink 的 DataStreamAPI 两种方式连接ByteHouse并处理数据。连接ByteHouse前,您需根据下文引导下载安装对应版本的驱动,并添加对应的依赖。完成准备工作后即可使用Connector连接ByteHouse进行数据处理。
Flink 版本 | 驱动程序 | 发布日期 |
---|---|---|
1.18 | 2025-01-13 | |
1.17 | 2025-01-13 | |
1.16 | 2025-01-13 | |
1.15 | 2025-01-13 | |
1.13 | 2024-03-25 | |
1.11 | 2024-10-23 |
注意
请使用与 Flink 版本相匹配的驱动,以保障功能正常使用。
要将下载的flink-sql-connector-bytehouse-ce-${flink-sql-connector-bytehouse-ce.version}.jar
文件安装到本地Maven存储库中,请运行以下命令:
mvn install:install-file \ -Dfile=${your_path}/flink-sql-connector-bytehouse-ce-${flink-sql-connector-bytehouse-ce.version}.jar \ -DgroupId=com.bytedance.bytehouse \ -DartifactId=flink-sql-connector-bytehouse-ce \ -Dversion=${flink-sql-connector-bytehouse-ce.version} \ -Dpackaging=jar
对于要使用 Flink connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的pom.xml
文件中。scala.version
是2.11或2.12,与 Flink 发行版的关联 Scala 版本相对应。${flink-sql-connector-bytehouse-ce.version}
可以设置为所需的 Flink connector 连接器版本。
<dependency> <groupId>com.bytedance.bytehouse</groupId> <artifactId>flink-sql-connector-bytehouse-ce</artifactId> <version>${flink-sql-connector-bytehouse-ce.version}</version> </dependency>
CREATE TABLE bh_ce_sink ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR ) WITH ( 'connector' = 'bytehouse-ce', 'clickhouse.cluster' = 'bytehouse_cluster_***', 'clickhouse.shard-discovery.service.host' = '7249621***.bytehouse-ce.ivolces.com', 'username' = 'user_a', 'password' = 'pa***45', 'database' = 'default', 'table-name' = 'doc_test', 'sink.buffer-flush.interval' = '10 second', 'sink.buffer-flush.max-rows' = '5000', 'clickhouse.shard-discovery.address-mapping' = '192.18.*.*|8123:192.168.*.*|8123' );
选项 | 是否必选 | 默认 | 类型 | 描述 |
---|---|---|---|---|
| 是 | 无 | String | 指定要使用的连接器。需配置为 |
| 是 | 无 | String | 要连接ByteHouse企业版数据库的名称。 |
| 是 | 无 | String | 要连接ByteHouse企业版表的名称。 |
| 是 | 无 | String | ByteHouse群集名称。 |
| 否 |
| String | ByteHouse分片发现的类型。支持的值为:
|
| 否 |
| String | ByteHouse 分片发现服务的主机名。 |
| 否 |
| Integer | ByteHouse分片发现服务的端口号。 |
| 否 | 无 | Map | 已发现分片的地址映射。 |
| 否 | 内置 | String | 用于检索分片信息的 ByteHouse API。 |
| 否 | 无 | String | 调用ByteHouseAPI的火山引擎主账号ID。 |
| 否 | 无 | String | ByteHouse 企业版 网关的主机。 |
| 否 |
| Integer | ByteHouse 企业版 网关的端口。 |
| 否 | 无 | String | JDBC 连接用户名。 |
| 否 | 无 | String | JDBC 连接密码。 |
| 否 |
| Duration | 通过JDBC执行查询的超时设置。 |
| 否 | 无 | String | 预下沉分组的key。key可以由多个字段组成,用逗号分隔。 |
| 否 | 无 | String | 预接收器分组的表达式。如果设置了它,所有涉及的字段名也必须列在 |
| 否 | 无 | Integer | 记录的预接收器分组限定的组数。取值不能小于
|
| 否 | 无 | String | ByteHouse 分布式分片策略。支持的值有:
|
| 否 | 无 | String | 哈希分区的键。 键可以包含多个字段,以逗号分隔。 |
| 否 | 无 | String | 哈希分区的表达式。 如果设置此项,则所有涉及的字段名称也必须在分区键 |
| 否 |
| Duration | 两次批量刷新之间的最大间隔。最小值为 |
| 否 |
| Integer | 刷新前缓冲记录的最大值。 该值最少为 |
| 否 |
| Integer | 通过异步刷新触发过载预防的待处理批次数量的阈值。 该值最小为 |
| 否 |
| Integer | 刷新数据失败时的最大重试次数。 设置为 |
| 否 | 无 | Integer | 刷新数据的最大并行度。默认情况下,并行度由框架使用上游链式运算符的相同并行度来确定。通常,建议与分片数相同。 |
| 否 |
| Boolean | 指定是否启用主动数据验证(即在添加到批次之前要验证的每个记录)。默认情况下,被动数据验证(即仅在数据刷新尝试失败时触发数据验证)用于减少运行时开销。 |
| 否 |
| String | 选择要接收的数据记录。支持的值为:
如果此值设置为 |
| 否 |
| Duration | 刷新指标的固定间隔。最短为 |
| 否 |
| String | 日志度量的日志级别。这对应于Log4J内置的标准日志级别。 |
| 否 | 无 | Duration | 时间戳数据的附加时间偏移。 |
| 否 |
| Integer | 异步查找的比例因子。
|
| 否 |
| Long | 数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。 |
| 否 | 无 | Duration | ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。 |
| 否 |
| Integer | 查找操作失败时允许的最大重试次数。 |
| 否 |
| Duration | 连续尝试刷新数据之间的最大退避时间。 |
| 否 |
| Long | 每秒可处理的最大行数。如果设置为非正值,则禁用速率限制。 |
Fink SQL 字段类型 | ByteHouse 字段类型 |
---|---|
TINYINT | Int8 |
SMALLINT | Int16 |
INT | Int32 |
BIGINT | Int64 |
DECIMAL(20, 0) | UInt64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL(p, s) | Decimal(P, S) |
BOOLEAN | Int8 |
DATE | Date |
TIME [(p)] [WITHOUT TIMEZONE] | 暂不支持,请使用 String 类型替代 |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | DateTime |
STRING | String |
BYTES | |
ARRAY | ARRAY |
以下为一个将AWS Kinesis Data Stream表单(bh_ce_source)中的数据加载到ByteHouse CE表(bh_ce_sink)中的示例。
CREATE TEMPORARY TABLE `bh_ce_source` ( -- source table `id` BIGINT NOT NULL, `time` TIMESTAMP(0), `content` ARRAY<DECIMAL(20, 0)> ) WITH ( 'connector' = 'kinesis', 'stream' = 'demo_stream', 'format' = 'json', 'aws.region' = 'cn-north-1', 'aws.credentials.provider' = 'BASIC', 'aws.credentials.basic.accesskeyid' = '???', 'aws.credentials.basic.secretkey' = '???', 'scan.shard.getrecords.maxretries' = '7' ); CREATE TEMPORARY TABLE `bh_ce_sink` ( -- sink table `id` STRING NOT NULL, `event_time` STRING, `content` ARRAY<DECIMAL(20, 0)> ) WITH ( 'connector' = 'bytehouse-ce', 'clickhouse.shard-discovery.kind' = 'CE_API_CLUSTERS', 'clickhouse.shard-discovery.service.host' = '7314499681491849526.bytehouse-ce.ivolces.com', 'clickhouse.shard-discovery.service.port' = '80', 'bytehouse-ce.api.get-shard-info' = '/openapi/v1/clusters/{cluster_name}/nodes', 'bytehouse-ce.api.account-id' = '<< your Volcano account ID >>', 'username' = '<< your ByteHouse CE username >>', 'password' = '<< your ByteHouse CE password >>', 'sink.overload.log-level' = 'DEBUG', 'sink.buffer-flush.interval' = '1 second', 'sink.buffer-flush.max-rows' = '5000', 'sink.parallelism' = '10', -- Recommended to be no more than the number of shards 'clickhouse.cluster' = '<< your cluster name of ByteHouse CE >>', 'database' = 'my_db', 'table-name' = 'my_table_local', -- MUST BE LOCAL TABLE 'sink.group-by.key' = 'event_time', 'sink.group-by.expression' = 'cityHash64(event_time)', 'sink.group-by.number-of-groups' = '16', -- Need to be n (n > 0) times of the number of shards 'sharding-strategy' = 'HASH', 'sharding-key' = 'event_time', 'sharding-expression' = 'cityHash64(event_time)' ); INSERT INTO `bh_ce_sink` ( -- data loading `id`, `event_time`, `content` ) SELECT IFNULL(CAST(`id` AS STRING), '43'), -- type casting with default value CAST(`time` AS STRING), -- type casting IFNULL(`content`, ARRAY[123,12345]) -- default value FROM `bh_ce_source`
下面是通过FlinkSQL采用ByteHouse CE表进行查找的示例。
CREATE TABLE main_table ( some_id BIGINT, proctime AS PROCTIME() ) WITH ( 'connector' = 'datagen', 'fields.some_id.kind' = 'random', 'fields.some_id.min' = '0', 'fields.some_id.max' = '799999', 'rows-per-second' = '1000', 'number-of-rows' = '100000' ); CREATE TABLE lookup_table ( aiid VARCHAR, content VARCHAR, PRIMARY KEY (aiid) NOT ENFORCED ) WITH ( 'connector' = 'bytehouse-ce', ---- Connection settings ---- 'clickhouse.shard-discovery.kind' = 'CE_API_CLUSTERS', 'clickhouse.shard-discovery.service.host' = '7314499681491849526.bytehouse-ce.ivolces.com', 'clickhouse.shard-discovery.service.port' = '80', 'bytehouse-ce.api.get-shard-info' = '/openapi/v1/clusters/{cluster_name}/nodes', 'bytehouse-ce.api.account-id' = '<< your Volcano account ID >>', 'username' = '<< your ByteHouse CE username >>', 'password' = '<< your ByteHouse CE password >>', ---- Lookup settings ---- 'lookup.async.scale-factor' = '8', -- set '1' to disable asynchronous lookup, i.e. SYNC mode 'lookup.cache.max-rows' = '0', -- set '0' to disable caching 'lookup.cache.ttl' = '1 minute', 'bytehouse-ce.enable-query-log' = 'false', -- set 'true' for debugging ---- Table settings ---- 'clickhouse.cluster' = '<< your cluster name of ByteHouse CE >>', 'database' = 'my_db', 'table-name' = 'my_lookup_all' -- NEED TO BE DISTRIBUTED TABLE 'sharding-strategy' = 'HASH', 'sharding-key' = 'aiid', 'sharding-expression' = 'cityHash64(aiid)' -- same as that set for the distributed table ); CREATE TABLE print_sink ( some_id BIGINT, content VARCHAR ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT m.some_id, l.content FROM main_table AS m JOIN lookup_table FOR SYSTEM_TIME AS OF m.proctime AS l ON CAST(m.some_id AS VARCHAR) = l.aiid;
package bytehouse.flink.connectoer.demo; import com.bytedance.bytehouse.flink.connector.clickhouse.ClickHouseSinkFunction; import com.bytedance.bytehouse.flink.connector.clickhouse.api.java.ClickHouseSinkFunctionBuilder; import com.bytedance.bytehouse.flink.table.api.RowDataConstructor; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import java.time.Duration; import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; import java.util.Random; public class AcceptLogSinkDataStreamExample { public static void main(String[] args) throws Exception { final String gatewayHost = args[0]; final String username = args[1]; final String password = args[2]; final String clusterName = args[3]; final String dbName = args[4]; final String tableName = args[5]; final long rowsPerSecond = Long.parseLong(args[6]); final long numberOfRows = Long.parseLong(args[7]); // Creating the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Adding a source to the data stream DataStream<RowData> dataStream = env.addSource( new DataGeneratorSource<>(new RowDataGenerator(), rowsPerSecond, numberOfRows)) .returns(TypeInformation.of(RowData.class)); // List of columns representing the table schema List<Column> columns = Arrays.asList( Column.physical("create_time", DataTypes.TIMESTAMP(0)), Column.physical("completion_id", DataTypes.STRING()), Column.physical("device_mid", DataTypes.STRING())); try (@SuppressWarnings("unchecked") ClickHouseSinkFunction<RowData> sink = new ClickHouseSinkFunctionBuilder.Upsert(clusterName, dbName, tableName) .withSchema(columns) .withShardDiscoveryKind("CE_GATEWAY") .withGateway(gatewayHost) .withAccount(username, password) .withPrimaryKey(Arrays.asList("completion_id")) .withShardingKey("completion_id") .withFlushInterval(Duration.ofSeconds(1)) .build()) { // Add the sink to the data stream dataStream.addSink(sink); // Trigger the execution env.execute(); } } static class RowDataGenerator implements DataGenerator<RowData> { private final Random random = new Random(); private final RowDataConstructor rowDataConstructor = RowDataConstructor.apply( new DataType[] {DataTypes.TIMESTAMP(0), DataTypes.STRING(), DataTypes.STRING()}); @Override public RowData next() { final Object[] rowDataFields = { LocalDateTime.now(), "CID-" + random.nextInt(1000), "DMID-" + random.nextInt(1000) }; return rowDataConstructor.constructInsert(rowDataFields); } @Override public void open( String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception { // Initialization code here (if needed) } @Override public boolean hasNext() { return true; // Continuous generation } } }