You need to enable JavaScript to run this app.
导航
Flink Connector
最近更新时间:2025.01.14 20:47:54首次发布时间:2024.09.20 18:02:26

Flink Connector for ByteHouse 连接器专门用于通过 Flink 将数据加载到 ByteHouse。本文为您介绍如何使用Flink Connector连接ByteHouse,并为您提供了典型场景下数据写入ByteHouse的使用示例。

背景信息

Flink Connector 可以通过 Table API&SQL 和 Flink 的 DataStreamAPI 两种方式连接ByteHouse并处理数据。连接ByteHouse前,您需根据下文引导下载安装对应版本的驱动,并添加对应的依赖。完成准备工作后即可使用Connector连接ByteHouse进行数据处理。

准备工作

下载驱动

Flink 版本

驱动程序

发布日期

1.18

flink-sql-connector-bytehouse-ce_2.12-1.27.117-1.18.jar
未知大小

2025-01-13

1.17

flink-sql-connector-bytehouse-ce_2.12-1.27.117-1.17.jar
未知大小

2025-01-13

1.16

flink-sql-connector-bytehouse-ce_2.12-1.27.117-1.16.jar
未知大小

2025-01-13

1.15

flink-sql-connector-bytehouse-ce_2.12-1.27.117-1.15.jar
未知大小

2025-01-13

1.13
(Scala版本: 2.11及以上)

flink-sql-connector-bytehouse-ce-1.27.43-1.13.jar
未知大小

2024-03-25

1.11
(Scala版本: 2.11及以上)

flink-sql-connector-bytehouse-ce_2.11-1.27.104-1.11.jar
未知大小

2024-10-23

注意

请使用与 Flink 版本相匹配的驱动,以保障功能正常使用。

本地安装

要将下载的flink-sql-connector-bytehouse-ce-${flink-sql-connector-bytehouse-ce.version}.jar文件安装到本地Maven存储库中,请运行以下命令:

mvn install:install-file \
    -Dfile=${your_path}/flink-sql-connector-bytehouse-ce-${flink-sql-connector-bytehouse-ce.version}.jar \
    -DgroupId=com.bytedance.bytehouse \
    -DartifactId=flink-sql-connector-bytehouse-ce \
    -Dversion=${flink-sql-connector-bytehouse-ce.version} \
    -Dpackaging=jar

添加Maven依赖

对于要使用 Flink connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的pom.xml文件中。scala.version是2.11或2.12,与 Flink 发行版的关联 Scala 版本相对应。${flink-sql-connector-bytehouse-ce.version}可以设置为所需的 Flink connector 连接器版本。

<dependency>
    <groupId>com.bytedance.bytehouse</groupId>
    <artifactId>flink-sql-connector-bytehouse-ce</artifactId>
    <version>${flink-sql-connector-bytehouse-ce.version}</version>
</dependency>

使用说明

DDL定义

CREATE TABLE bh_ce_sink (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 VARCHAR
) WITH (
  'connector' = 'bytehouse-ce',
  'clickhouse.cluster' = 'bytehouse_cluster_***',
  'clickhouse.shard-discovery.service.host' = '7249621***.bytehouse-ce.ivolces.com',
  'username' = 'user_a',
  'password' = 'pa***45',
  'database' = 'default',
  'table-name' = 'doc_test',
  'sink.buffer-flush.interval' = '10 second',
  'sink.buffer-flush.max-rows' = '5000',
  'clickhouse.shard-discovery.address-mapping' = '192.18.*.*|8123:192.168.*.*|8123'
);

With参数说明

选项

是否必选

默认

类型

描述

connector

String

指定要使用的连接器。需配置为bytehouse-ce(ByteHouse企业版) 或clickhouse

database

String

要连接ByteHouse企业版数据库的名称。

table-name

String

要连接ByteHouse企业版表的名称。
请注意,这里的“表”是指每个分片的本地表。

clickhouse.cluster

String

ByteHouse群集名称。

clickhouse.shard-discovery.kind

SYSTEM_CLUSTERS

String

ByteHouse分片发现的类型。支持的值为:

  • SYSTEM_CLUSTERS:通过system.cluster检索分片IP。
  • CE_GATEWAY:通过ByteHouse企业版网关检索分片IP。通常在通过公共网络连接时可使用此种方式进行跟踪,但不建议在生产环境中使用。
  • CE_API_CLUSTERS:通过ByteHouse企业版特定OpenAPI检索分片IP。建议在生产中使用。

clickhouse.shard-discovery.service.host

127.0.0.1

String

ByteHouse 分片发现服务的主机名。

clickhouse.shard-discovery.service.port

8123

Integer

ByteHouse分片发现服务的端口号。

clickhouse.shard-discovery.address-mapping

Map

已发现分片的地址映射。

bytehouse-ce.api.get-shard-info

内置

String

用于检索分片信息的 ByteHouse API。

bytehouse-ce.api.account-id

String

调用ByteHouseAPI的火山引擎主账号ID。

bytehouse.ce.gateway.host

String

ByteHouse 企业版 网关的主机。

bytehouse.ce.gateway.port

8123

Integer

ByteHouse 企业版 网关的端口。

username

String

JDBC 连接用户名。

password

String

JDBC 连接密码。

jdbc.query.timeout

10 minutes

Duration

通过JDBC执行查询的超时设置。

sink.group-by.key

String

预下沉分组的key。key可以由多个字段组成,用逗号分隔。

sink.group-by.expression

String

预接收器分组的表达式。如果设置了它,所有涉及的字段名也必须列在sink.group-by.key中。

sink.group-by.number-of-groups

Integer

记录的预接收器分组限定的组数。取值不能小于sink.parallelism

  • 如果没有设置,则取sink.parallelism的取值。
  • 建议将此值设置为分片数的倍数,其中倍数是正整数(例如,1、2、3)或2的负幂(例如,1/2、1/4、1/8)。

sharding-strategy

String

ByteHouse 分布式分片策略。支持的值有:

  • NONE:不进行写入分区。
  • RANDOM:随机写入。
  • ROUND_ROBIN:顺序写入(轮询)。
  • HASH:根据 shard_key 进行哈希写入。

sharding-key

String

哈希分区的键。 键可以包含多个字段,以逗号分隔。

sharding-expression

String

哈希分区的表达式。 如果设置此项,则所有涉及的字段名称也必须在分区键partition-key中列出。

sink.buffer-flush.interval

1 second

Duration

两次批量刷新之间的最大间隔。最小值为200 ms

sink.buffer-flush.max-rows

50,000

Integer

刷新前缓冲记录的最大值。 该值最少为 100

sink.buffer-flush.max-batches

6

Integer

通过异步刷新触发过载预防的待处理批次数量的阈值。 该值最小为 1

sink.max-retries

3

Integer

刷新数据失败时的最大重试次数。 设置为-1意味着无限次重试。

sink.parallelism

Integer

刷新数据的最大并行度。默认情况下,并行度由框架使用上游链式运算符的相同并行度来确定。通常,建议与分片数相同。

sink.proactive-validate

false

Boolean

指定是否启用主动数据验证(即在添加到批次之前要验证的每个记录)。默认情况下,被动数据验证(即仅在数据刷新尝试失败时触发数据验证)用于减少运行时开销。

sink.mode

insert

String

选择要接收的数据记录。支持的值为:

  • insert:只接受INSERT的记录。
  • upsert:接受INSERTUPDATE_AFTERDELETE的记录。
  • upsert-no-delete:接受INSERTUPDATE_AFTER的记录。
  • upsert-all:接受各种记录。
  • upsert-all-no-delete:接受除DELETE以外的所有记录。
  • delete-only:只接受DELETE的记录。

如果此值设置为insert以外的任何值,则ByteHouse表必须包含UNIQUE KEY子句。此外,相应Flink表的PRIMARY KEY必须包括作为ByteHouse表中UNIQUE KEY一部分的所有列名。

metrics.update-interval

5 seconds

Duration

刷新指标的固定间隔。最短为5秒

metrics.log-level

INFO

String

日志度量的日志级别。这对应于Log4J内置的标准日志级别

timestamp-offset

Duration

时间戳数据的附加时间偏移。

lookup.async.scale-factor

1

Integer

异步查找的比例因子。

  • 如果设置为小于2,则查找以同步(SYNC)模式运行。
  • 如果设置为nn>=2),则查找以异步(ASYNC)模式运行,每个查找实例的最大并发等于n

lookup.cache.max-rows

0

Long

数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。

lookup.cache.ttl

Duration

ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。

lookup.max-retries

3

Integer

查找操作失败时允许的最大重试次数。

jdbc.max-retry-backoff

32 seconds

Duration

连续尝试刷新数据之间的最大退避时间。
请注意,批量刷新的重试使用指数退避,从1秒延迟开始。

max-rows-per-second

0

Long

每秒可处理的最大行数。如果设置为非正值,则禁用速率限制。
可选用于lookup和sink,现在不适用于source。

数据类型映射

Fink SQL 字段类型

ByteHouse 字段类型

TINYINT

Int8

SMALLINT

Int16

INT

Int32

BIGINT

Int64

DECIMAL(20, 0)

UInt64

FLOAT

Float32

DOUBLE

Float64

DECIMAL(p, s)

Decimal(P, S)

BOOLEAN

Int8

DATE

Date

TIME [(p)] [WITHOUT TIMEZONE]

暂不支持,请使用 String 类型替代

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

DateTime

STRING

String

BYTES

ARRAY

ARRAY

使用示例

结果表示例

以下为一个将AWS Kinesis Data Stream表单(bh_ce_source)中的数据加载到ByteHouse CE表(bh_ce_sink)中的示例。

CREATE TEMPORARY TABLE `bh_ce_source` (  -- source table
  `id` BIGINT NOT NULL,
  `time` TIMESTAMP(0),
  `content` ARRAY<DECIMAL(20, 0)>
) WITH (
  'connector' = 'kinesis',
  'stream' = 'demo_stream',
  'format' = 'json',
  'aws.region' = 'cn-north-1',
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = '???',
  'aws.credentials.basic.secretkey' = '???',
  'scan.shard.getrecords.maxretries' = '7'
);

CREATE TEMPORARY TABLE `bh_ce_sink` (  -- sink table
  `id` STRING NOT NULL,
  `event_time` STRING,
  `content` ARRAY<DECIMAL(20, 0)>
) WITH (
  'connector' = 'bytehouse-ce',
  'clickhouse.shard-discovery.kind' = 'CE_API_CLUSTERS',
  'clickhouse.shard-discovery.service.host' = '7314499681491849526.bytehouse-ce.ivolces.com',
  'clickhouse.shard-discovery.service.port' = '80',
  'bytehouse-ce.api.get-shard-info' = '/openapi/v1/clusters/{cluster_name}/nodes',
  'bytehouse-ce.api.account-id' = '<< your Volcano account ID >>',
  'username' = '<< your ByteHouse CE username >>',
  'password' = '<< your ByteHouse CE password >>',
  'sink.overload.log-level' = 'DEBUG',
  'sink.buffer-flush.interval' = '1 second',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.parallelism' = '10',  -- Recommended to be no more than the number of shards
  'clickhouse.cluster' = '<< your cluster name of ByteHouse CE >>',
  'database' = 'my_db',
  'table-name' = 'my_table_local',  -- MUST BE LOCAL TABLE
  'sink.group-by.key' = 'event_time',
  'sink.group-by.expression' = 'cityHash64(event_time)',
  'sink.group-by.number-of-groups' = '16',  -- Need to be n (n > 0) times of the number of shards
  'sharding-strategy' = 'HASH',
  'sharding-key' = 'event_time',
  'sharding-expression' = 'cityHash64(event_time)'
);

INSERT INTO `bh_ce_sink` (  -- data loading
  `id`,
  `event_time`,
  `content`
)
SELECT
  IFNULL(CAST(`id` AS STRING), '43'),  -- type casting with default value
  CAST(`time` AS STRING),  -- type casting
  IFNULL(`content`, ARRAY[123,12345])  -- default value
FROM `bh_ce_source`

维表示例

下面是通过FlinkSQL采用ByteHouse CE表进行查找的示例。

CREATE TABLE main_table (
  some_id BIGINT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'fields.some_id.kind' = 'random',
  'fields.some_id.min' = '0',
  'fields.some_id.max' = '799999',
  'rows-per-second' = '1000',
  'number-of-rows' = '100000'
);

CREATE TABLE lookup_table (
  aiid VARCHAR,
  content VARCHAR,
  PRIMARY KEY (aiid) NOT ENFORCED
) WITH (
  'connector' = 'bytehouse-ce',
  ---- Connection settings ----
  'clickhouse.shard-discovery.kind' = 'CE_API_CLUSTERS',
  'clickhouse.shard-discovery.service.host' = '7314499681491849526.bytehouse-ce.ivolces.com',
  'clickhouse.shard-discovery.service.port' = '80',
  'bytehouse-ce.api.get-shard-info' = '/openapi/v1/clusters/{cluster_name}/nodes',
  'bytehouse-ce.api.account-id' = '<< your Volcano account ID >>',
  'username' = '<< your ByteHouse CE username >>',
  'password' = '<< your ByteHouse CE password >>',
  ---- Lookup settings ----
  'lookup.async.scale-factor' = '8', -- set '1' to disable asynchronous lookup, i.e. SYNC mode
  'lookup.cache.max-rows' = '0', -- set '0' to disable caching
  'lookup.cache.ttl' = '1 minute',
  'bytehouse-ce.enable-query-log' = 'false', -- set 'true' for debugging
  ---- Table settings ----
  'clickhouse.cluster' = '<< your cluster name of ByteHouse CE >>',
  'database' = 'my_db',
  'table-name' = 'my_lookup_all'  -- NEED TO BE DISTRIBUTED TABLE
  'sharding-strategy' = 'HASH',
  'sharding-key' = 'aiid',
  'sharding-expression' = 'cityHash64(aiid)' -- same as that set for the distributed table
);

CREATE TABLE print_sink (
  some_id BIGINT,
  content VARCHAR
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT
  m.some_id,
  l.content
FROM
  main_table AS m
JOIN
  lookup_table FOR SYSTEM_TIME AS OF m.proctime AS l
ON
  CAST(m.some_id AS VARCHAR) = l.aiid;

结果表示例

package bytehouse.flink.connectoer.demo;
import com.bytedance.bytehouse.flink.connector.clickhouse.ClickHouseSinkFunction;
import com.bytedance.bytehouse.flink.connector.clickhouse.api.java.ClickHouseSinkFunctionBuilder;
import com.bytedance.bytehouse.flink.table.api.RowDataConstructor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
public class AcceptLogSinkDataStreamExample {
    public static void main(String[] args) throws Exception {
        final String gatewayHost = args[0];
        final String username = args[1];
        final String password = args[2];
        final String clusterName = args[3];
        final String dbName = args[4];
        final String tableName = args[5];
        final long rowsPerSecond = Long.parseLong(args[6]);
        final long numberOfRows = Long.parseLong(args[7]);
        // Creating the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Adding a source to the data stream
        DataStream<RowData> dataStream =
                env.addSource(
                                new DataGeneratorSource<>(new RowDataGenerator(), rowsPerSecond, numberOfRows))
                        .returns(TypeInformation.of(RowData.class));
        // List of columns representing the table schema
        List<Column> columns =
                Arrays.asList(
                        Column.physical("create_time", DataTypes.TIMESTAMP(0)),
                        Column.physical("completion_id", DataTypes.STRING()),
                        Column.physical("device_mid", DataTypes.STRING()));
        try (@SuppressWarnings("unchecked")
             ClickHouseSinkFunction<RowData> sink =
                     new ClickHouseSinkFunctionBuilder.Upsert(clusterName, dbName, tableName)
                             .withSchema(columns)
                             .withShardDiscoveryKind("CE_GATEWAY")
                             .withGateway(gatewayHost)
                             .withAccount(username, password)
                             .withPrimaryKey(Arrays.asList("completion_id"))
                             .withShardingKey("completion_id")
                             .withFlushInterval(Duration.ofSeconds(1))
                             .build()) {
            // Add the sink to the data stream
            dataStream.addSink(sink);
            // Trigger the execution
            env.execute();
        }
    }
    static class RowDataGenerator implements DataGenerator<RowData> {
        private final Random random = new Random();
        private final RowDataConstructor rowDataConstructor =
                RowDataConstructor.apply(
                        new DataType[] {DataTypes.TIMESTAMP(0), DataTypes.STRING(), DataTypes.STRING()});
        @Override
        public RowData next() {
            final Object[] rowDataFields = {
                    LocalDateTime.now(), "CID-" + random.nextInt(1000), "DMID-" + random.nextInt(1000)
            };
            return rowDataConstructor.constructInsert(rowDataFields);
        }
        @Override
        public void open(
                String name, FunctionInitializationContext context, RuntimeContext runtimeContext)
                throws Exception {
            // Initialization code here (if needed)
        }
        @Override
        public boolean hasNext() {
            return true; // Continuous generation
        }
    }
}